Merging vvatsa's ipcluster-dev branch....
Brian Granger -
r1833:e4b173fe merge
@@ -1,903 +1,903 b''
# encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_engineservice -*-

"""A Twisted Service Representation of the IPython core.

The IPython Core exposed to the network is called the Engine. Its
representation in Twisted is the EngineService. Interfaces and adapters
are used to abstract out the details of the actual network protocol used.
The EngineService is an Engine that knows nothing about the actual protocol
used.

The EngineService is exposed with various network protocols in modules like:

enginepb.py
enginevanilla.py

As of 12/12/06 the classes in this module have been simplified greatly. It was
felt that we had over-engineered things. To improve the maintainability of the
code we have taken out the ICompleteEngine interface and the completeEngine
method that automatically added methods to engines.

"""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import os, sys, copy
import cPickle as pickle
from new import instancemethod

from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python import log, failure, components
import zope.interface as zi

from IPython.kernel.core.interpreter import Interpreter
from IPython.kernel import newserialized, error, util
from IPython.kernel.util import printer
from IPython.kernel.twistedutil import gatherBoth, DeferredList
from IPython.kernel import codeutil


#-------------------------------------------------------------------------------
# Interface specification for the Engine
#-------------------------------------------------------------------------------

class IEngineCore(zi.Interface):
    """The minimal required interface for the IPython Engine.

    This interface provides a formal specification of the IPython core.
    All these methods should return deferreds regardless of what side of a
    network connection they are on.

    In general, this class simply wraps a shell class and wraps its return
    values as Deferred objects. If the underlying shell class method raises
    an exception, this class should convert it to a twisted.failure.Failure
    that will be propagated along the Deferred's errback chain.

    In addition, Failures are aggressive. By this, we mean that if a method
    is performing multiple actions (like pulling multiple objects) and any
    single one fails, the entire method will fail with that Failure. It is
    all or nothing.
    """

    id = zi.interface.Attribute("the id of the Engine object")
    properties = zi.interface.Attribute("A dict of properties of the Engine")

    def execute(lines):
        """Execute lines of Python code.

        Returns a dictionary with keys (id, number, stdin, stdout, stderr)
        upon success.

        Returns a failure object if the execution of lines raises an exception.
        """

    def push(namespace):
        """Push dict namespace into the user's namespace.

        Returns a deferred to None or a failure.
        """

    def pull(keys):
        """Pulls values out of the user's namespace by keys.

        Returns a deferred to a tuple of objects or a single object.

        Raises NameError if any one of the objects does not exist.
        """

    def push_function(namespace):
        """Push a dict of key, function pairs into the user's namespace.

        Returns a deferred to None or a failure."""

    def pull_function(keys):
        """Pulls functions out of the user's namespace by keys.

        Returns a deferred to a tuple of functions or a single function.

        Raises NameError if any one of the functions does not exist.
        """

    def get_result(i=None):
        """Get the stdin/stdout/stderr of command i.

        Returns a deferred to a dict with keys
        (id, number, stdin, stdout, stderr).

        Raises IndexError if command i does not exist.
        Raises TypeError if i is not an int.
        """

    def reset():
        """Reset the shell.

        This clears the user's namespace. Won't cause modules to be
        reloaded. Should also re-initialize certain variables like id.
        """

    def kill():
        """Kill the engine by stopping the reactor."""

    def keys():
        """Return the top level variables in the user's namespace.

        Returns a deferred to a list of keys."""


class IEngineSerialized(zi.Interface):
    """Push/Pull methods that take Serialized objects.

    All methods should return deferreds.
    """

    def push_serialized(namespace):
        """Push a dict of keys and Serialized objects into the user's namespace."""

    def pull_serialized(keys):
        """Pull objects by key from the user's namespace as Serialized.

        Returns a list of Serialized objects or a single Serialized.

        Raises NameError if any one of the objects does not exist.
        """


class IEngineProperties(zi.Interface):
    """Methods for access to the properties object of an Engine"""

    properties = zi.Attribute("A StrictDict object, containing the properties")

    def set_properties(properties):
        """set properties by key and value"""

    def get_properties(keys=None):
        """get a list of properties by `keys`; if no keys are specified, get all"""

    def del_properties(keys):
        """delete properties by `keys`"""

    def has_properties(keys):
        """get a list of bool values for whether `properties` has `keys`"""

    def clear_properties():
        """clear the properties dict"""

class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
    """The basic engine interface that EngineService will implement.

    This exists so it is easy to specify adapters that adapt to and from the
    API that the basic EngineService implements.
    """
    pass

class IEngineQueued(IEngineBase):
    """Interface for adding a queue to an IEngineBase.

    This interface extends the IEngineBase interface to add methods for managing
    the engine's queue. The implicit details of this interface are that the
    execution of all methods declared in IEngineBase should appropriately be
    put through a queue before execution.

    All methods should return deferreds.
    """

    def clear_queue():
        """Clear the queue."""

    def queue_status():
        """Get the queued and pending commands in the queue."""

    def register_failure_observer(obs):
        """Register an observer of pending Failures.

        The observer must implement IFailureObserver.
        """

    def unregister_failure_observer(obs):
        """Unregister an observer of pending Failures."""


class IEngineThreaded(zi.Interface):
    """A place holder for threaded commands.

    All methods should return deferreds.
    """
    pass


#-------------------------------------------------------------------------------
# Functions and classes to implement the EngineService
#-------------------------------------------------------------------------------


class StrictDict(dict):
    """This is a strict copying dictionary for use as the interface to the
    properties of an Engine.

    :IMPORTANT:
    This object copies the values you set to it, and returns copies to you
    when you request them. The only way to change properties is explicitly
    through the setitem and getitem of the dictionary interface.

    Example:
    >>> e = get_engine(id)
    >>> L = [1,2,3]
    >>> e.properties['L'] = L
    >>> L == e.properties['L']
    True
    >>> L.append(99)
    >>> L == e.properties['L']
    False

    Note that getitem copies, so calls to methods of objects do not affect
    the properties, as seen here:

    >>> e.properties[1] = range(2)
    >>> print e.properties[1]
    [0, 1]
    >>> e.properties[1].append(2)
    >>> print e.properties[1]
    [0, 1]
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.modified = True

    def __getitem__(self, key):
        return copy.deepcopy(dict.__getitem__(self, key))

    def __setitem__(self, key, value):
        # check if this entry is valid for transport around the network
        # and copying
        try:
            pickle.dumps(key, 2)
            pickle.dumps(value, 2)
            newvalue = copy.deepcopy(value)
        except:
            raise error.InvalidProperty(value)
        dict.__setitem__(self, key, newvalue)
        self.modified = True

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        self.modified = True

    def update(self, dikt):
        for k,v in dikt.iteritems():
            self[k] = v

    def pop(self, key):
        self.modified = True
        return dict.pop(self, key)

    def popitem(self):
        self.modified = True
        return dict.popitem(self)

    def clear(self):
        self.modified = True
        dict.clear(self)

    def subDict(self, *keys):
        d = {}
        for key in keys:
            d[key] = self[key]
        return d


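The copy-on-both-sides behavior that the StrictDict docstring describes can be checked directly. Below is a minimal, self-contained Python 3 sketch of the same idea; it substitutes a plain ValueError for error.InvalidProperty (an assumption, since that class lives elsewhere in IPython.kernel) and drops the `modified` bookkeeping:

```python
import copy
import pickle

class StrictDict(dict):
    """Copy-on-access dict: values are deep-copied on both set and get,
    so mutating an object you set or got back never changes the stored
    property. Entries must also be picklable (network-transportable)."""

    def __getitem__(self, key):
        return copy.deepcopy(dict.__getitem__(self, key))

    def __setitem__(self, key, value):
        try:
            pickle.dumps(key, 2)    # reject entries that can't travel
            pickle.dumps(value, 2)  # over the wire or be copied
            newvalue = copy.deepcopy(value)
        except Exception as e:
            # stand-in for error.InvalidProperty(value)
            raise ValueError("invalid property: %r" % (value,)) from e
        dict.__setitem__(self, key, newvalue)

props = StrictDict()
L = [1, 2, 3]
props['L'] = L
L.append(99)                      # mutating the caller's list...
assert props['L'] == [1, 2, 3]    # ...does not touch the stored copy
props['L'].append(4)              # mutating a pulled copy...
assert props['L'] == [1, 2, 3]    # ...does not touch it either
```

This is why the docstring insists the only way to change a property is an explicit setitem: every other reference you hold is a throwaway copy.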
class EngineAPI(object):
    """This is the object through which the user can edit the `properties`
    attribute of an Engine.
    The Engine Properties object copies all objects in and out of itself.
    See the EngineProperties object for details.
    """
    _fix=False
    def __init__(self, id):
        self.id = id
        self.properties = StrictDict()
        self._fix=True

    def __setattr__(self, k,v):
        if self._fix:
            raise error.KernelError("I am protected!")
        else:
            object.__setattr__(self, k, v)

    def __delattr__(self, key):
        raise error.KernelError("I am protected!")


_apiDict = {}

def get_engine(id):
    """Get the Engine API object, which currently just provides the properties
    object, by ID"""
    global _apiDict
    if not _apiDict.get(id):
        _apiDict[id] = EngineAPI(id)
    return _apiDict[id]

def drop_engine(id):
    """remove an engine"""
    global _apiDict
    if _apiDict.has_key(id):
        del _apiDict[id]

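The get_engine/drop_engine pair implements a lazy per-id registry: a lookup creates and caches one EngineAPI per id, repeat lookups return the cached object, and drop removes it so the next lookup starts fresh. A simplified Python 3 sketch of that caching behavior, with plain attributes in place of the protected `_fix` machinery:

```python
# Module-level registry: one EngineAPI per engine id, created lazily.
_api_dict = {}

class EngineAPI:
    """Per-engine handle exposing its id and a properties dict."""
    def __init__(self, id):
        self.id = id
        self.properties = {}

def get_engine(id):
    # Create on first lookup, then always return the cached instance.
    if id not in _api_dict:
        _api_dict[id] = EngineAPI(id)
    return _api_dict[id]

def drop_engine(id):
    # Forget the engine; a later get_engine(id) builds a fresh one.
    _api_dict.pop(id, None)

a = get_engine(0)
assert get_engine(0) is a      # cached: same object on repeat lookup
drop_engine(0)
assert get_engine(0) is not a  # after drop, a fresh object is created
```

Note that the real module keys the cache the same way, which is why EngineService can call get_engine(self.id) both at construction (id None) and again when MPI or the controller assigns the real id.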
class EngineService(object, service.Service):
    """Adapt an IPython shell into an IEngine-implementing Twisted Service."""

    zi.implements(IEngineBase)
    name = 'EngineService'

    def __init__(self, shellClass=Interpreter, mpi=None):
        """Create an EngineService.

        shellClass: something that implements IInterpreter or core1
        mpi: an mpi module that has rank and size attributes
        """
        self.shellClass = shellClass
        self.shell = self.shellClass()
        self.mpi = mpi
        self.id = None
        self.properties = get_engine(self.id).properties
        if self.mpi is not None:
            log.msg("MPI started with rank = %i and size = %i" %
                (self.mpi.rank, self.mpi.size))
            self.id = self.mpi.rank
        self._seedNamespace()

    # Make id a property so that the shell can get the updated id

    def _setID(self, id):
        self._id = id
        self.properties = get_engine(id).properties
        self.shell.push({'id': id})

    def _getID(self):
        return self._id

    id = property(_getID, _setID)

    def _seedNamespace(self):
        self.shell.push({'mpi': self.mpi, 'id' : self.id})

    def executeAndRaise(self, msg, callable, *args, **kwargs):
        """Call a method of self.shell and wrap any exception."""
        d = defer.Deferred()
        try:
            result = callable(*args, **kwargs)
        except:
            # This gives the following:
            # et=exception class
            # ev=exception class instance
            # tb=traceback object
            et,ev,tb = sys.exc_info()
            # This call adds attributes to the exception value
            et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
            # Add another attribute
            ev._ipython_engine_info = msg
            f = failure.Failure(ev,et,None)
            d.errback(f)
        else:
            d.callback(result)

        return d


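executeAndRaise is the bridge between the synchronous shell and the asynchronous interface: the shell call runs immediately, and its outcome, either the return value or an exception annotated with the engine-side message dict, is delivered through the Deferred's callback or errback chain. A Twisted-free Python 3 sketch of the same pattern; Outcome, execute_and_capture, and the _engine_info attribute are hypothetical names standing in for Deferred, executeAndRaise, and _ipython_engine_info:

```python
class Outcome:
    """Holds either a success value or a captured exception,
    playing the role of a fired Deferred."""
    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error

def execute_and_capture(msg, func, *args, **kwargs):
    # Run the callable synchronously; never let its exception propagate.
    try:
        result = func(*args, **kwargs)
    except Exception as ev:
        # Annotate the exception with the engine-side context, mirroring
        # ev._ipython_engine_info in the original, then hand it back as
        # data rather than raising it.
        ev._engine_info = msg
        return Outcome(error=ev)
    return Outcome(value=result)

ok = execute_and_capture({'method': 'add'}, lambda a, b: a + b, 2, 3)
assert ok.value == 5 and ok.error is None

bad = execute_and_capture({'method': 'div'}, lambda: 1 / 0)
assert isinstance(bad.error, ZeroDivisionError)
assert bad.error._engine_info == {'method': 'div'}
```

The design point is the same in both versions: callers see a uniform asynchronous result object whether the underlying call succeeded or blew up, so network adapters never have to wrap shell calls in their own try/except.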
    # The IEngine methods. See the interface for documentation.

    def execute(self, lines):
        msg = {'engineid':self.id,
               'method':'execute',
               'args':[lines]}
        d = self.executeAndRaise(msg, self.shell.execute, lines)
        d.addCallback(self.addIDToResult)
        return d

    def addIDToResult(self, result):
        result['id'] = self.id
        return result

    def push(self, namespace):
        msg = {'engineid':self.id,
               'method':'push',
               'args':[repr(namespace.keys())]}
        d = self.executeAndRaise(msg, self.shell.push, namespace)
        return d

    def pull(self, keys):
        msg = {'engineid':self.id,
               'method':'pull',
               'args':[repr(keys)]}
        d = self.executeAndRaise(msg, self.shell.pull, keys)
        return d

    def push_function(self, namespace):
        msg = {'engineid':self.id,
               'method':'push_function',
               'args':[repr(namespace.keys())]}
        d = self.executeAndRaise(msg, self.shell.push_function, namespace)
        return d

    def pull_function(self, keys):
        msg = {'engineid':self.id,
               'method':'pull_function',
               'args':[repr(keys)]}
        d = self.executeAndRaise(msg, self.shell.pull_function, keys)
        return d

    def get_result(self, i=None):
        msg = {'engineid':self.id,
               'method':'get_result',
               'args':[repr(i)]}
        d = self.executeAndRaise(msg, self.shell.getCommand, i)
        d.addCallback(self.addIDToResult)
        return d

    def reset(self):
        msg = {'engineid':self.id,
               'method':'reset',
               'args':[]}
        del self.shell
        self.shell = self.shellClass()
        self.properties.clear()
        d = self.executeAndRaise(msg, self._seedNamespace)
        return d

    def kill(self):
        drop_engine(self.id)
        try:
            reactor.stop()
        except RuntimeError:
            log.msg('The reactor was not running apparently.')
            return defer.fail()
        else:
            return defer.succeed(None)

    def keys(self):
        """Return a list of variable names in the user's top-level namespace.

        This used to return a dict of all the keys/repr(values) in the
        user's namespace. This was too much info for the ControllerService
        to handle so it is now just a list of keys.
        """

        remotes = []
        for k in self.shell.user_ns.iterkeys():
            if k not in ['__name__', '_ih', '_oh', '__builtins__',
                         'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
                remotes.append(k)
        return defer.succeed(remotes)

    def set_properties(self, properties):
        msg = {'engineid':self.id,
               'method':'set_properties',
               'args':[repr(properties.keys())]}
        return self.executeAndRaise(msg, self.properties.update, properties)

    def get_properties(self, keys=None):
        msg = {'engineid':self.id,
               'method':'get_properties',
               'args':[repr(keys)]}
        if keys is None:
            keys = self.properties.keys()
        return self.executeAndRaise(msg, self.properties.subDict, *keys)

    def _doDel(self, keys):
        for key in keys:
            del self.properties[key]

    def del_properties(self, keys):
        msg = {'engineid':self.id,
               'method':'del_properties',
               'args':[repr(keys)]}
        return self.executeAndRaise(msg, self._doDel, keys)

    def _doHas(self, keys):
        return [self.properties.has_key(key) for key in keys]

    def has_properties(self, keys):
        msg = {'engineid':self.id,
               'method':'has_properties',
               'args':[repr(keys)]}
        return self.executeAndRaise(msg, self._doHas, keys)

    def clear_properties(self):
        msg = {'engineid':self.id,
               'method':'clear_properties',
               'args':[]}
        return self.executeAndRaise(msg, self.properties.clear)

    def push_serialized(self, sNamespace):
        msg = {'engineid':self.id,
               'method':'push_serialized',
               'args':[repr(sNamespace.keys())]}
        ns = {}
        for k,v in sNamespace.iteritems():
            try:
                unserialized = newserialized.IUnSerialized(v)
                ns[k] = unserialized.getObject()
            except:
                return defer.fail()
        return self.executeAndRaise(msg, self.shell.push, ns)

    def pull_serialized(self, keys):
        msg = {'engineid':self.id,
               'method':'pull_serialized',
               'args':[repr(keys)]}
        if isinstance(keys, str):
            keys = [keys]
        if len(keys)==1:
            d = self.executeAndRaise(msg, self.shell.pull, keys)
            d.addCallback(newserialized.serialize)
            return d
        elif len(keys)>1:
            d = self.executeAndRaise(msg, self.shell.pull, keys)
            @d.addCallback
            def packThemUp(values):
                serials = []
                for v in values:
                    try:
                        serials.append(newserialized.serialize(v))
                    except:
                        return defer.fail(failure.Failure())
                return serials
            # addCallback returns the deferred itself, so the decorated name
            # packThemUp is now bound to d; this returns the deferred.
            return packThemUp


562 def queue(methodToQueue):
562 def queue(methodToQueue):
563 def queuedMethod(this, *args, **kwargs):
563 def queuedMethod(this, *args, **kwargs):
564 name = methodToQueue.__name__
564 name = methodToQueue.__name__
565 return this.submitCommand(Command(name, *args, **kwargs))
565 return this.submitCommand(Command(name, *args, **kwargs))
566 return queuedMethod
566 return queuedMethod
567
567
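The `queue` factory above replaces a method's body entirely: calling the decorated method never runs the body, it just wraps the method name and arguments in a `Command` and submits it to the engine's queue. A minimal Python 3 sketch of the same pattern, assuming a hypothetical `ToyQueuedEngine` (not part of IPython) whose `submit_command` simply records the call:

```python
from collections import deque

def queue(method):
    """Turn a method into a stub that enqueues (name, args, kwargs)."""
    def queued_method(self, *args, **kwargs):
        # The original body is never executed; only its name is used.
        return self.submit_command((method.__name__, args, kwargs))
    return queued_method

class ToyQueuedEngine:
    def __init__(self):
        self.commands = deque()

    def submit_command(self, cmd):
        self.commands.append(cmd)
        return cmd

    @queue
    def execute(self, lines):
        pass  # placeholder body, intercepted by the decorator

e = ToyQueuedEngine()
e.execute("a = 1")
```

This is why the real `QueuedEngine` methods below can all have `pass` bodies: the decorator does the work.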
class QueuedEngine(object):
    """Adapt an IEngineBase to an IEngineQueued by wrapping it.

    The resulting object will implement IEngineQueued which extends
    IEngineCore which extends (IEngineBase, IEngineSerialized).

    This seems like the best way of handling it, but I am not sure. The
    other option is to have the various base interfaces be used like
    mix-in interfaces. The problem I have with this is that adaptation is
    more difficult and complicated because there can be multiple
    original and final Interfaces.
    """

    zi.implements(IEngineQueued)

    def __init__(self, engine):
        """Create a QueuedEngine object from an engine.

        engine: An implementor of IEngineCore and IEngineSerialized
        """

        # This is the right way to do these tests rather than
        # IEngineCore in list(zi.providedBy(engine)) which will only
        # pick up the interfaces that are directly declared by engine.
        assert IEngineBase.providedBy(engine), \
            "engine passed to QueuedEngine doesn't provide IEngineBase"

        self.engine = engine
        self.id = engine.id
        self.queued = []
        self.history = {}
        self.engineStatus = {}
        self.currentCommand = None
        self.failureObservers = []

    def _get_properties(self):
        return self.engine.properties

    properties = property(_get_properties, lambda self, _: None)

    # Queue management methods. You should not call these directly

    def submitCommand(self, cmd):
        """Submit command to queue."""

        d = defer.Deferred()
        cmd.setDeferred(d)
        if self.currentCommand is not None:
            if self.currentCommand.finished:
                # log.msg("Running command immediately: %r" % cmd)
                self.currentCommand = cmd
                self.runCurrentCommand()
            else:  # command is still running
                # log.msg("Command is running: %r" % self.currentCommand)
                # log.msg("Queueing: %r" % cmd)
                self.queued.append(cmd)
        else:
            # log.msg("No current commands, running: %r" % cmd)
            self.currentCommand = cmd
            self.runCurrentCommand()
        return d
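`submitCommand` implements simple run-or-enqueue semantics: a command runs immediately when nothing is pending, otherwise it waits in FIFO order until the current command finishes. A Python 3 sketch of just that control flow, with the Deferred machinery stripped out (`ToyQueue` is hypothetical, for illustration only):

```python
class ToyQueue:
    """Run-or-enqueue semantics of submitCommand, without Deferreds."""
    def __init__(self):
        self.current = None      # [name, finished] of the running command
        self.queued = []
        self.log = []            # order in which commands actually ran

    def submit(self, name):
        if self.current is not None and not self.current[1]:
            self.queued.append(name)   # something is running: wait in line
        else:
            self.current = [name, False]
            self.log.append(name)      # idle: run immediately

    def finish(self):
        """Mark the current command done and flush the next queued one."""
        self.current[1] = True
        if self.queued:
            self.current = [self.queued.pop(0), False]
            self.log.append(self.current[0])

q = ToyQueue()
q.submit('a'); q.submit('b'); q.submit('c')   # 'a' runs, 'b' and 'c' wait
```

Each `finish()` call plays the role of `finishCommand`/`_flushQueue` below.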

    def runCurrentCommand(self):
        """Run current command."""

        cmd = self.currentCommand
        f = getattr(self.engine, cmd.remoteMethod, None)
        if f:
            d = f(*cmd.args, **cmd.kwargs)
            if cmd.remoteMethod == 'execute':
                d.addCallback(self.saveResult)
            d.addCallback(self.finishCommand)
            d.addErrback(self.abortCommand)
        else:
            return defer.fail(AttributeError(cmd.remoteMethod))

    def _flushQueue(self):
        """Pop next command in queue and run it."""

        if len(self.queued) > 0:
            self.currentCommand = self.queued.pop(0)
            self.runCurrentCommand()

    def saveResult(self, result):
        """Put the result in the history."""
        self.history[result['number']] = result
        return result

    def finishCommand(self, result):
        """Finish current command."""

        # The order of these commands is absolutely critical.
        self.currentCommand.handleResult(result)
        self.currentCommand.finished = True
        self._flushQueue()
        return result

    def abortCommand(self, reason):
        """Abort current command.

        This eats the Failure but first passes it onto the Deferred that the
        user has.

        It also clears out the queue so subsequent commands don't run.
        """

        # The order of these 3 commands is absolutely critical. The currentCommand
        # must first be marked as finished BEFORE the queue is cleared and before
        # the current command is sent the failure.
        # Also, the queue must be cleared BEFORE the current command is sent the Failure
        # otherwise the errback chain could trigger new commands to be added to the
        # queue before we clear it. We should clear ONLY the commands that were in
        # the queue when the error occurred.
        self.currentCommand.finished = True
        s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
        self.clear_queue(msg=s)
        self.currentCommand.handleError(reason)

        return None
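The ordering comment in `abortCommand` is worth making concrete: the failure handler runs last precisely so that any commands it resubmits survive the queue clear and can run immediately. A small Python 3 sketch of that ordering, with a plain callable standing in for the Deferred errback chain (all names here are hypothetical):

```python
class AbortDemo:
    """Abort ordering: mark finished, clear stale queue, then deliver error."""
    def __init__(self):
        self.queued = ['old1', 'old2']   # commands queued before the error
        self.finished = False

    def abort(self, deliver_error):
        # 1. Mark finished so a resubmission inside the handler runs at once.
        self.finished = True
        # 2. Clear only the commands that were queued when the error occurred.
        self.queued = []
        # 3. Finally deliver the failure; the handler may enqueue new work.
        deliver_error(self)

demo = AbortDemo()
demo.abort(lambda q: q.queued.append('retry'))
```

Had step 3 run before step 2, the handler's `'retry'` would have been wiped along with the stale commands.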

    #---------------------------------------------------------------------------
    # IEngineCore methods
    #---------------------------------------------------------------------------

    @queue
    def execute(self, lines):
        pass

    @queue
    def push(self, namespace):
        pass

    @queue
    def pull(self, keys):
        pass

    @queue
    def push_function(self, namespace):
        pass

    @queue
    def pull_function(self, keys):
        pass

    def get_result(self, i=None):
        if i is None:
            i = max(self.history.keys()+[None])

        cmd = self.history.get(i, None)
        # Uncomment this line to disable caching of results
        #cmd = None
        if cmd is None:
            return self.submitCommand(Command('get_result', i))
        else:
            return defer.succeed(cmd)

    def reset(self):
        self.clear_queue()
        self.history = {}  # reset the cache - I am not sure we should do this
        return self.submitCommand(Command('reset'))

    def kill(self):
        self.clear_queue()
        return self.submitCommand(Command('kill'))

    @queue
    def keys(self):
        pass

    #---------------------------------------------------------------------------
    # IEngineSerialized methods
    #---------------------------------------------------------------------------

    @queue
    def push_serialized(self, namespace):
        pass

    @queue
    def pull_serialized(self, keys):
        pass

    #---------------------------------------------------------------------------
    # IEngineProperties methods
    #---------------------------------------------------------------------------

    @queue
    def set_properties(self, namespace):
        pass

    @queue
    def get_properties(self, keys=None):
        pass

    @queue
    def del_properties(self, keys):
        pass

    @queue
    def has_properties(self, keys):
        pass

    @queue
    def clear_properties(self):
        pass

    #---------------------------------------------------------------------------
    # IQueuedEngine methods
    #---------------------------------------------------------------------------

    def clear_queue(self, msg=''):
        """Clear the queue, but don't cancel the currently running command."""

        for cmd in self.queued:
            cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
        self.queued = []
        return defer.succeed(None)

    def queue_status(self):
        if self.currentCommand is not None:
            if self.currentCommand.finished:
                pending = repr(None)
            else:
                pending = repr(self.currentCommand)
        else:
            pending = repr(None)
        dikt = {'queue':map(repr,self.queued), 'pending':pending}
        return defer.succeed(dikt)

    def register_failure_observer(self, obs):
        self.failureObservers.append(obs)

    def unregister_failure_observer(self, obs):
        self.failureObservers.remove(obs)


# Now register QueuedEngine as an adapter class that makes an IEngineBase into
# an IEngineQueued.
components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)

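`queue_status` reports a dict with two keys: `'queue'`, the reprs of the waiting commands, and `'pending'`, the repr of the running command (or `repr(None)` when idle or finished). A Python 3 sketch of just that dict shape, as a standalone function rather than the real method (a hypothetical helper, for illustration):

```python
def queue_status(current, current_finished, queued):
    """Build the status dict shape used by QueuedEngine.queue_status."""
    if current is not None and not current_finished:
        pending = repr(current)
    else:
        pending = repr(None)   # idle, or the last command already finished
    return {'queue': [repr(c) for c in queued], 'pending': pending}
```

Note that both fields hold `repr` strings, not live command objects, so the status is safe to pickle and ship to a client.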
class Command(object):
    """A command object that encapsulates queued commands.

    This class basically keeps track of a command that has been queued
    in a QueuedEngine. It manages the deferreds and holds the method to be
    called and the arguments to that method.
    """

    def __init__(self, remoteMethod, *args, **kwargs):
        """Build a new Command object."""

        self.remoteMethod = remoteMethod
        self.args = args
        self.kwargs = kwargs
        self.finished = False

    def setDeferred(self, d):
        """Sets the deferred attribute of the Command."""

        self.deferred = d

    def __repr__(self):
        if not self.args:
            args = ''
        else:
            args = str(self.args)[1:-2]  # cut off (...,)
        for k,v in self.kwargs.iteritems():
            if args:
                args += ', '
            args += '%s=%r' % (k,v)
        return "%s(%s)" % (self.remoteMethod, args)

    def handleResult(self, result):
        """When the result is ready, relay it to self.deferred."""

        self.deferred.callback(result)

    def handleError(self, reason):
        """When an error has occurred, relay it to self.deferred."""

        self.deferred.errback(reason)

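`Command.__repr__` rebuilds a call signature from the stored name, args, and kwargs; the `str(self.args)[1:-2]` slice trims the `(` and trailing `,)` of the tuple repr, which is only clean for one-element tuples. A Python 3 re-statement of that formatting as a standalone helper (a hypothetical function, not part of IPython):

```python
def command_repr(remote_method, args, kwargs):
    """Format a queued call the way Command.__repr__ does."""
    if not args:
        text = ''
    else:
        # Trims "(" and ",)" from the tuple repr; exact for 1-tuples,
        # which is the common case for these queued calls.
        text = str(args)[1:-2]
    for k, v in kwargs.items():
        if text:
            text += ', '
        text += '%s=%r' % (k, v)
    return "%s(%s)" % (remote_method, text)
```

This repr string is what `queue_status` ultimately shows to clients.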
class ThreadedEngineService(EngineService):
    """An EngineService subclass that defers execute commands to a separate
    thread.

    ThreadedEngineService uses twisted.internet.threads.deferToThread to
    defer execute requests to a separate thread. GUI frontends may want to
    use ThreadedEngineService as the engine in an
    IPython.frontend.frontendbase.FrontEndBase subclass to prevent
    blocking execution from blocking the GUI thread.
    """

    zi.implements(IEngineBase)

    def __init__(self, shellClass=Interpreter, mpi=None):
        EngineService.__init__(self, shellClass, mpi)

    def wrapped_execute(self, msg, lines):
        """Wrap self.shell.execute to add extra information to tracebacks."""

        try:
            result = self.shell.execute(lines)
        except Exception,e:
            # This gives the following:
            # et=exception class
            # ev=exception class instance
            # tb=traceback object
            et,ev,tb = sys.exc_info()
            # This call adds attributes to the exception value
            et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
            # Add another attribute

            # Create a new exception with the new attributes
            e = et(ev._ipython_traceback_text)
            e._ipython_engine_info = msg

            # Re-raise
            raise e

        return result


    def execute(self, lines):
        # Only import this if we are going to use this class
        from twisted.internet import threads

        msg = {'engineid':self.id,
               'method':'execute',
               'args':[lines]}

        d = threads.deferToThread(self.wrapped_execute, msg, lines)
        d.addCallback(self.addIDToResult)
        return d
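`deferToThread` hands `wrapped_execute` to a worker thread and returns a Deferred that fires with its result, which is what keeps a GUI's reactor thread responsive. The same shape in modern Python 3 can be sketched with `concurrent.futures` standing in for Twisted's thread pool (the `eval`-based `wrapped_execute` below is a hypothetical stand-in for the real interpreter, and the `_engine_info` attribute mirrors how the original attaches engine context to exceptions):

```python
from concurrent.futures import ThreadPoolExecutor

def wrapped_execute(lines):
    """Run code off the main thread; annotate any exception with context."""
    try:
        return eval(lines)  # stand-in for shell.execute(lines)
    except Exception as e:
        # Re-raise a copy carrying extra engine information, as the
        # original does with _ipython_engine_info.
        err = type(e)(str(e))
        err._engine_info = {'method': 'execute', 'args': [lines]}
        raise err

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(wrapped_execute, "1 + 1")  # like deferToThread
    result = future.result()                        # like the Deferred firing
```

`future.result()` blocks here for simplicity; a GUI would instead attach a completion callback, just as the Twisted code adds `addCallback(self.addIDToResult)`.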
@@ -1,757 +1,757 b''
# encoding: utf-8

"""
Expose the multiengine controller over the Foolscap network protocol.
"""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import cPickle as pickle
from types import FunctionType

from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure, log

from foolscap import Referenceable

from IPython.kernel import error
from IPython.kernel.util import printer
from IPython.kernel import map as Map
from IPython.kernel.parallelfunction import ParallelFunction
from IPython.kernel.mapper import (
    MultiEngineMapper,
    IMultiEngineMapperFactory,
    IMapper
)
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.multiengine import (MultiEngine,
    IMultiEngine,
    IFullSynchronousMultiEngine,
    ISynchronousMultiEngine)
from IPython.kernel.multiengineclient import wrapResultList
from IPython.kernel.pendingdeferred import PendingDeferredManager
from IPython.kernel.pickleutil import (can, canDict,
    canSequence, uncan, uncanDict, uncanSequence)

from IPython.kernel.clientinterfaces import (
    IFCClientInterfaceProvider,
    IBlockingClientAdaptor
)

# Needed to access the true globals from __main__.__dict__
import __main__

#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------

def packageResult(wrappedMethod):

    def wrappedPackageResult(self, *args, **kwargs):
        d = wrappedMethod(self, *args, **kwargs)
        d.addCallback(self.packageSuccess)
        d.addErrback(self.packageFailure)
        return d
    return wrappedPackageResult
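`packageResult` wraps each `remote_*` method so that whatever comes back, a value via `packageSuccess` or a Failure via `packageFailure`, leaves the controller as a pickle byte string the Foolscap transport can carry. A Python 3 sketch of the same idea, with a plain try/except standing in for the Deferred callback/errback pair (`ToyController` is hypothetical):

```python
import pickle

def package_result(method):
    """Pickle whatever the wrapped method produces, success or error."""
    def wrapped(self, *args, **kwargs):
        try:
            value = method(self, *args, **kwargs)
        except Exception as e:
            value = e          # errback path: ship the exception itself
        return pickle.dumps(value, 2)   # protocol 2, as in packageSuccess
    return wrapped

class ToyController:
    @package_result
    def remote_pull(self, key):
        return {'key': key, 'value': 42}

serial = ToyController().remote_pull('a')
unpacked = pickle.loads(serial)
```

Shipping pickles in both directions is what lets the client treat success and failure uniformly: it unpickles first, then inspects what it got.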


class IFCSynchronousMultiEngine(Interface):
    """Foolscap interface to `ISynchronousMultiEngine`.

    The methods in this interface are similar to those of
    `ISynchronousMultiEngine`, but their arguments and return values are
    pickled if they are not already simple Python types that can be sent
    over XML-RPC.

    See the documentation of `ISynchronousMultiEngine` and `IMultiEngine`
    for documentation about the methods.

    Most methods in this interface act like the `ISynchronousMultiEngine`
    versions and can be called in blocking or non-blocking mode.
    """
    pass


class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
    """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
    """

    implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)

    addSlash = True

    def __init__(self, multiengine):
        # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
        # it. This allows this class to do two adaptation steps.
        self.smultiengine = ISynchronousMultiEngine(multiengine)
        self._deferredIDCallbacks = {}

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def packageFailure(self, f):
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        serial = pickle.dumps(obj, 2)
        return serial
111
111
112 #---------------------------------------------------------------------------
112 #---------------------------------------------------------------------------
113 # Things related to PendingDeferredManager
113 # Things related to PendingDeferredManager
114 #---------------------------------------------------------------------------
114 #---------------------------------------------------------------------------
115
115
116 @packageResult
116 @packageResult
117 def remote_get_pending_deferred(self, deferredID, block):
117 def remote_get_pending_deferred(self, deferredID, block):
118 d = self.smultiengine.get_pending_deferred(deferredID, block)
118 d = self.smultiengine.get_pending_deferred(deferredID, block)
119 try:
119 try:
120 callback = self._deferredIDCallbacks.pop(deferredID)
120 callback = self._deferredIDCallbacks.pop(deferredID)
121 except KeyError:
121 except KeyError:
122 callback = None
122 callback = None
123 if callback is not None:
123 if callback is not None:
124 d.addCallback(callback[0], *callback[1], **callback[2])
124 d.addCallback(callback[0], *callback[1], **callback[2])
125 return d
125 return d
126
126
127 @packageResult
127 @packageResult
128 def remote_clear_pending_deferreds(self):
128 def remote_clear_pending_deferreds(self):
129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
130
130
131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
133 return did
133 return did

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    @packageResult
    def remote_execute(self, lines, targets, block):
        return self.smultiengine.execute(lines, targets=targets, block=block)

    @packageResult
    def remote_push(self, binaryNS, targets, block):
        try:
            namespace = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.push(namespace, targets=targets, block=block)
        return d
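The push/pull methods all follow one wire pattern: the client pickles a namespace dict with protocol 2, and the server either unpickles it or converts the failure into an error result instead of letting the exception escape. A stdlib-only sketch of that contract, with Twisted's `defer.fail(failure.Failure())` modeled as an `('error', exc)` tuple (the function names here are illustrative, not part of the module):

```python
import pickle

def client_pack(namespace):
    # Protocol 2 matches the pickle.dumps(namespace, 2) calls in this module.
    return pickle.dumps(namespace, 2)

def server_unpack(binaryNS):
    # Mirror remote_push: a bad payload becomes an error *value*, so the
    # caller always gets a result back rather than a raised exception.
    try:
        namespace = pickle.loads(binaryNS)
    except Exception as e:
        return ('error', e)      # stand-in for defer.fail(failure.Failure())
    return ('ok', namespace)

status, ns = server_unpack(client_pack({'a': 10, 'b': [1, 2]}))
assert status == 'ok' and ns['a'] == 10

status, err = server_unpack(b'not a pickle')
assert status == 'error'
```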

    @packageResult
    def remote_pull(self, keys, targets, block):
        d = self.smultiengine.pull(keys, targets=targets, block=block)
        return d

    @packageResult
    def remote_push_function(self, binaryNS, targets, block):
        try:
            namespace = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            namespace = uncanDict(namespace)
            d = self.smultiengine.push_function(namespace, targets=targets, block=block)
        return d

    def _canMultipleKeys(self, result):
        return [canSequence(r) for r in result]

    @packageResult
    def remote_pull_function(self, keys, targets, block):
        def can_functions(r, keys):
            if len(keys)==1 or isinstance(keys, str):
                result = canSequence(r)
            else:
                result = [canSequence(s) for s in r]
            return result
        d = self.smultiengine.pull_function(keys, targets=targets, block=block)
        if block:
            d.addCallback(can_functions, keys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
        return d

    @packageResult
    def remote_push_serialized(self, binaryNS, targets, block):
        try:
            namespace = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
        return d

    @packageResult
    def remote_pull_serialized(self, keys, targets, block):
        d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
        return d

    @packageResult
    def remote_get_result(self, i, targets, block):
        if i == 'None':
            i = None
        return self.smultiengine.get_result(i, targets=targets, block=block)

    @packageResult
    def remote_reset(self, targets, block):
        return self.smultiengine.reset(targets=targets, block=block)

    @packageResult
    def remote_keys(self, targets, block):
        return self.smultiengine.keys(targets=targets, block=block)

    @packageResult
    def remote_kill(self, controller, targets, block):
        return self.smultiengine.kill(controller, targets=targets, block=block)

    @packageResult
    def remote_clear_queue(self, targets, block):
        return self.smultiengine.clear_queue(targets=targets, block=block)

    @packageResult
    def remote_queue_status(self, targets, block):
        return self.smultiengine.queue_status(targets=targets, block=block)

    @packageResult
    def remote_set_properties(self, binaryNS, targets, block):
        try:
            ns = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.set_properties(ns, targets=targets, block=block)
        return d

    @packageResult
    def remote_get_properties(self, keys, targets, block):
        if keys == 'None':
            keys = None
        return self.smultiengine.get_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_has_properties(self, keys, targets, block):
        return self.smultiengine.has_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_del_properties(self, keys, targets, block):
        return self.smultiengine.del_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_clear_properties(self, targets, block):
        return self.smultiengine.clear_properties(targets=targets, block=block)

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def remote_get_ids(self):
        """Get the ids of the registered engines.

        This method always blocks.
        """
        return self.smultiengine.get_ids()

    #---------------------------------------------------------------------------
    # IFCClientInterfaceProvider related methods
    #---------------------------------------------------------------------------

    def remote_get_client_name(self):
        return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'


# The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
# `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
# two phase adaptation.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
                           IMultiEngine, IFCSynchronousMultiEngine)


#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------


class FCFullSynchronousMultiEngineClient(object):

    implements(
        IFullSynchronousMultiEngine,
        IBlockingClientAdaptor,
        IMultiEngineMapperFactory,
        IMapper
    )

    def __init__(self, remote_reference):
        self.remote_reference = remote_reference
        self._deferredIDCallbacks = {}
        # This class manages some pending deferreds through this instance. This
        # is required for methods like gather/scatter as it enables us to
        # create our own pending deferreds for composite operations.
        self.pdm = PendingDeferredManager()

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def unpackage(self, r):
        return pickle.loads(r)

    #---------------------------------------------------------------------------
    # Things related to PendingDeferredManager
    #---------------------------------------------------------------------------

    def get_pending_deferred(self, deferredID, block=True):

        # Because we are managing some pending deferreds locally (through
        # self.pdm) and some remotely (on the controller), we first try the
        # local one and then the remote one.
        if self.pdm.quick_has_id(deferredID):
            d = self.pdm.get_pending_deferred(deferredID, block)
            return d
        else:
            d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
            d.addCallback(self.unpackage)
            try:
                callback = self._deferredIDCallbacks.pop(deferredID)
            except KeyError:
                callback = None
            if callback is not None:
                d.addCallback(callback[0], *callback[1], **callback[2])
            return d

    def clear_pending_deferreds(self):

        # This clears both the local (self.pdm) and remote pending deferreds.
        self.pdm.clear_pending_deferreds()
        d2 = self.remote_reference.callRemote('clear_pending_deferreds')
        d2.addCallback(self.unpackage)
        return d2

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    #---------------------------------------------------------------------------
    # IEngineMultiplexer related methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all', block=True):
        d = self.remote_reference.callRemote('execute', lines, targets, block)
        d.addCallback(self.unpackage)
        return d

    def push(self, namespace, targets='all', block=True):
        serial = pickle.dumps(namespace, 2)
        d = self.remote_reference.callRemote('push', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def pull(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('pull', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def push_function(self, namespace, targets='all', block=True):
        cannedNamespace = canDict(namespace)
        serial = pickle.dumps(cannedNamespace, 2)
        d = self.remote_reference.callRemote('push_function', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def pull_function(self, keys, targets='all', block=True):
        def uncan_functions(r, keys):
            if len(keys)==1 or isinstance(keys, str):
                return uncanSequence(r)
            else:
                return [uncanSequence(s) for s in r]
        d = self.remote_reference.callRemote('pull_function', keys, targets, block)
        d.addCallback(self.unpackage)
        if block:
            d.addCallback(uncan_functions, keys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
        return d
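Plain functions do not pickle by value, which is why this module "cans" them with `canDict`/`uncanSequence` before pushing and after pulling. A minimal sketch of the underlying idea, serializing a function's code object with `marshal` and rebuilding it on the receiving side (assumptions: no closures or decorators, same interpreter on both ends; `can`/`uncan` here are illustrative stand-ins, not the real helpers):

```python
import marshal
import pickle
import types

def can(f):
    # Ship the code object (marshal) plus the bits needed to rebuild it.
    return pickle.dumps((marshal.dumps(f.__code__), f.__name__, f.__defaults__), 2)

def uncan(blob, namespace=None):
    code_bytes, name, defaults = pickle.loads(blob)
    code = marshal.loads(code_bytes)
    # Rebuild the function against a fresh globals dict on the receiving side.
    return types.FunctionType(code, namespace or {'__builtins__': __builtins__},
                              name, defaults)

def square(x, offset=0):
    return x * x + offset

g = uncan(can(square))
assert g(4) == 16
assert g(4, offset=1) == 17
```

The real canning layer covers more ground (sequences of functions, mixed namespaces), but this is the core trick that lets `push_function`/`pull_function` move callables between the client and the engines.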

    def push_serialized(self, namespace, targets='all', block=True):
        cannedNamespace = canDict(namespace)
        serial = pickle.dumps(cannedNamespace, 2)
        d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def pull_serialized(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def get_result(self, i=None, targets='all', block=True):
        if i is None:  # This is because None cannot be marshalled by xml-rpc
            i = 'None'
        d = self.remote_reference.callRemote('get_result', i, targets, block)
        d.addCallback(self.unpackage)
        return d
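`get_result` (and `get_properties` below) encode `None` as the string `'None'` before the remote call because the wire format cannot marshal `None`; the server-side methods (`remote_get_result`, `remote_get_properties`) decode it back. A sketch of that symmetric sentinel encoding (function names are illustrative):

```python
# Client side: None -> 'None' before the call.
def encode_index(i):
    return 'None' if i is None else i

# Server side: 'None' -> None after the call arrives.
def decode_index(i):
    return None if i == 'None' else i

for value in (None, 0, 5):
    assert decode_index(encode_index(value)) == value
```

The cost of the sentinel is that a legitimate payload equal to the string `'None'` would be misread; that is acceptable here because the encoded values are integer result indices and property-key lists, never arbitrary strings.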

    def reset(self, targets='all', block=True):
        d = self.remote_reference.callRemote('reset', targets, block)
        d.addCallback(self.unpackage)
        return d

    def keys(self, targets='all', block=True):
        d = self.remote_reference.callRemote('keys', targets, block)
        d.addCallback(self.unpackage)
        return d

    def kill(self, controller=False, targets='all', block=True):
        d = self.remote_reference.callRemote('kill', controller, targets, block)
        d.addCallback(self.unpackage)
        return d

    def clear_queue(self, targets='all', block=True):
        d = self.remote_reference.callRemote('clear_queue', targets, block)
        d.addCallback(self.unpackage)
        return d

    def queue_status(self, targets='all', block=True):
        d = self.remote_reference.callRemote('queue_status', targets, block)
        d.addCallback(self.unpackage)
        return d

    def set_properties(self, properties, targets='all', block=True):
        serial = pickle.dumps(properties, 2)
        d = self.remote_reference.callRemote('set_properties', serial, targets, block)
        d.addCallback(self.unpackage)
        return d

    def get_properties(self, keys=None, targets='all', block=True):
        if keys is None:
            keys = 'None'
        d = self.remote_reference.callRemote('get_properties', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def has_properties(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('has_properties', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def del_properties(self, keys, targets='all', block=True):
        d = self.remote_reference.callRemote('del_properties', keys, targets, block)
        d.addCallback(self.unpackage)
        return d

    def clear_properties(self, targets='all', block=True):
        d = self.remote_reference.callRemote('clear_properties', targets, block)
        d.addCallback(self.unpackage)
        return d

    #---------------------------------------------------------------------------
    # IMultiEngine related methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        d = self.remote_reference.callRemote('get_ids')
        return d

    #---------------------------------------------------------------------------
    # ISynchronousMultiEngineCoordinator related methods
    #---------------------------------------------------------------------------

    def _process_targets(self, targets):
        def create_targets(ids):
            if isinstance(targets, int):
                engines = [targets]
            elif targets=='all':
                engines = ids
            elif isinstance(targets, (list, tuple)):
                engines = targets
            else:
                raise error.InvalidEngineID("invalid targets: %r" % (targets,))
            for t in engines:
                if t not in ids:
                    raise error.InvalidEngineID("engine with id %r does not exist" % t)
            return engines

        d = self.get_ids()
        d.addCallback(create_targets)
        return d
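The `create_targets` logic is worth seeing outside the deferred plumbing: normalize an int, `'all'`, or a list of ids against the currently registered engine ids, rejecting anything unknown. A standalone sketch (here `InvalidEngineID` is modeled as `ValueError`, and `process_targets` is an illustrative name):

```python
def process_targets(targets, ids):
    # Normalize the targets spec into an explicit list of engine ids.
    if isinstance(targets, int):
        engines = [targets]
    elif targets == 'all':
        engines = ids
    elif isinstance(targets, (list, tuple)):
        engines = list(targets)
    else:
        raise ValueError("invalid targets: %r" % (targets,))
    # Every requested id must correspond to a registered engine.
    for t in engines:
        if t not in ids:
            raise ValueError("engine with id %r does not exist" % t)
    return engines

ids = [0, 1, 2, 3]
assert process_targets('all', ids) == [0, 1, 2, 3]
assert process_targets(2, ids) == [2]
assert process_targets([0, 3], ids) == [0, 3]
```

In the real method this runs as a callback on `get_ids()`, so the validation happens against whatever engines are registered at call time.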

    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):

        # Note: scatter and gather handle pending deferreds locally through self.pdm.
        # This enables us to collect a bunch of deferred ids and make a secondary
        # deferred id that corresponds to the entire group. This logic is extremely
        # difficult to get right though.
        def do_scatter(engines):
            nEngines = len(engines)
            mapClass = Map.dists[dist]
            mapObject = mapClass()
            d_list = []
            # Loop through and push to each engine in non-blocking mode.
            # This returns a set of deferreds to deferred_ids.
            for index, engineid in enumerate(engines):
                partition = mapObject.getPartition(seq, index, nEngines)
                if flatten and len(partition) == 1:
                    d = self.push({key: partition[0]}, targets=engineid, block=False)
                else:
                    d = self.push({key: partition}, targets=engineid, block=False)
                d_list.append(d)
            # Collect the deferreds to deferred_ids.
            d = gatherBoth(d_list,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            # Now d has a list of deferred_ids or Failures coming.
            d.addCallback(error.collect_exceptions, 'scatter')
            def process_did_list(did_list):
                """Turn a list of deferred_ids into a final result or failure."""
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list,
                                     fireOnOneErrback=0,
                                     consumeErrors=1,
                                     logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'scatter')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                return final_d
            # Now, depending on block, we need to handle the list of deferred_ids
            # coming down the pipe differently.
            if block:
                # If we are blocking, register a callback that will transform the
                # list of deferred_ids into the final result.
                d.addCallback(process_did_list)
                return d
            else:
                # Here we are going to use a _local_ PendingDeferredManager.
                deferred_id = self.pdm.get_deferred_id()
                # This is the deferred we will return to the user that will fire
                # with the local deferred_id AFTER we have received the list of
                # primary deferred_ids.
                d_to_return = defer.Deferred()
                def do_it(did_list):
                    """Produce a deferred to the final result, but first fire the
                    deferred we will return to the user that has the local
                    deferred id."""
                    d_to_return.callback(deferred_id)
                    return process_did_list(did_list)
                d.addCallback(do_it)
                # Now save the deferred to the final result.
                self.pdm.save_pending_deferred(d, deferred_id)
                return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_scatter)
        return d
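`scatter` and `gather` delegate the actual splitting to `Map.dists[dist]`; for the default `'b'` (block) distribution, each engine receives one contiguous slice and `joinPartitions` concatenates them back in engine order. A sketch of that `getPartition`/`joinPartitions` contract (this class mirrors, but is not, the real `Map` implementation):

```python
class BlockMap:
    def getPartition(self, seq, index, nEngines):
        # Spread len(seq) items over nEngines contiguous slices; the first
        # len(seq) % nEngines slices get one extra item each.
        q, r = divmod(len(seq), nEngines)
        start = index * q + min(index, r)
        stop = start + q + (1 if index < r else 0)
        return seq[start:stop]

    def joinPartitions(self, partitions):
        # Inverse of getPartition: concatenate the slices in engine order.
        joined = []
        for p in partitions:
            joined.extend(p)
        return joined

m = BlockMap()
seq = list(range(10))
parts = [m.getPartition(seq, i, 3) for i in range(3)]
assert parts == [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
assert m.joinPartitions(parts) == seq   # scatter/gather round-trips
```

The round-trip property (`joinPartitions` of all partitions reproduces the original sequence) is exactly what lets `gather` reassemble what `scatter` distributed.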
554
554
555 def gather(self, key, dist='b', targets='all', block=True):
555 def gather(self, key, dist='b', targets='all', block=True):
556
556
557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
558 # This enables us to collect a bunch fo deferred ids and make a secondary
558 # This enables us to collect a bunch fo deferred ids and make a secondary
559 # deferred id that corresponds to the entire group. This logic is extremely
559 # deferred id that corresponds to the entire group. This logic is extremely
560 # difficult to get right though.
560 # difficult to get right though.
561 def do_gather(engines):
561 def do_gather(engines):
562 nEngines = len(engines)
562 nEngines = len(engines)
563 mapClass = Map.dists[dist]
563 mapClass = Map.dists[dist]
564 mapObject = mapClass()
564 mapObject = mapClass()
565 d_list = []
565 d_list = []
566 # Loop through and push to each engine in non-blocking mode.
566 # Loop through and push to each engine in non-blocking mode.
567 # This returns a set of deferreds to deferred_ids
567 # This returns a set of deferreds to deferred_ids
568 for index, engineid in enumerate(engines):
568 for index, engineid in enumerate(engines):
569 d = self.pull(key, targets=engineid, block=False)
569 d = self.pull(key, targets=engineid, block=False)
570 d_list.append(d)
570 d_list.append(d)
571 # Collect the deferred to deferred_ids
571 # Collect the deferred to deferred_ids
572 d = gatherBoth(d_list,
572 d = gatherBoth(d_list,
573 fireOnOneErrback=0,
573 fireOnOneErrback=0,
574 consumeErrors=1,
574 consumeErrors=1,
575 logErrors=0)
575 logErrors=0)
576 # Now d has a list of deferred_ids or Failures coming
576 # Now d has a list of deferred_ids or Failures coming
577 d.addCallback(error.collect_exceptions, 'gather')
577 d.addCallback(error.collect_exceptions, 'gather')
578 def process_did_list(did_list):
578 def process_did_list(did_list):
579 """Turn a list of deferred_ids into a final result or failure."""
579 """Turn a list of deferred_ids into a final result or failure."""
580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
581 final_d = gatherBoth(new_d_list,
581 final_d = gatherBoth(new_d_list,
582 fireOnOneErrback=0,
582 fireOnOneErrback=0,
583 consumeErrors=1,
583 consumeErrors=1,
584 logErrors=0)
584 logErrors=0)
585 final_d.addCallback(error.collect_exceptions, 'gather')
585 final_d.addCallback(error.collect_exceptions, 'gather')
586 final_d.addCallback(lambda lop: [i[0] for i in lop])
586 final_d.addCallback(lambda lop: [i[0] for i in lop])
587 final_d.addCallback(mapObject.joinPartitions)
587 final_d.addCallback(mapObject.joinPartitions)
588 return final_d
588 return final_d
589 # Now, depending on block, we need to handle the list deferred_ids
589 # Now, depending on block, we need to handle the list deferred_ids
590 # coming down the pipe differently.
590 # coming down the pipe differently.
591 if block:
591 if block:
592 # If we are blocking register a callback that will transform the
592 # If we are blocking register a callback that will transform the
593 # list of deferred_ids into the final result.
593 # list of deferred_ids into the final result.
594 d.addCallback(process_did_list)
594 d.addCallback(process_did_list)
595 return d
595 return d
596 else:
596 else:
597 # Here we are going to use a _local_ PendingDeferredManager.
597 # Here we are going to use a _local_ PendingDeferredManager.
598 deferred_id = self.pdm.get_deferred_id()
598 deferred_id = self.pdm.get_deferred_id()
599 # This is the deferred we will return to the user that will fire
599 # This is the deferred we will return to the user that will fire
600 # with the local deferred_id AFTER we have received the list of
600 # with the local deferred_id AFTER we have received the list of
601 # primary deferred_ids
601 # primary deferred_ids
602 d_to_return = defer.Deferred()
602 d_to_return = defer.Deferred()
603 def do_it(did_list):
603 def do_it(did_list):
604 """Produce a deferred to the final result, but first fire the
604 """Produce a deferred to the final result, but first fire the
605 deferred we will return to the user that has the local
605 deferred we will return to the user that has the local
606 deferred id."""
606 deferred id."""
607 d_to_return.callback(deferred_id)
607 d_to_return.callback(deferred_id)
608 return process_did_list(did_list)
608 return process_did_list(did_list)
609 d.addCallback(do_it)
609 d.addCallback(do_it)
610 # Now save the deferred to the final result
610 # Now save the deferred to the final result
611 self.pdm.save_pending_deferred(d, deferred_id)
611 self.pdm.save_pending_deferred(d, deferred_id)
612 return d_to_return
612 return d_to_return
613
613
614 d = self._process_targets(targets)
614 d = self._process_targets(targets)
615 d.addCallback(do_gather)
615 d.addCallback(do_gather)
616 return d
616 return d
617
617
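The `scatter`/`gather` pair above round-trips data through a Map object (`Map.dists[dist]`) that partitions a sequence across engines and later rejoins the per-engine pieces. A minimal local sketch of the block (`'b'`) decomposition, using hypothetical helper names rather than the actual `Map` class:

```python
def partition_block(seq, n_engines):
    # Split seq into n_engines contiguous chunks whose sizes differ
    # by at most one, like a block ('b') decomposition.
    q, r = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = q + (1 if i < r else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks

def join_partitions(chunks):
    # Inverse operation used by gather: concatenate the per-engine
    # pieces back into a single sequence, preserving order.
    joined = []
    for chunk in chunks:
        joined.extend(chunk)
    return joined
```

Scattering then gathering with the same decomposition is the identity, which is what lets `gather` reassemble what `scatter` distributed.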
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
619 """
619 """
620 A parallelized version of Python's builtin map.
620 A parallelized version of Python's builtin map.
621
621
622 This has a slightly different syntax than the builtin `map`.
622 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
623 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
624 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
625 be passed in a list or tuple.
626
626
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
628
629 Most users will want to use parallel functions or the `mapper`
629 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
630 and `map` methods for an API that follows that of the builtin
631 `map`.
631 `map`.
632 """
632 """
633 if not isinstance(sequences, (list, tuple)):
633 if not isinstance(sequences, (list, tuple)):
634 raise TypeError('sequences must be a list or tuple')
634 raise TypeError('sequences must be a list or tuple')
635 max_len = max(len(s) for s in sequences)
635 max_len = max(len(s) for s in sequences)
636 for s in sequences:
636 for s in sequences:
637 if len(s)!=max_len:
637 if len(s)!=max_len:
638 raise ValueError('all sequences must have equal length')
638 raise ValueError('all sequences must have equal length')
639 if isinstance(func, FunctionType):
639 if isinstance(func, FunctionType):
640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
643 elif isinstance(func, str):
643 elif isinstance(func, str):
644 d = defer.succeed(None)
644 d = defer.succeed(None)
645 sourceToRun = \
645 sourceToRun = \
646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
647 else:
647 else:
648 raise TypeError("func must be a function or str")
648 raise TypeError("func must be a function or str")
649
649
650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
654 return d
654 return d
655
655
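The pushed source string relies on a `zip(*...)` round trip: the controller scatters `zip(*sequences)` (a list of argument tuples), and each engine unzips that back into per-position argument sequences for `map`. A local, single-process sketch of the equivalence:

```python
def raw_map_local(func, sequences):
    # zip(*sequences) builds the rows that scatter would distribute;
    # zip(*rows) recovers the original argument columns for map().
    rows = list(zip(*sequences))
    return list(map(func, *zip(*rows)))
```

This mirrors `raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)` without any engines involved.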
656 def map(self, func, *sequences):
656 def map(self, func, *sequences):
657 """
657 """
658 A parallel version of Python's builtin `map` function.
658 A parallel version of Python's builtin `map` function.
659
659
660 This method applies a function to sequences of arguments. It
660 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
661 follows the same syntax as the builtin `map`.
662
662
663 This method creates a mapper object by calling `self.mapper` with
663 This method creates a mapper object by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
664 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
665 the documentation of `mapper` for more details.
666 """
666 """
667 return self.mapper().map(func, *sequences)
667 return self.mapper().map(func, *sequences)
668
668
669 def mapper(self, dist='b', targets='all', block=True):
669 def mapper(self, dist='b', targets='all', block=True):
670 """
670 """
671 Create a mapper object that has a `map` method.
671 Create a mapper object that has a `map` method.
672
672
673 This method returns an object that implements the `IMapper`
673 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
674 interface. This method is a factory that is used to control how
675 the map happens.
675 the map happens.
676
676
677 :Parameters:
677 :Parameters:
678 dist : str
678 dist : str
679 What decomposition to use, 'b' is the only one supported
679 What decomposition to use, 'b' is the only one supported
680 currently
680 currently
681 targets : str, int, sequence of ints
681 targets : str, int, sequence of ints
682 Which engines to use for the map
682 Which engines to use for the map
683 block : boolean
683 block : boolean
684 Should calls to `map` block or not
684 Should calls to `map` block or not
685 """
685 """
686 return MultiEngineMapper(self, dist, targets, block)
686 return MultiEngineMapper(self, dist, targets, block)
687
687
688 def parallel(self, dist='b', targets='all', block=True):
688 def parallel(self, dist='b', targets='all', block=True):
689 """
689 """
690 A decorator that turns a function into a parallel function.
690 A decorator that turns a function into a parallel function.
691
691
692 This can be used as:
692 This can be used as:
693
693
694 @parallel()
694 @parallel()
695 def f(x, y):
695 def f(x, y):
696 ...
696 ...
697
697
698 f(range(10), range(10))
698 f(range(10), range(10))
699
699
700 This causes f(0,0), f(1,1), ... to be called in parallel.
700 This causes f(0,0), f(1,1), ... to be called in parallel.
701
701
702 :Parameters:
702 :Parameters:
703 dist : str
703 dist : str
704 What decomposition to use, 'b' is the only one supported
704 What decomposition to use, 'b' is the only one supported
705 currently
705 currently
706 targets : str, int, sequence of ints
706 targets : str, int, sequence of ints
707 Which engines to use for the map
707 Which engines to use for the map
708 block : boolean
708 block : boolean
709 Should calls to `map` block or not
709 Should calls to `map` block or not
710 """
710 """
711 mapper = self.mapper(dist, targets, block)
711 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
712 pf = ParallelFunction(mapper)
713 return pf
713 return pf
714
714
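`parallel` is a decorator factory: calling it with options builds a mapper, and the returned `ParallelFunction` applies the decorated function elementwise over its sequence arguments. A serial stand-in (hypothetical, no engines or mappers involved) that shows the same call pattern:

```python
def parallel_serial(dist='b', targets='all', block=True):
    # Decorator factory: parallel_serial() returns the decorator,
    # which wraps f so that f(seq1, seq2, ...) maps f elementwise.
    def decorator(f):
        def mapped(*sequences):
            return list(map(f, *sequences))
        return mapped
    return decorator

@parallel_serial()
def add(x, y):
    return x + y
```

Here `add(range(10), range(10))` computes `add(0,0), add(1,1), ...` serially; the real `ParallelFunction` distributes those calls across engines.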
715 #---------------------------------------------------------------------------
715 #---------------------------------------------------------------------------
716 # ISynchronousMultiEngineExtras related methods
716 # ISynchronousMultiEngineExtras related methods
717 #---------------------------------------------------------------------------
717 #---------------------------------------------------------------------------
718
718
719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
720 if not multitargets:
720 if not multitargets:
721 result = pushResult[0]
721 result = pushResult[0]
722 elif lenKeys > 1:
722 elif lenKeys > 1:
723 result = zip(*pushResult)
723 result = zip(*pushResult)
724 elif lenKeys == 1:
724 elif lenKeys == 1:
725 result = list(pushResult)
725 result = list(pushResult)
726 return result
726 return result
727
727
728 def zip_pull(self, keys, targets='all', block=True):
728 def zip_pull(self, keys, targets='all', block=True):
729 multitargets = not isinstance(targets, int) and len(targets) > 1
729 multitargets = not isinstance(targets, int) and len(targets) > 1
730 lenKeys = len(keys)
730 lenKeys = len(keys)
731 d = self.pull(keys, targets=targets, block=block)
731 d = self.pull(keys, targets=targets, block=block)
732 if block:
732 if block:
733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
734 else:
734 else:
735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
736 return d
736 return d
737
737
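`zip_pull` reshapes the raw pull result depending on whether multiple targets and multiple keys were involved. The three branches can be exercised locally with this sketch mirroring `_transformPullResult`:

```python
def transform_pull_result(push_result, multitargets, len_keys):
    # Single target: unwrap the one-element result list.
    if not multitargets:
        return push_result[0]
    # Multiple keys across multiple targets: transpose engine-major
    # results into one tuple per key.
    if len_keys > 1:
        return list(zip(*push_result))
    # One key, multiple targets: normalize to a plain list.
    return list(push_result)
```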
738 def run(self, fname, targets='all', block=True):
738 def run(self, fname, targets='all', block=True):
739 fileobj = open(fname,'r')
739 fileobj = open(fname,'r')
740 source = fileobj.read()
740 source = fileobj.read()
741 fileobj.close()
741 fileobj.close()
742 # if the compilation blows, we get a local error right away
742 # if the compilation blows, we get a local error right away
743 try:
743 try:
744 code = compile(source,fname,'exec')
744 code = compile(source,fname,'exec')
745 except:
745 except:
746 return defer.fail(failure.Failure())
746 return defer.fail(failure.Failure())
747 # Now run the code
747 # Now run the code
748 d = self.execute(source, targets=targets, block=block)
748 d = self.execute(source, targets=targets, block=block)
749 return d
749 return d
750
750
751 #---------------------------------------------------------------------------
751 #---------------------------------------------------------------------------
752 # IBlockingClientAdaptor related methods
752 # IBlockingClientAdaptor related methods
753 #---------------------------------------------------------------------------
753 #---------------------------------------------------------------------------
754
754
755 def adapt_to_blocking_client(self):
755 def adapt_to_blocking_client(self):
756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
757 return IFullBlockingMultiEngineClient(self)
757 return IFullBlockingMultiEngineClient(self)
@@ -1,521 +1,723 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 pjoin = os.path.join
22 pjoin = os.path.join
22
23
23 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
27 from twisted.python import failure, log
28 from twisted.python import failure, log
28
29
29 from IPython.external import argparse
30 from IPython.external import argparse
30 from IPython.external import Itpl
31 from IPython.external import Itpl
31 from IPython.genutils import get_ipython_dir, num_cpus
32 from IPython.genutils import get_ipython_dir, num_cpus
32 from IPython.kernel.fcutil import have_crypto
33 from IPython.kernel.fcutil import have_crypto
33 from IPython.kernel.error import SecurityError
34 from IPython.kernel.error import SecurityError
34 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.twistedutil import gatherBoth
36 from IPython.kernel.twistedutil import gatherBoth
36 from IPython.kernel.util import printer
37 from IPython.kernel.util import printer
37
38
38
39
39 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
40 # General process handling code
41 # General process handling code
41 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
42
43
43 def find_exe(cmd):
44 def find_exe(cmd):
44 try:
45 try:
45 import win32api
46 import win32api
46 except ImportError:
47 except ImportError:
47 raise ImportError('you need to have pywin32 installed for this to work')
48 raise ImportError('you need to have pywin32 installed for this to work')
48 else:
49 else:
49 try:
50 try:
50 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
51 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
51 except:
52 except:
52 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
53 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
53 return path
54 return path
54
55
55 class ProcessStateError(Exception):
56 class ProcessStateError(Exception):
56 pass
57 pass
57
58
58 class UnknownStatus(Exception):
59 class UnknownStatus(Exception):
59 pass
60 pass
60
61
61 class LauncherProcessProtocol(ProcessProtocol):
62 class LauncherProcessProtocol(ProcessProtocol):
62 """
63 """
63 A ProcessProtocol to go with the ProcessLauncher.
64 A ProcessProtocol to go with the ProcessLauncher.
64 """
65 """
65 def __init__(self, process_launcher):
66 def __init__(self, process_launcher):
66 self.process_launcher = process_launcher
67 self.process_launcher = process_launcher
67
68
68 def connectionMade(self):
69 def connectionMade(self):
69 self.process_launcher.fire_start_deferred(self.transport.pid)
70 self.process_launcher.fire_start_deferred(self.transport.pid)
70
71
71 def processEnded(self, status):
72 def processEnded(self, status):
72 value = status.value
73 value = status.value
73 if isinstance(value, ProcessDone):
74 if isinstance(value, ProcessDone):
74 self.process_launcher.fire_stop_deferred(0)
75 self.process_launcher.fire_stop_deferred(0)
75 elif isinstance(value, ProcessTerminated):
76 elif isinstance(value, ProcessTerminated):
76 self.process_launcher.fire_stop_deferred(
77 self.process_launcher.fire_stop_deferred(
77 {'exit_code':value.exitCode,
78 {'exit_code':value.exitCode,
78 'signal':value.signal,
79 'signal':value.signal,
79 'status':value.status
80 'status':value.status
80 }
81 }
81 )
82 )
82 else:
83 else:
83 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84
85
85 def outReceived(self, data):
86 def outReceived(self, data):
86 log.msg(data)
87 log.msg(data)
87
88
88 def errReceived(self, data):
89 def errReceived(self, data):
89 log.err(data)
90 log.err(data)
90
91
91 class ProcessLauncher(object):
92 class ProcessLauncher(object):
92 """
93 """
93 Start and stop an external process in an asynchronous manner.
94 Start and stop an external process in an asynchronous manner.
94
95
95 Currently this uses deferreds to notify other parties of process state
96 Currently this uses deferreds to notify other parties of process state
96 changes. This is an awkward design and should be moved to using
97 changes. This is an awkward design and should be moved to using
97 a formal NotificationCenter.
98 a formal NotificationCenter.
98 """
99 """
99 def __init__(self, cmd_and_args):
100 def __init__(self, cmd_and_args):
100 self.cmd = cmd_and_args[0]
101 self.cmd = cmd_and_args[0]
101 self.args = cmd_and_args
102 self.args = cmd_and_args
102 self._reset()
103 self._reset()
103
104
104 def _reset(self):
105 def _reset(self):
105 self.process_protocol = None
106 self.process_protocol = None
106 self.pid = None
107 self.pid = None
107 self.start_deferred = None
108 self.start_deferred = None
108 self.stop_deferreds = []
109 self.stop_deferreds = []
109 self.state = 'before' # before, running, or after
110 self.state = 'before' # before, running, or after
110
111
111 @property
112 @property
112 def running(self):
113 def running(self):
113 if self.state == 'running':
114 if self.state == 'running':
114 return True
115 return True
115 else:
116 else:
116 return False
117 return False
117
118
118 def fire_start_deferred(self, pid):
119 def fire_start_deferred(self, pid):
119 self.pid = pid
120 self.pid = pid
120 self.state = 'running'
121 self.state = 'running'
121 log.msg('Process %r has started with pid=%i' % (self.args, pid))
122 log.msg('Process %r has started with pid=%i' % (self.args, pid))
122 self.start_deferred.callback(pid)
123 self.start_deferred.callback(pid)
123
124
124 def start(self):
125 def start(self):
125 if self.state == 'before':
126 if self.state == 'before':
126 self.process_protocol = LauncherProcessProtocol(self)
127 self.process_protocol = LauncherProcessProtocol(self)
127 self.start_deferred = defer.Deferred()
128 self.start_deferred = defer.Deferred()
128 self.process_transport = reactor.spawnProcess(
129 self.process_transport = reactor.spawnProcess(
129 self.process_protocol,
130 self.process_protocol,
130 self.cmd,
131 self.cmd,
131 self.args,
132 self.args,
132 env=os.environ
133 env=os.environ
133 )
134 )
134 return self.start_deferred
135 return self.start_deferred
135 else:
136 else:
136 s = 'the process has already been started and has state: %r' % \
137 s = 'the process has already been started and has state: %r' % \
137 self.state
138 self.state
138 return defer.fail(ProcessStateError(s))
139 return defer.fail(ProcessStateError(s))
139
140
140 def get_stop_deferred(self):
141 def get_stop_deferred(self):
141 if self.state == 'running' or self.state == 'before':
142 if self.state == 'running' or self.state == 'before':
142 d = defer.Deferred()
143 d = defer.Deferred()
143 self.stop_deferreds.append(d)
144 self.stop_deferreds.append(d)
144 return d
145 return d
145 else:
146 else:
146 s = 'this process is already complete'
147 s = 'this process is already complete'
147 return defer.fail(ProcessStateError(s))
148 return defer.fail(ProcessStateError(s))
148
149
149 def fire_stop_deferred(self, exit_code):
150 def fire_stop_deferred(self, exit_code):
150 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
151 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
151 self.state = 'after'
152 self.state = 'after'
152 for d in self.stop_deferreds:
153 for d in self.stop_deferreds:
153 d.callback(exit_code)
154 d.callback(exit_code)
154
155
155 def signal(self, sig):
156 def signal(self, sig):
156 """
157 """
157 Send a signal to the process.
158 Send a signal to the process.
158
159
159 The argument sig can be ('KILL','INT', etc.) or any signal number.
160 The argument sig can be ('KILL','INT', etc.) or any signal number.
160 """
161 """
161 if self.state == 'running':
162 if self.state == 'running':
162 self.process_transport.signalProcess(sig)
163 self.process_transport.signalProcess(sig)
163
164
164 # def __del__(self):
165 # def __del__(self):
165 # self.signal('KILL')
166 # self.signal('KILL')
166
167
167 def interrupt_then_kill(self, delay=1.0):
168 def interrupt_then_kill(self, delay=1.0):
168 self.signal('INT')
169 self.signal('INT')
169 reactor.callLater(delay, self.signal, 'KILL')
170 reactor.callLater(delay, self.signal, 'KILL')
170
171
171
172
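`ProcessLauncher` notifies interested parties of process exit by handing out one deferred per caller of `get_stop_deferred` and firing them all in `fire_stop_deferred`. A plain-callback sketch of that observer pattern (a hypothetical class, no Twisted required):

```python
class StopNotifier:
    # Mirrors the before/running/after lifecycle of ProcessLauncher,
    # but uses plain callables instead of Twisted deferreds.
    def __init__(self):
        self.state = 'before'   # before, running, or after
        self._callbacks = []
        self.exit_status = None

    def on_stop(self, callback):
        # Register an observer; mirrors get_stop_deferred. Registering
        # after exit is an error, like ProcessStateError above.
        if self.state == 'after':
            raise RuntimeError('this process is already complete')
        self._callbacks.append(callback)

    def fire_stop(self, exit_status):
        # Mirrors fire_stop_deferred: record the final state, then
        # notify every registered observer exactly once.
        self.state = 'after'
        self.exit_status = exit_status
        for callback in self._callbacks:
            callback(exit_status)
```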
172 #-----------------------------------------------------------------------------
173 #-----------------------------------------------------------------------------
173 # Code for launching controller and engines
174 # Code for launching controller and engines
174 #-----------------------------------------------------------------------------
175 #-----------------------------------------------------------------------------
175
176
176
177
177 class ControllerLauncher(ProcessLauncher):
178 class ControllerLauncher(ProcessLauncher):
178
179
179 def __init__(self, extra_args=None):
180 def __init__(self, extra_args=None):
180 if sys.platform == 'win32':
181 if sys.platform == 'win32':
181 # This logic is needed because the ipcontroller script doesn't
182 # This logic is needed because the ipcontroller script doesn't
182 # always get installed in the same way or in the same location.
183 # always get installed in the same way or in the same location.
183 from IPython.kernel.scripts import ipcontroller
184 from IPython.kernel.scripts import ipcontroller
184 script_location = ipcontroller.__file__.replace('.pyc', '.py')
185 script_location = ipcontroller.__file__.replace('.pyc', '.py')
185 # The -u option here turns on unbuffered output, which is required
186 # The -u option here turns on unbuffered output, which is required
186 # on Win32 to prevent weird conflicts and problems with Twisted
187 # on Win32 to prevent weird conflicts and problems with Twisted
187 args = [find_exe('python'), '-u', script_location]
188 args = [find_exe('python'), '-u', script_location]
188 else:
189 else:
189 args = ['ipcontroller']
190 args = ['ipcontroller']
190 self.extra_args = extra_args
191 self.extra_args = extra_args
191 if extra_args is not None:
192 if extra_args is not None:
192 args.extend(extra_args)
193 args.extend(extra_args)
193
194
194 ProcessLauncher.__init__(self, args)
195 ProcessLauncher.__init__(self, args)
195
196
196
197
197 class EngineLauncher(ProcessLauncher):
198 class EngineLauncher(ProcessLauncher):
198
199
199 def __init__(self, extra_args=None):
200 def __init__(self, extra_args=None):
200 if sys.platform == 'win32':
201 if sys.platform == 'win32':
201 # This logic is needed because the ipengine script doesn't
202 # This logic is needed because the ipengine script doesn't
202 # always get installed in the same way or in the same location.
203 # always get installed in the same way or in the same location.
203 from IPython.kernel.scripts import ipengine
204 from IPython.kernel.scripts import ipengine
204 script_location = ipengine.__file__.replace('.pyc', '.py')
205 script_location = ipengine.__file__.replace('.pyc', '.py')
205 # The -u option here turns on unbuffered output, which is required
206 # The -u option here turns on unbuffered output, which is required
206 # on Win32 to prevent weird conflicts and problems with Twisted
207 # on Win32 to prevent weird conflicts and problems with Twisted
207 args = [find_exe('python'), '-u', script_location]
208 args = [find_exe('python'), '-u', script_location]
208 else:
209 else:
209 args = ['ipengine']
210 args = ['ipengine']
210 self.extra_args = extra_args
211 self.extra_args = extra_args
211 if extra_args is not None:
212 if extra_args is not None:
212 args.extend(extra_args)
213 args.extend(extra_args)
213
214
214 ProcessLauncher.__init__(self, args)
215 ProcessLauncher.__init__(self, args)
215
216
216
217
217 class LocalEngineSet(object):
218 class LocalEngineSet(object):
218
219
219 def __init__(self, extra_args=None):
220 def __init__(self, extra_args=None):
220 self.extra_args = extra_args
221 self.extra_args = extra_args
221 self.launchers = []
222 self.launchers = []
222
223
223 def start(self, n):
224 def start(self, n):
224 dlist = []
225 dlist = []
225 for i in range(n):
226 for i in range(n):
226 el = EngineLauncher(extra_args=self.extra_args)
227 el = EngineLauncher(extra_args=self.extra_args)
227 d = el.start()
228 d = el.start()
228 self.launchers.append(el)
229 self.launchers.append(el)
229 dlist.append(d)
230 dlist.append(d)
230 dfinal = gatherBoth(dlist, consumeErrors=True)
231 dfinal = gatherBoth(dlist, consumeErrors=True)
231 dfinal.addCallback(self._handle_start)
232 dfinal.addCallback(self._handle_start)
232 return dfinal
233 return dfinal
233
234
234 def _handle_start(self, r):
235 def _handle_start(self, r):
235 log.msg('Engines started with pids: %r' % r)
236 log.msg('Engines started with pids: %r' % r)
236 return r
237 return r
237
238
238 def _handle_stop(self, r):
239 def _handle_stop(self, r):
239 log.msg('Engines received signal: %r' % r)
240 log.msg('Engines received signal: %r' % r)
240 return r
241 return r
241
242
242 def signal(self, sig):
243 def signal(self, sig):
243 dlist = []
244 dlist = []
244 for el in self.launchers:
245 for el in self.launchers:
245 d = el.get_stop_deferred()
246 d = el.get_stop_deferred()
246 dlist.append(d)
247 dlist.append(d)
247 el.signal(sig)
248 el.signal(sig)
248 dfinal = gatherBoth(dlist, consumeErrors=True)
249 dfinal = gatherBoth(dlist, consumeErrors=True)
249 dfinal.addCallback(self._handle_stop)
250 dfinal.addCallback(self._handle_stop)
250 return dfinal
251 return dfinal
251
252
252 def interrupt_then_kill(self, delay=1.0):
253 def interrupt_then_kill(self, delay=1.0):
253 dlist = []
254 dlist = []
254 for el in self.launchers:
255 for el in self.launchers:
255 d = el.get_stop_deferred()
256 d = el.get_stop_deferred()
256 dlist.append(d)
257 dlist.append(d)
257 el.interrupt_then_kill(delay)
258 el.interrupt_then_kill(delay)
258 dfinal = gatherBoth(dlist, consumeErrors=True)
259 dfinal = gatherBoth(dlist, consumeErrors=True)
259 dfinal.addCallback(self._handle_stop)
260 dfinal.addCallback(self._handle_stop)
260 return dfinal
261 return dfinal
261
262
262
263
263 class BatchEngineSet(object):
264 class BatchEngineSet(object):
264
265
265 # Subclasses must fill these in. See PBSEngineSet
266 # Subclasses must fill these in. See PBSEngineSet
266 submit_command = ''
267 submit_command = ''
267 delete_command = ''
268 delete_command = ''
268 job_id_regexp = ''
269 job_id_regexp = ''
269
270
270 def __init__(self, template_file, **kwargs):
    def __init__(self, template_file, **kwargs):
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        self.context['n'] = n
        template = open(self.template_file, 'r').read()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        f.write(script_as_string)
        f.close()

    def handle_error(self, f):
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file], env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d


class PBSEngineSet(BatchEngineSet):

    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)

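# As a sketch of what parse_job_id expects: typical qsub output begins with
# the numeric job id, which the r'\d+' regexp captures. The output string
# below is a made-up example; real output varies by PBS installation.

```python
import re

job_id_regexp = r'\d+'

# Hypothetical qsub output of the form "<id>.<server>".
output = "12345.pbs-server.example.com"
m = re.match(job_id_regexp, output)
assert m is not None
print(m.group())  # the leading digits, e.g. '12345'
```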
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""

class SSHEngineSet(object):
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engines (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to set up the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("About to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        log.msg(r)

#-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------

# TODO:
# The logic in this code should be moved into classes like LocalCluster,
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.

def check_security(args, cont_args):
    if (not args.x or not args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True


def main_local(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engines to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())


def main_mpirun(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        raw_args = ['mpirun']
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engines to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())


def main_pbs(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())


def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            # d.addErrback(log.err)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d

    def delay_start(cont_pid):
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())

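# For reference, a clusterfile matching the format described in the
# main_ssh docstring might look like this. Hostnames and engine counts
# below are made-up examples; it would be passed to the command as
# "ipcluster ssh --clusterfile clusterfile.py".

```python
# Hypothetical clusterfile.py for "ipcluster ssh".
send_furl = False  # set True to scp the controller's furl file to each host
engines = {
    'node1.example.com': 2,  # hostname -> number of engines to start there
    'node2.example.com': 4,
}
```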
def get_args():
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
        THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpirun)

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args

def main():
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
@@ -1,393 +1,398 b''
1 .. _changes:
1 .. _changes:
2
2
3 ==========
3 ==========
4 What's new
4 What's new
5 ==========
5 ==========
6
6
7 .. contents::
7 .. contents::
8 ..
8 ..
9 1 Release 0.9.1
9 1 Release 0.9.1
10 2 Release 0.9
10 2 Release 0.9
11 2.1 New features
11 2.1 New features
12 2.2 Bug fixes
12 2.2 Bug fixes
13 2.3 Backwards incompatible changes
13 2.3 Backwards incompatible changes
14 2.4 Changes merged in from IPython1
14 2.4 Changes merged in from IPython1
15 2.4.1 New features
15 2.4.1 New features
16 2.4.2 Bug fixes
16 2.4.2 Bug fixes
17 2.4.3 Backwards incompatible changes
17 2.4.3 Backwards incompatible changes
18 3 Release 0.8.4
18 3 Release 0.8.4
19 4 Release 0.8.3
19 4 Release 0.8.3
20 5 Release 0.8.2
20 5 Release 0.8.2
21 6 Older releases
21 6 Older releases
22 ..
22 ..
23
23
24 Release dev
24 Release dev
25 ===========
25 ===========
26
26
27 New features
27 New features
28 ------------
28 ------------
29
29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 to Matt Foster for this patch.
34 to Matt Foster for this patch.
32
35
33 * Fully refactored :command:`ipcluster` command line program for starting
36 * Fully refactored :command:`ipcluster` command line program for starting
34 IPython clusters. This new version is a complete rewrite and 1) is fully
37 IPython clusters. This new version is a complete rewrite and 1) is fully
35 cross platform (we now use Twisted's process management), 2) has much
38 cross platform (we now use Twisted's process management), 2) has much
36 improved performance, 3) uses subcommands for different types of clusters,
39 improved performance, 3) uses subcommands for different types of clusters,
37 4) uses argparse for parsing command line options, 5) has better support
40 4) uses argparse for parsing command line options, 5) has better support
38 for starting clusters using :command:`mpirun`, 6) has experimental support
41 for starting clusters using :command:`mpirun`, 6) has experimental support
39 for starting engines using PBS. However, this new version of ipcluster
42 for starting engines using PBS. However, this new version of ipcluster
40 should be considered a technology preview. We plan on changing the API
43 should be considered a technology preview. We plan on changing the API
41 in significant ways before it is final.
44 in significant ways before it is final.
42
45
43 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
46 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
44
47
45 * Fully description of the security model added to the docs.
48 * Fully description of the security model added to the docs.
46
49
47 * cd completer: show bookmarks if no other completions are available.
50 * cd completer: show bookmarks if no other completions are available.
48
51
49 * sh profile: easy way to give 'title' to prompt: assign to variable
52 * sh profile: easy way to give 'title' to prompt: assign to variable
50 '_prompt_title'. It looks like this::
53 '_prompt_title'. It looks like this::
51
54
52 [~]|1> _prompt_title = 'sudo!'
55 [~]|1> _prompt_title = 'sudo!'
53 sudo![~]|2>
56 sudo![~]|2>
54
57
55 * %edit: If you do '%edit pasted_block', pasted_block
58 * %edit: If you do '%edit pasted_block', pasted_block
56 variable gets updated with new data (so repeated
59 variable gets updated with new data (so repeated
57 editing makes sense)
60 editing makes sense)
58
61
59 Bug fixes
62 Bug fixes
60 ---------
63 ---------
61
64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 * The ipengine and ipcontroller scripts now handle missing furl files
67 * The ipengine and ipcontroller scripts now handle missing furl files
63 more gracefully by giving better error messages.
68 more gracefully by giving better error messages.
64
69
65 * %rehashx: Aliases no longer contain dots. python3.0 binary
70 * %rehashx: Aliases no longer contain dots. python3.0 binary
66 will create alias python30. Fixes:
71 will create alias python30. Fixes:
67 #259716 "commands with dots in them don't work"
72 #259716 "commands with dots in them don't work"
68
73
69 * %cpaste: %cpaste -r repeats the last pasted block.
74 * %cpaste: %cpaste -r repeats the last pasted block.
70 The block is assigned to pasted_block even if code
75 The block is assigned to pasted_block even if code
71 raises exception.
76 raises exception.
72
77
73 Backwards incompatible changes
78 Backwards incompatible changes
74 ------------------------------
79 ------------------------------
75
80
76 * The controller now has a ``-r`` flag that needs to be used if you want to
81 * The controller now has a ``-r`` flag that needs to be used if you want to
77 reuse existing furl files. Otherwise they are deleted (the default).
82 reuse existing furl files. Otherwise they are deleted (the default).
78
83
79 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
84 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
80 (done to decouple it from ipython release cycle)
85 (done to decouple it from ipython release cycle)
81
86
82
87
83
88
84 Release 0.9.1
89 Release 0.9.1
85 =============
90 =============
86
91
87 This release was quickly made to restore compatibility with Python 2.4, which
92 This release was quickly made to restore compatibility with Python 2.4, which
88 version 0.9 accidentally broke. No new features were introduced, other than
93 version 0.9 accidentally broke. No new features were introduced, other than
89 some additional testing support for internal use.
94 some additional testing support for internal use.
90
95
91
96
92 Release 0.9
97 Release 0.9
93 ===========
98 ===========
94
99
95 New features
100 New features
96 ------------
101 ------------
97
102
98 * All furl files and security certificates are now put in a read-only
103 * All furl files and security certificates are now put in a read-only
99 directory named ~./ipython/security.
104 directory named ~./ipython/security.
100
105
101 * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that
106 * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that
102 determines the user's IPython directory in a robust manner.
107 determines the user's IPython directory in a robust manner.
103
108
104 * Laurent's WX application has been given a top-level script called
109 * Laurent's WX application has been given a top-level script called
105 ipython-wx, and it has received numerous fixes. We expect this code to be
110 ipython-wx, and it has received numerous fixes. We expect this code to be
106 architecturally better integrated with Gael's WX 'ipython widget' over the
111 architecturally better integrated with Gael's WX 'ipython widget' over the
107 next few releases.
112 next few releases.
108
113
109 * The Editor synchronization work by Vivian De Smedt has been merged in. This
114 * The Editor synchronization work by Vivian De Smedt has been merged in. This
110 code adds a number of new editor hooks to synchronize with editors under
115 code adds a number of new editor hooks to synchronize with editors under
111 Windows.
116 Windows.
112
117
113 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
118 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
114 This work was sponsored by Enthought, and while it's still very new, it is
119 This work was sponsored by Enthought, and while it's still very new, it is
115 based on a more cleanly organized arhictecture of the various IPython
120 based on a more cleanly organized arhictecture of the various IPython
116 components. We will continue to develop this over the next few releases as a
121 components. We will continue to develop this over the next few releases as a
117 model for GUI components that use IPython.
122 model for GUI components that use IPython.
118
123
119 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
124 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
120 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
125 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
121 different internal organizations, but the whole team is working on finding
126 different internal organizations, but the whole team is working on finding
122 what the right abstraction points are for a unified codebase.
127 what the right abstraction points are for a unified codebase.
123
128
124 * As part of the frontend work, Barry Wark also implemented an experimental
129 * As part of the frontend work, Barry Wark also implemented an experimental
125 event notification system that various ipython components can use. In the
130 event notification system that various ipython components can use. In the
126 next release the implications and use patterns of this system regarding the
131 next release the implications and use patterns of this system regarding the
127 various GUI options will be worked out.
132 various GUI options will be worked out.
128
133
129 * IPython finally has a full test system, that can test docstrings with
134 * IPython finally has a full test system, that can test docstrings with
130 IPython-specific functionality. There are still a few pieces missing for it
135 IPython-specific functionality. There are still a few pieces missing for it
131 to be widely accessible to all users (so they can run the test suite at any
136 to be widely accessible to all users (so they can run the test suite at any
132 time and report problems), but it now works for the developers. We are
137 time and report problems), but it now works for the developers. We are
133 working hard on continuing to improve it, as this was probably IPython's
138 working hard on continuing to improve it, as this was probably IPython's
134 major Achilles heel (the lack of proper test coverage made it effectively
139 major Achilles heel (the lack of proper test coverage made it effectively
135 impossible to do large-scale refactoring). The full test suite can now
  impossible to do large-scale refactoring). The full test suite can now
  be run using the :command:`iptest` command line program.

* The notion of a task has been completely reworked. An `ITask` interface has
  been created. This interface defines the methods that tasks need to
  implement. These methods are now responsible for things like submitting
  tasks and processing results. There are two basic task types:
  :class:`IPython.kernel.task.StringTask` (this is the old `Task` object, but
  renamed) and the new :class:`IPython.kernel.task.MapTask`, which is based on
  a function.

* A new interface, :class:`IPython.kernel.mapper.IMapper`, has been defined to
  standardize the idea of a `map` method. This interface has a single `map`
  method that has the same call signature as the built-in `map`. We have also
  defined a `mapper` factory interface that creates objects that implement
  :class:`IPython.kernel.mapper.IMapper` for different controllers. Both the
  multiengine and task controllers now have mapping capabilities.

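The point of :class:`IMapper` is that a controller's `map` behaves like the built-in `map`. A minimal sketch of that contract, using a hypothetical local stand-in rather than IPython's actual classes:

```python
# Illustrative stand-in for the IMapper contract described above; the class
# name and implementation are hypothetical, not IPython's real code.
class LocalMapper:
    """A mapper whose `map` mirrors the built-in map()."""

    def map(self, func, *sequences):
        # Same call signature as the built-in: one function, one or more
        # sequences, applied element-wise.
        return list(map(func, *sequences))

mapper = LocalMapper()
result = mapper.map(lambda x, y: x + y, [1, 2, 3], [10, 20, 30])
```

A real mapper created by the factory would dispatch these calls to engines, but the calling convention is the same.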
* The parallel function capabilities have been reworked. The major changes
  are that i) there is now an `@parallel` magic that creates parallel
  functions, ii) the syntax for multiple variables follows that of `map`,
  and iii) both the multiengine and task controllers now have a parallel
  function implementation.

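A parallel function wraps an ordinary function so that calling it with sequences applies it element-wise, with `map`-style syntax for multiple variables. A rough local sketch of the calling convention only (the decorator below is a stand-in, not IPython's `@parallel` magic, which dispatches the calls to engines):

```python
# Hypothetical stand-in showing the map-like calling convention of parallel
# functions; the real decorator distributes the work across engines.
def parallel(func):
    def wrapper(*sequences):
        # Multiple-variable syntax follows map(): f(xs, ys) applies func
        # element-wise across the zipped sequences.
        return [func(*args) for args in zip(*sequences)]
    return wrapper

@parallel
def add(x, y):
    return x + y

sums = add([1, 2, 3], [4, 5, 6])
```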
* All of the parallel computing capabilities from `ipython1-dev` have been
  merged into IPython proper. This resulted in the following new subpackages:
  :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
  :mod:`IPython.tools` and :mod:`IPython.testing`.

* As part of merging in the `ipython1-dev` stuff, the `setup.py` script and
  friends have been completely refactored. Now we are checking for
  dependencies using the approach that matplotlib uses.

* The documentation has been completely reorganized to accept the
  documentation from `ipython1-dev`.

* We have switched to using Foolscap for all of our network protocols in
  :mod:`IPython.kernel`. This gives us secure connections that are both
  encrypted and authenticated.

* We have a brand new `COPYING.txt` file that describes the IPython license
  and copyright. The biggest change is that we are putting "The IPython
  Development Team" as the copyright holder. We give more details about
  exactly what this means in this file. All developers should read this and
  use the new banner in all IPython source code files.

* sh profile: ``./foo`` now runs ``foo`` as a system command; there is no
  need to do ``!./foo`` anymore.

* String lists now support a ``sort(field, nums=True)`` method (to easily
  sort system command output). Try it with ``a = !ls -l ; a.sort(1, nums=1)``.

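The effect of ``sort(field, nums=True)`` is a sort on one whitespace-separated column, comparing numerically instead of lexicographically. A rough plain-Python illustration of that behaviour (not IPython's actual ``SList`` implementation; the sample lines stand in for ``!ls -l`` output):

```python
# Sketch of sorting command output by a numeric column, as the string-list
# sort(field, nums=True) method does. Sample data is hypothetical.
lines = [
    "-rw-r--r-- 1 user user 11200 notes.txt",
    "-rw-r--r-- 1 user user   300 todo.txt",
    "-rw-r--r-- 1 user user  2048 data.csv",
]

def sort_by_field(lines, field, nums=False):
    # nums=True compares the column as numbers; otherwise as strings,
    # where "11200" would sort before "300".
    def key(line):
        value = line.split()[field]
        return float(value) if nums else value
    return sorted(lines, key=key)

by_size = sort_by_field(lines, 4, nums=True)
```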
* ``%cpaste foo`` now assigns the pasted block as a string list, instead of
  a string.

* The ipcluster script now runs with no security by default. This is done
  because the main usage of the script is for starting things on localhost.
  Eventually, when ipcluster is able to start things on other hosts, we will
  put security back.

* ``cd --foo`` searches the directory history for the string ``foo`` and
  jumps to that directory. The last part of the directory name is checked
  first; if no matches are found there, the whole path is searched.


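The ``cd --foo`` search order, last path component first and then the whole path, can be sketched as follows (a plain-Python illustration of the matching rule; the function name is hypothetical, not IPython's code):

```python
# Hypothetical sketch of the `cd --foo` lookup order over directory history:
# match against the last path component first, then against the full path.
import os

def find_in_history(history, pattern):
    for d in history:
        if pattern in os.path.basename(d):   # last part of dir name first
            return d
    for d in history:
        if pattern in d:                     # fall back to the whole path
            return d
    return None

history = ["/home/user/projects/ipython", "/var/log", "/home/user/logs"]
target = find_in_history(history, "log")
```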
Bug fixes
---------

* The Windows installer has been fixed. Now all IPython scripts have ``.bat``
  versions created. Also, the Start Menu shortcuts have been updated.

* The color escapes in the multiengine client are now turned off on win32 as
  they don't print correctly.

* The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing
  mpi_import_statement incorrectly, which was leading the engine to crash
  when mpi was enabled.

* A few subpackages had missing ``__init__.py`` files.

* The documentation is only created if Sphinx is found. Previously, the
  ``setup.py`` script would fail if it was missing.

* Greedy ``cd`` completion has been disabled again (it was enabled in 0.8.4)
  as it caused problems on certain platforms.


Backwards incompatible changes
------------------------------

* The ``clusterfile`` option of the :command:`ipcluster` command has been
  removed as it was not working; it will be replaced soon by something much
  more robust.

* The :mod:`IPython.kernel` configuration now properly finds the user's
  IPython directory.

* In ipapi, the :func:`make_user_ns` function has been replaced with
  :func:`make_user_namespaces`, to support dict subclasses in namespace
  creation.

* :class:`IPython.kernel.client.Task` has been renamed
  :class:`IPython.kernel.client.StringTask` to make way for new task types.

* The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
  and `map`.

* The values that the renamed `dist` keyword argument can take have been
  shortened, from `'basic'` to `'b'`.

* IPython has a larger set of dependencies if you want all of its
  capabilities. See the `setup.py` script for details.

* The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
  :class:`IPython.kernel.client.TaskClient` no longer take an (ip, port)
  tuple. Instead they take the filename of a file that contains the FURL for
  that client. If the FURL file is in your IPYTHONDIR, it will be found
  automatically and the constructor can be left empty.

* The asynchronous clients in :mod:`IPython.kernel.asyncclient` are now
  created using the factory functions :func:`get_multiengine_client` and
  :func:`get_task_client`. These return a `Deferred` to the actual client.

* The command line options to `ipcontroller` and `ipengine` have changed to
  reflect the new Foolscap network protocol and the FURL files. Please see
  the help for these scripts for details.

* The configuration files for the kernel have changed because of the Foolscap
  switch. If you were using custom config files before, you should delete
  them and regenerate new ones.

Changes merged in from IPython1
-------------------------------

New features
............

* Much improved ``setup.py`` and ``setupegg.py`` scripts. Because Twisted
  and zope.interface are now easily installable, we can declare them as
  dependencies in our setupegg.py script.

* IPython is now compatible with Twisted 2.5.0 and 8.x.

* Added a new example of how to use :mod:`ipython1.kernel.asynclient`.

* Initial draft of a process daemon in :mod:`ipython1.daemon`. This has not
  been merged into IPython and is still in `ipython1-dev`.

* The ``TaskController`` now has methods for getting the queue status.

* The ``TaskResult`` objects now have information about how long the task
  took to run.

* We are attaching additional attributes to exceptions ``(_ipython_*)`` that
  we use to carry additional info around.

* New top-level module :mod:`asyncclient` that has asynchronous versions
  (that return deferreds) of the client classes. This is designed for users
  who want to run their own Twisted reactor.

* All the clients in :mod:`client` are now based on Twisted. This is done by
  running the Twisted reactor in a separate thread and using the
  :func:`blockingCallFromThread` function that is in recent versions of
  Twisted.

* Functions can now be pushed/pulled to/from engines using
  :meth:`MultiEngineClient.push_function` and
  :meth:`MultiEngineClient.pull_function`.

* Gather/scatter are now implemented in the client to reduce the work load
  of the controller and improve performance.

* Complete rewrite of the IPython documentation. All of the documentation
  from the IPython website has been moved into docs/source as restructured
  text documents. PDF and HTML documentation are being generated using
  Sphinx.

* New developer-oriented documentation: development guidelines and roadmap.

* The traditional ``ChangeLog`` has been changed to a more useful
  ``changes.txt`` file that is organized by release and is meant to provide
  something more relevant for users.

Bug fixes
.........

* Created a proper ``MANIFEST.in`` file to create source distributions.

* Fixed a bug in the ``MultiEngine`` interface. Previously, multi-engine
  actions were being collected with a :class:`DeferredList` with
  ``fireononeerrback=1``. This meant that methods were returning
  before all engines had given their results. This was causing extremely odd
  bugs in certain cases. To fix this problem, we have 1) set
  ``fireononeerrback=0`` to make sure all results (or exceptions) are in
  before returning and 2) introduced a :exc:`CompositeError` exception
  that wraps all of the engine exceptions. This is a huge change as it means
  that users will have to catch :exc:`CompositeError` rather than the actual
  exception.

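The wrapping behaviour can be pictured with a small stand-in: wait for every engine's outcome, and if any failed, raise one composite exception carrying all of the per-engine errors. The class below is illustrative only, not IPython's actual :exc:`CompositeError`:

```python
# Illustrative composite-exception pattern; the names are hypothetical.
class CompositeError(Exception):
    def __init__(self, exceptions):
        self.exceptions = exceptions
        super().__init__("%d engine(s) raised" % len(exceptions))

def collect(results):
    # `results` holds either a value or an exception per engine; all of
    # them are examined before deciding (the fireononeerrback=0 behaviour),
    # so no engine's result is lost.
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        raise CompositeError(errors)
    return results

try:
    collect([1, ZeroDivisionError("on engine 2"), 3])
except CompositeError as e:
    n_failed = len(e.exceptions)
```

Callers therefore catch the composite exception and inspect its ``exceptions`` list, rather than catching the raw per-engine exception.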
Backwards incompatible changes
..............................

* All names have been renamed to conform to the lowercase_with_underscore
  convention. This will require users to change references to all names like
  ``queueStatus`` to ``queue_status``.

* Previously, methods like :meth:`MultiEngineClient.push` and
  :meth:`MultiEngineClient.pull` used ``*args`` and ``**kwargs``. This was
  becoming a problem as we weren't able to introduce new keyword arguments
  into the API. Now these methods simply take a dict or sequence. This has
  also allowed us to get rid of the ``*All`` methods like :meth:`pushAll`
  and :meth:`pullAll`. These things are now handled with the ``targets``
  keyword argument that defaults to ``'all'``.

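The new calling convention can be sketched with a stub client: ``push`` takes a dict of names to values plus an optional ``targets`` keyword defaulting to ``'all'``, which replaces both the old ``**kwargs`` form and the separate ``*All`` methods. This is stub code for illustration, not the real :class:`MultiEngineClient`:

```python
# Stub illustrating the dict-based push/pull convention; not IPython's
# actual client classes, which talk to remote engines.
class StubMultiEngineClient:
    def __init__(self, engine_ids):
        self.namespaces = {eid: {} for eid in engine_ids}

    def push(self, ns, targets='all'):
        # A dict argument replaces **kwargs; targets='all' replaces pushAll.
        ids = self.namespaces if targets == 'all' else targets
        for eid in ids:
            self.namespaces[eid].update(ns)

    def pull(self, key, targets='all'):
        ids = list(self.namespaces) if targets == 'all' else targets
        return [self.namespaces[eid][key] for eid in ids]

mec = StubMultiEngineClient([0, 1])
mec.push(dict(a=10))               # goes to all engines
mec.push(dict(b=1), targets=[0])   # only engine 0
values = mec.pull('a')
```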
* The :attr:`MultiEngineClient.magicTargets` attribute has been renamed to
  :attr:`MultiEngineClient.targets`.

* All methods in the MultiEngine interface now accept the optional keyword
  argument ``block``.

* Renamed :class:`RemoteController` to :class:`MultiEngineClient` and
  :class:`TaskController` to :class:`TaskClient`.

* Renamed the top-level module from :mod:`api` to :mod:`client`.

* Most methods in the multiengine interface now raise a :exc:`CompositeError`
  exception that wraps the user's exceptions, rather than just raising the
  raw user's exception.

* Changed the ``setupNS`` and ``resultNames`` arguments in the ``Task`` class
  to ``push`` and ``pull``.


Release 0.8.4
=============

This was a quick release to fix an unfortunate bug that slipped into the 0.8.3
release. The ``--twisted`` option was disabled, as it turned out to be broken
across several platforms.


Release 0.8.3
=============

* pydb is now disabled by default (due to %run -d problems). You can enable
  it by passing the -pydb command line argument to IPython. Note that setting
  it in the config file won't work.


Release 0.8.2
=============

* %pushd/%popd behave differently; now "pushd /foo" pushes the CURRENT
  directory and jumps to /foo. The new behaviour is closer to the documented
  behaviour, and should not trip anyone up.


Older releases
==============

Changes in earlier releases of IPython are described in the older file
``ChangeLog``. Please refer to this document for details.

.. _parallel_process:

===========================================
Starting the IPython controller and engines
===========================================

To use IPython for parallel computing, you need to start one instance of
the controller and one or more instances of the engine. The controller
and each engine can run on different machines or on the same machine.
Because of this, there are many different possibilities.

Broadly speaking, there are two ways of going about starting a controller and engines:

* In an automated manner using the :command:`ipcluster` command.
* In a more manual way using the :command:`ipcontroller` and
  :command:`ipengine` commands.

This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.

General considerations
======================

Before delving into the details of how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.

Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:

1. Start the controller on ``host0`` by running :command:`ipcontroller` on
   ``host0``.
2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
   controller from ``host0`` to hosts ``host1``-``hostn``.
3. Start the engines on hosts ``host1``-``hostn`` by running
   :command:`ipengine`. This command has to be told where the FURL file
   (:file:`ipcontroller-engine.furl`) is located.

At this point, the controller and engines will be connected. By default, the
FURL files created by the controller are put into the
:file:`~/.ipython/security` directory. If the engines share a filesystem with
the controller, step 2 can be skipped as the engines will automatically look
at that location.

The final step required to actually use the running controller from a
client is to move the FURL files :file:`ipcontroller-mec.furl` and
:file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.

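The client-side lookup just described can be sketched as: use the path handed to the constructor if one was given, otherwise fall back to the default security directory. The function name here is illustrative; the real clients perform this resolution internally:

```python
# Sketch of the FURL file lookup described above. The function name is
# hypothetical; the default directory matches the one documented here.
import os

SECURITY_DIR = os.path.expanduser("~/.ipython/security")

def find_furl(filename, explicit_path=None):
    """Locate a FURL file such as ipcontroller-mec.furl."""
    if explicit_path is not None:
        # Full path passed to the client's constructor.
        return explicit_path
    candidate = os.path.join(SECURITY_DIR, filename)
    if os.path.isfile(candidate):
        # Found automatically in the default security directory.
        return candidate
    raise IOError("FURL file %r not found" % filename)
```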
Using :command:`ipcluster`
==========================

The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:

1. When the controller and engines are all run on localhost. This is useful
   for testing or running on a multicore computer.
2. When engines are started using the :command:`mpirun` command that comes
   with most MPI [MPI]_ implementations.
3. When engines are started using the PBS [PBS]_ batch system.
4. When the controller is started on localhost and the engines are started on
   remote nodes using :command:`ssh`.

.. note::

    It is also possible for advanced users to add support to
    :command:`ipcluster` for starting controllers and engines using other
    methods (like Sun's Grid Engine, for example).

.. note::

    Currently :command:`ipcluster` requires that the
    :file:`~/.ipython/security` directory live on a shared filesystem that is
    seen by both the controller and engines. If you don't have a shared
    filesystem, you will need to use :command:`ipcontroller` and
    :command:`ipengine` directly. This constraint can be relaxed if you are
    using the :command:`ssh` method to start the cluster.

Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
and :command:`ipengine` to perform the steps described above.

Using :command:`ipcluster` in local mode
----------------------------------------

To start one controller and 4 engines on localhost, just do::

    $ ipcluster local -n 4

To see other command line options for the local mode, do::

    $ ipcluster local -h

Using :command:`ipcluster` in mpirun mode
-----------------------------------------

The mpirun mode is useful if you:

1. Have MPI installed.
2. Have systems that are configured to use the :command:`mpirun` command to
   start processes.

If these requirements are satisfied, you can start an IPython cluster using::

    $ ipcluster mpirun -n 4

This does the following:

1. Starts the IPython controller on the current host.
2. Uses :command:`mpirun` to start 4 engines.

On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::

    $ ipcluster mpirun -n 4 --mpi=mpi4py

Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not currently work with :command:`ipcluster`.

Additional command line options for this mode can be found by doing::

    $ ipcluster mpirun -h

More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.


Using :command:`ipcluster` in PBS mode
--------------------------------------

The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:

.. sourcecode:: bash

    #PBS -N ipython
    #PBS -j oe
    #PBS -l walltime=00:10:00
    #PBS -l nodes=${n/4}:ppn=4
    #PBS -q parallel

    cd $$PBS_O_WORKDIR
    export PATH=$$HOME/usr/local/bin
    export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
    /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine

There are a few important points about this template:

1. This template will be rendered at runtime using IPython's :mod:`Itpl`
   template engine.

2. Instead of putting in the actual number of engines, use the notation
   ``${n}`` to indicate the number of engines to be started. You can also use
   expressions like ``${n/4}`` in the template to indicate the number of
   nodes.

3. Because ``$`` is a special character used by the template engine, you must
   escape any ``$`` by using ``$$``. This is important when referring to
   environment variables in the template.

4. Any options to :command:`ipengine` should be given in the batch script
   template.

5. Depending on the configuration of your system, you may have to set
   environment variables in the script template.

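The points above about ``${n}`` and ``$$`` can be pictured with a small stand-in for the kind of substitution :mod:`Itpl` performs (a sketch only, not IPython's actual template engine): each ``${expr}`` is evaluated with ``n`` bound to the requested number of engines, and ``$$`` collapses to a literal ``$``.

.. sourcecode:: python

    import re

    def render(template, **ns):
        """Minimal Itpl-style substitution sketch: ``${expr}`` is evaluated
        against ``ns`` and ``$$`` becomes a literal ``$``."""
        def substitute(match):
            value = eval(match.group(1), {}, dict(ns))
            if isinstance(value, float) and value.is_integer():
                value = int(value)      # so ${n/4} renders as 32, not 32.0
            return str(value)
        text = re.sub(r"\$\{([^}]+)\}", substitute, template)
        return text.replace("$$", "$")

    print(render("#PBS -l nodes=${n/4}:ppn=4\nmpiexec -n ${n} ipengine", n=128))

Rendering the sample lines with ``n=128`` produces ``nodes=32:ppn=4`` and ``mpiexec -n 128``, while escaped names such as ``$$PBS_O_WORKDIR`` come out as plain ``$PBS_O_WORKDIR``.
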
Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::

    $ ipcluster pbs -n 128 --pbs-script=pbs.template

Additional command line options for this mode can be found by doing::

    $ ipcluster pbs -h

Using :command:`ipcluster` in SSH mode
--------------------------------------

The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
nodes and :command:`ipcontroller` on localhost.

When using this mode it is highly recommended that you have set up SSH keys
and are using ssh-agent [SSH]_ for password-less logins.

To use this mode you need a Python file describing the cluster. Here is an
example of such a "clusterfile":

.. sourcecode:: python

    send_furl = True
    engines = { 'host1.example.com' : 2,
                'host2.example.com' : 5,
                'host3.example.com' : 1,
                'host4.example.com' : 8 }

Since this is a regular Python file, the usual Python syntax applies. Things to note:

* The ``engines`` dict, where each key is a host to run engines on and
  the value is the number of engines to run on that host.
* ``send_furl`` can be either ``True`` or ``False``; if ``True``, the FURL
  file needed by :command:`ipengine` is copied over to each host.

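Because the clusterfile is plain Python, loading it amounts to executing it and picking out the names it defines. A hypothetical sketch of such a loader (not ipcluster's actual code):

.. sourcecode:: python

    import os
    import tempfile

    def load_clusterfile(path):
        """Execute a clusterfile and return (send_furl, engines) -- a sketch."""
        namespace = {}
        with open(path) as f:
            exec(f.read(), namespace)
        return namespace.get("send_furl", False), namespace["engines"]

    # Demonstrate with a small clusterfile written to a temporary location.
    source = (
        "send_furl = True\n"
        "engines = { 'host1.example.com' : 2,\n"
        "            'host2.example.com' : 5 }\n"
    )
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
        f.write(source)
    send_furl, engines = load_clusterfile(f.name)
    os.unlink(f.name)
    print(send_furl, sum(engines.values()))    # total engine count across hosts

For this example the loader reports ``send_furl`` as ``True`` and a total of 7 engines across the two hosts.
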
The ``--clusterfile`` command line option lets you specify the file to use for
the cluster definition. Once you have your cluster file and can
:command:`ssh` into the remote hosts without a password, you are ready to
start your cluster like so:

.. sourcecode:: bash

    $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py


Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:

* :file:`sshx.sh`
* :file:`engine_killer.sh`

Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temporary directory, then copied to a temporary directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is :file:`/tmp`.

The default :file:`sshx.sh` is the following:

.. sourcecode:: bash

    #!/bin/sh
    "$@" &> /dev/null &
    echo $!

If you want to use a custom :file:`sshx.sh` script you need to use the
``--sshx`` option and specify the file to use. Using a custom :file:`sshx.sh`
file could be helpful when you need to set up the environment on the remote
host before executing :command:`ipengine`.

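For instance, a custom :file:`sshx.sh` might prepare the remote environment before launching the command. The following is a hypothetical sketch (the exported path is a placeholder for whatever your remote hosts actually need):

.. sourcecode:: bash

    #!/bin/sh
    # Hypothetical custom sshx.sh: set up the remote environment first,
    # then launch the given command in the background and report its PID.
    [ -f "$HOME/.profile" ] && . "$HOME/.profile"
    export PYTHONPATH="$HOME/usr/local/lib/python2.4/site-packages"
    "$@" > /dev/null 2>&1 &
    echo $!
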
For a detailed options list:

.. sourcecode:: bash

    $ ipcluster ssh -h

Current limitations of the SSH mode of :command:`ipcluster` are:

* Untested on Windows. Would require a working :command:`ssh` on Windows.
  Also, we are using shell scripts to set up and execute commands on remote
  hosts.
* :command:`ipcontroller` is started on localhost, with no option to start it
  on a remote node.

Using the :command:`ipcontroller` and :command:`ipengine` commands
==================================================================

It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.

Starting the controller and engine on your local machine
--------------------------------------------------------

To use :command:`ipcontroller` and :command:`ipengine` to start things on your
local machine, do the following.

First start the controller::

    $ ipcontroller

Next, start however many instances of the engine you want using (repeatedly) the command::

    $ ipengine

The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.

.. warning::

    The order of the above operations is very important. You *must*
    start the controller before the engines, since the engines connect
    to the controller as they get started.

.. note::

    On some platforms (OS X), to put the controller and engine into the
    background you may need to give these commands in the form ``(ipcontroller
    &)`` and ``(ipengine &)`` (with the parentheses) for them to work
    properly.

Starting the controller and engines on different hosts
------------------------------------------------------

When the controller and engines are running on different hosts, things are
slightly more complicated, but the underlying ideas are the same:

1. Start the controller on a host using :command:`ipcontroller`.
2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on
   the controller's host to the host where the engines will run.
3. Use :command:`ipengine` on the engines' hosts to start the engines.

The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:

* Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
  directory on the engine's host, where it will be found automatically.
* Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
  flag.

The ``--furl-file`` flag works like this::

    $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl

.. note::

    If the controller's and engines' hosts all have a shared file system
    (:file:`~/.ipython/security` is the same on all of them), then things
    will just work!

Make FURL files persistent
--------------------------

At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.

This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::

    $ ipcontroller -r --client-port=10101 --engine-port=10102

Then, just copy the FURL files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future; just make sure to tell the controller to use the *same* ports.

.. note::

    You may ask the question: what ports does the controller listen on if you
    don't tell it to use specific ones? The default is to use high random port
    numbers. We do this for two reasons: i) to increase security through
    obscurity and ii) to allow multiple controllers on a given host to start
    and automatically use different ports.

Log files
---------

All of the components of IPython have log files associated with them.
These log files can be extremely useful in debugging problems with
IPython and can be found in the directory :file:`~/.ipython/log`. Sending
the log files to us will often help us to debug any problems.


.. [PBS] Portable Batch System. http://www.openpbs.org/
.. [SSH] SSH-Agent. http://en.wikipedia.org/wiki/Ssh-agent