Brian Granger: Merging in vvatsa's ssh mode for ipcluster with some changes.
# encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_engineservice -*-

"""A Twisted Service Representation of the IPython core.

The IPython Core exposed to the network is called the Engine. Its
representation in Twisted is the EngineService. Interfaces and adapters
are used to abstract out the details of the actual network protocol used.
The EngineService is an Engine that knows nothing about the actual protocol
used.

The EngineService is exposed with various network protocols in modules like:

enginepb.py
enginevanilla.py

As of 12/12/06 the classes in this module have been simplified greatly. It was
felt that we had over-engineered things. To improve the maintainability of the
code we have taken out the ICompleteEngine interface and the completeEngine
method that automatically added methods to engines.

"""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import os, sys, copy
import cPickle as pickle
from new import instancemethod

from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python import log, failure, components
import zope.interface as zi

from IPython.kernel.core.interpreter import Interpreter
from IPython.kernel import newserialized, error, util
from IPython.kernel.util import printer
from IPython.kernel.twistedutil import gatherBoth, DeferredList
from IPython.kernel import codeutil

#-------------------------------------------------------------------------------
# Interface specification for the Engine
#-------------------------------------------------------------------------------

class IEngineCore(zi.Interface):
    """The minimal required interface for the IPython Engine.

    This interface provides a formal specification of the IPython core.
    All these methods should return deferreds regardless of what side of a
    network connection they are on.

    In general, this class simply wraps a shell class and wraps its return
    values as Deferred objects. If the underlying shell class method raises
    an exception, this class should convert it to a twisted.failure.Failure
    that will be propagated along the Deferred's errback chain.

    In addition, Failures are aggressive. By this, we mean that if a method
    is performing multiple actions (like pulling multiple objects), if any
    single one fails, the entire method will fail with that Failure. It is
    all or nothing.
    """

    id = zi.interface.Attribute("the id of the Engine object")
    properties = zi.interface.Attribute("A dict of properties of the Engine")

    def execute(lines):
        """Execute lines of Python code.

        Returns a dictionary with keys (id, number, stdin, stdout, stderr)
        upon success.

        Returns a failure object if the execution of lines raises an exception.
        """

    def push(namespace):
        """Push dict namespace into the user's namespace.

        Returns a deferred to None or a failure.
        """

    def pull(keys):
        """Pulls values out of the user's namespace by keys.

        Returns a deferred to a tuple of objects or a single object.

        Raises NameError if any one of the objects does not exist.
        """

    def push_function(namespace):
        """Push a dict of key, function pairs into the user's namespace.

        Returns a deferred to None or a failure."""

    def pull_function(keys):
        """Pulls functions out of the user's namespace by keys.

        Returns a deferred to a tuple of functions or a single function.

        Raises NameError if any one of the functions does not exist.
        """

    def get_result(i=None):
        """Get the stdin/stdout/stderr of command i.

        Returns a deferred to a dict with keys
        (id, number, stdin, stdout, stderr).

        Raises IndexError if command i does not exist.
        Raises TypeError if i is not an int.
        """

    def reset():
        """Reset the shell.

        This clears the user's namespace. Won't cause modules to be
        reloaded. Should also re-initialize certain variables like id.
        """

    def kill():
        """Kill the engine by stopping the reactor."""

    def keys():
        """Return the top level variables in the user's namespace.

        Returns a deferred to a dict."""


class IEngineSerialized(zi.Interface):
    """Push/Pull methods that take Serialized objects.

    All methods should return deferreds.
    """

    def push_serialized(namespace):
        """Push a dict of keys and Serialized objects into the user's namespace."""

    def pull_serialized(keys):
        """Pull objects by key from the user's namespace as Serialized.

        Returns a list of or one Serialized.

        Raises NameError if any one of the objects does not exist.
        """


class IEngineProperties(zi.Interface):
    """Methods for access to the properties object of an Engine"""

    properties = zi.Attribute("A StrictDict object, containing the properties")

    def set_properties(properties):
        """Set properties by key and value"""

    def get_properties(keys=None):
        """Get a list of properties by `keys`; if no keys are specified, get all"""

    def del_properties(keys):
        """Delete properties by `keys`"""

    def has_properties(keys):
        """Get a list of bool values for whether `properties` has `keys`"""

    def clear_properties():
        """Clear the properties dict"""

class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
    """The basic engine interface that EngineService will implement.

    This exists so it is easy to specify adapters that adapt to and from the
    API that the basic EngineService implements.
    """
    pass

class IEngineQueued(IEngineBase):
    """Interface for adding a queue to an IEngineBase.

    This interface extends the IEngineBase interface to add methods for managing
    the engine's queue. The implicit details of this interface are that the
    execution of all methods declared in IEngineBase should appropriately be
    put through a queue before execution.

    All methods should return deferreds.
    """

    def clear_queue():
        """Clear the queue."""

    def queue_status():
        """Get the queued and pending commands in the queue."""

    def register_failure_observer(obs):
        """Register an observer of pending Failures.

        The observer must implement IFailureObserver.
        """

    def unregister_failure_observer(obs):
        """Unregister an observer of pending Failures."""


class IEngineThreaded(zi.Interface):
    """A placeholder for threaded commands.

    All methods should return deferreds.
    """
    pass


#-------------------------------------------------------------------------------
# Functions and classes to implement the EngineService
#-------------------------------------------------------------------------------


class StrictDict(dict):
    """This is a strict copying dictionary for use as the interface to the
    properties of an Engine.

    :IMPORTANT:
        This object copies the values you set to it, and returns copies to you
        when you request them. The only way to change properties is explicitly
        through the setitem and getitem of the dictionary interface.

    Example:
        >>> e = get_engine(id)
        >>> L = [1,2,3]
        >>> e.properties['L'] = L
        >>> L == e.properties['L']
        True
        >>> L.append(99)
        >>> L == e.properties['L']
        False

    Note that getitem copies, so calls to methods of objects do not affect
    the properties, as seen here:

        >>> e.properties[1] = range(2)
        >>> print e.properties[1]
        [0, 1]
        >>> e.properties[1].append(2)
        >>> print e.properties[1]
        [0, 1]
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.modified = True

    def __getitem__(self, key):
        return copy.deepcopy(dict.__getitem__(self, key))

    def __setitem__(self, key, value):
        # Check that this entry is valid for transport around the network
        # and for copying.
        try:
            pickle.dumps(key, 2)
            pickle.dumps(value, 2)
            newvalue = copy.deepcopy(value)
        except:
            raise error.InvalidProperty(value)
        dict.__setitem__(self, key, newvalue)
        self.modified = True

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        self.modified = True

    def update(self, dikt):
        for k,v in dikt.iteritems():
            self[k] = v

    def pop(self, key):
        self.modified = True
        return dict.pop(self, key)

    def popitem(self):
        self.modified = True
        return dict.popitem(self)

    def clear(self):
        self.modified = True
        dict.clear(self)

    def subDict(self, *keys):
        d = {}
        for key in keys:
            d[key] = self[key]
        return d
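The copy-on-access behaviour that StrictDict documents can be sketched in modern Python. `CopyingDict` and the surrounding names are illustrative stand-ins, not part of this module:

```python
import copy
import pickle


class CopyingDict(dict):
    """Store and return deep copies, so callers cannot mutate stored values."""

    def __setitem__(self, key, value):
        # Reject anything that cannot be pickled (and thus could not be
        # shipped over the network), mirroring StrictDict's check.
        try:
            pickle.dumps(key, 2)
            pickle.dumps(value, 2)
        except Exception as e:
            raise ValueError("unpicklable property: %r" % (value,)) from e
        dict.__setitem__(self, key, copy.deepcopy(value))

    def __getitem__(self, key):
        return copy.deepcopy(dict.__getitem__(self, key))


props = CopyingDict()
L = [1, 2, 3]
props["L"] = L        # stores a copy of L
L.append(99)          # mutating the original does not affect the stored copy
props["L"].append(0)  # mutating the returned copy does not affect the store
```

The two appends at the end demonstrate the isolation in both directions: the stored value remains `[1, 2, 3]` throughout.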


class EngineAPI(object):
    """This is the object through which the user can edit the `properties`
    attribute of an Engine.

    The Engine Properties object copies all objects in and out of itself.
    See the EngineProperties object for details.
    """
    _fix = False
    def __init__(self, id):
        self.id = id
        self.properties = StrictDict()
        self._fix = True

    def __setattr__(self, k, v):
        if self._fix:
            raise error.KernelError("I am protected!")
        else:
            object.__setattr__(self, k, v)

    def __delattr__(self, key):
        raise error.KernelError("I am protected!")


_apiDict = {}

def get_engine(id):
    """Get the Engine API object, which currently just provides the properties
    object, by ID."""
    global _apiDict
    if not _apiDict.get(id):
        _apiDict[id] = EngineAPI(id)
    return _apiDict[id]

def drop_engine(id):
    """Remove an engine."""
    global _apiDict
    if _apiDict.has_key(id):
        del _apiDict[id]
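`get_engine` and `drop_engine` implement a lazily-populated module-level registry keyed by engine id. A minimal standalone sketch of the same idiom (all names here are illustrative, not the module's own):

```python
_registry = {}

class Props(object):
    """Stand-in for EngineAPI: one properties holder per engine id."""
    def __init__(self, id):
        self.id = id
        self.properties = {}

def get_props(id):
    # Create the per-id object on first access; return the same one after.
    if id not in _registry:
        _registry[id] = Props(id)
    return _registry[id]

def drop_props(id):
    _registry.pop(id, None)

a = get_props(0)
b = get_props(0)   # the same object as a
drop_props(0)
c = get_props(0)   # a fresh object after dropping
```

The point of the registry is identity: every caller asking for the same id shares one properties object until the engine is dropped.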

class EngineService(object, service.Service):
    """Adapt an IPython shell into an IEngine-implementing Twisted Service."""

    zi.implements(IEngineBase)
    name = 'EngineService'

    def __init__(self, shellClass=Interpreter, mpi=None):
        """Create an EngineService.

        shellClass: something that implements IInterpreter or core1
        mpi: an mpi module that has rank and size attributes
        """
        self.shellClass = shellClass
        self.shell = self.shellClass()
        self.mpi = mpi
        self.id = None
        self.properties = get_engine(self.id).properties
        if self.mpi is not None:
            log.msg("MPI started with rank = %i and size = %i" %
                (self.mpi.rank, self.mpi.size))
            self.id = self.mpi.rank
        self._seedNamespace()

    # Make id a property so that the shell can get the updated id

    def _setID(self, id):
        self._id = id
        self.properties = get_engine(id).properties
        self.shell.push({'id': id})

    def _getID(self):
        return self._id

    id = property(_getID, _setID)

    def _seedNamespace(self):
        self.shell.push({'mpi': self.mpi, 'id' : self.id})

    def executeAndRaise(self, msg, callable, *args, **kwargs):
        """Call a method of self.shell and wrap any exception."""
        d = defer.Deferred()
        try:
            result = callable(*args, **kwargs)
        except:
            # This gives the following:
            # et=exception class
            # ev=exception class instance
            # tb=traceback object
            et,ev,tb = sys.exc_info()
            # This call adds attributes to the exception value
            et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
            # Add another attribute
            ev._ipython_engine_info = msg
            f = failure.Failure(ev,et,None)
            d.errback(f)
        else:
            d.callback(result)

        return d
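`executeAndRaise` is the synchronous-to-asynchronous bridge: the shell call runs inline, and the returned Deferred is already fired with either the result or an annotated Failure. A Twisted-free sketch of the same control flow, where `Result` and the `_engine_info` attribute are illustrative stand-ins for the Deferred and the annotation the service adds:

```python
import sys


class Result(object):
    """Minimal stand-in for an already-fired Deferred."""
    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error


def execute_and_wrap(msg, func, *args, **kwargs):
    try:
        value = func(*args, **kwargs)
    except Exception:
        _, ev, _ = sys.exc_info()
        # Annotate the exception with engine context, as the service does
        # with ev._ipython_engine_info, then hand it to the error path.
        ev._engine_info = msg
        return Result(error=ev)
    return Result(value=value)


ok = execute_and_wrap({"method": "add"}, lambda a, b: a + b, 1, 2)
bad = execute_and_wrap({"method": "div"}, lambda: 1 / 0)
```

The useful property is that callers get one uniform return type: success and failure both come back as a fired result, so the caller never needs its own try/except around the shell call.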


    # The IEngine methods. See the interface for documentation.

    def execute(self, lines):
        msg = {'engineid':self.id,
               'method':'execute',
               'args':[lines]}
        d = self.executeAndRaise(msg, self.shell.execute, lines)
        d.addCallback(self.addIDToResult)
        return d

    def addIDToResult(self, result):
        result['id'] = self.id
        return result

    def push(self, namespace):
        msg = {'engineid':self.id,
               'method':'push',
               'args':[repr(namespace.keys())]}
        d = self.executeAndRaise(msg, self.shell.push, namespace)
        return d

    def pull(self, keys):
        msg = {'engineid':self.id,
               'method':'pull',
               'args':[repr(keys)]}
        d = self.executeAndRaise(msg, self.shell.pull, keys)
        return d

    def push_function(self, namespace):
        msg = {'engineid':self.id,
               'method':'push_function',
               'args':[repr(namespace.keys())]}
        d = self.executeAndRaise(msg, self.shell.push_function, namespace)
        return d

    def pull_function(self, keys):
        msg = {'engineid':self.id,
               'method':'pull_function',
               'args':[repr(keys)]}
        d = self.executeAndRaise(msg, self.shell.pull_function, keys)
        return d

    def get_result(self, i=None):
        msg = {'engineid':self.id,
               'method':'get_result',
               'args':[repr(i)]}
        d = self.executeAndRaise(msg, self.shell.getCommand, i)
        d.addCallback(self.addIDToResult)
        return d

    def reset(self):
        msg = {'engineid':self.id,
               'method':'reset',
               'args':[]}
        del self.shell
        self.shell = self.shellClass()
        self.properties.clear()
        d = self.executeAndRaise(msg, self._seedNamespace)
        return d

    def kill(self):
        drop_engine(self.id)
        try:
            reactor.stop()
        except RuntimeError:
            log.msg('The reactor was apparently not running.')
            return defer.fail()
        else:
            return defer.succeed(None)

    def keys(self):
        """Return a list of variable names in the user's top level namespace.

        This used to return a dict of all the keys/repr(values) in the
        user's namespace. This was too much info for the ControllerService
        to handle so it is now just a list of keys.
        """

        remotes = []
        for k in self.shell.user_ns.iterkeys():
            if k not in ['__name__', '_ih', '_oh', '__builtins__',
                         'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
                remotes.append(k)
        return defer.succeed(remotes)
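The filtering in `keys()` is plain membership testing against IPython's bookkeeping names. Sketched standalone, with a hypothetical namespace standing in for `self.shell.user_ns`:

```python
# Names IPython injects into every user namespace; hidden from clients.
HIDDEN = set(['__name__', '_ih', '_oh', '__builtins__',
              'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input'])

def user_keys(user_ns):
    """Return only the names the user actually defined."""
    return [k for k in user_ns if k not in HIDDEN]

# Hypothetical namespace contents:
ns = {'__name__': '__main__', '_ih': [], 'Out': {}, 'x': 1, 'y': 2}
```

With that namespace, only `x` and `y` survive the filter.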
486
485
487 def set_properties(self, properties):
486 def set_properties(self, properties):
488 msg = {'engineid':self.id,
487 msg = {'engineid':self.id,
489 'method':'set_properties',
488 'method':'set_properties',
490 'args':[repr(properties.keys())]}
489 'args':[repr(properties.keys())]}
491 return self.executeAndRaise(msg, self.properties.update, properties)
490 return self.executeAndRaise(msg, self.properties.update, properties)
492
491
493 def get_properties(self, keys=None):
492 def get_properties(self, keys=None):
494 msg = {'engineid':self.id,
493 msg = {'engineid':self.id,
495 'method':'get_properties',
494 'method':'get_properties',
496 'args':[repr(keys)]}
               'args':[repr(keys)]}
        if keys is None:
            keys = self.properties.keys()
        return self.executeAndRaise(msg, self.properties.subDict, *keys)

    def _doDel(self, keys):
        for key in keys:
            del self.properties[key]

    def del_properties(self, keys):
        msg = {'engineid':self.id,
               'method':'del_properties',
               'args':[repr(keys)]}
        return self.executeAndRaise(msg, self._doDel, keys)

    def _doHas(self, keys):
        return [self.properties.has_key(key) for key in keys]

    def has_properties(self, keys):
        msg = {'engineid':self.id,
               'method':'has_properties',
               'args':[repr(keys)]}
        return self.executeAndRaise(msg, self._doHas, keys)

    def clear_properties(self):
        msg = {'engineid':self.id,
               'method':'clear_properties',
               'args':[]}
        return self.executeAndRaise(msg, self.properties.clear)

    def push_serialized(self, sNamespace):
        msg = {'engineid':self.id,
               'method':'push_serialized',
               'args':[repr(sNamespace.keys())]}
        ns = {}
        for k,v in sNamespace.iteritems():
            try:
                unserialized = newserialized.IUnSerialized(v)
                ns[k] = unserialized.getObject()
            except:
                return defer.fail()
        return self.executeAndRaise(msg, self.shell.push, ns)

    def pull_serialized(self, keys):
        msg = {'engineid':self.id,
               'method':'pull_serialized',
               'args':[repr(keys)]}
        if isinstance(keys, str):
            keys = [keys]
        if len(keys)==1:
            d = self.executeAndRaise(msg, self.shell.pull, keys)
            d.addCallback(newserialized.serialize)
            return d
        elif len(keys)>1:
            d = self.executeAndRaise(msg, self.shell.pull, keys)
            @d.addCallback
            def packThemUp(values):
                serials = []
                for v in values:
                    try:
                        serials.append(newserialized.serialize(v))
                    except:
                        return defer.fail(failure.Failure())
                return serials
            return packThemUp

def queue(methodToQueue):
    def queuedMethod(this, *args, **kwargs):
        name = methodToQueue.__name__
        return this.submitCommand(Command(name, *args, **kwargs))
    return queuedMethod

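An illustrative sketch (not part of this module) of how a decorator like ``queue`` above replaces a method body with a call that wraps the method name and arguments in a command object and submits it. ``MiniCommand`` and ``MiniEngine`` are hypothetical stand-ins for this module's ``Command`` and ``QueuedEngine``:

```python
class MiniCommand(object):
    # Stand-in for Command: records the method name and its arguments.
    def __init__(self, remoteMethod, *args, **kwargs):
        self.remoteMethod = remoteMethod
        self.args = args
        self.kwargs = kwargs

def queue(methodToQueue):
    # Same shape as the decorator above: the original body never runs;
    # the call is turned into a submitted command instead.
    def queuedMethod(this, *args, **kwargs):
        name = methodToQueue.__name__
        return this.submitCommand(MiniCommand(name, *args, **kwargs))
    return queuedMethod

class MiniEngine(object):
    def __init__(self):
        self.submitted = []

    def submitCommand(self, cmd):
        # A real QueuedEngine would run or enqueue cmd and return a
        # Deferred; here we just record it for inspection.
        self.submitted.append(cmd)
        return cmd

    @queue
    def execute(self, lines):
        pass  # intercepted by the decorator

e = MiniEngine()
cmd = e.execute('a = 1')
```

The decorated ``execute`` returns whatever ``submitCommand`` returns, so callers interact only with the queueing machinery, never the stub body.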
class QueuedEngine(object):
    """Adapt an IEngineBase to an IEngineQueued by wrapping it.

    The resulting object will implement IEngineQueued which extends
    IEngineCore which extends (IEngineBase, IEngineSerialized).

    This seems like the best way of handling it, but I am not sure. The
    other option is to have the various base interfaces be used like
    mix-in interfaces. The problem I have with this is that adaptation is
    more difficult and complicated because there can be multiple
    original and final Interfaces.
    """

    zi.implements(IEngineQueued)

    def __init__(self, engine):
        """Create a QueuedEngine object from an engine

        engine: An implementor of IEngineCore and IEngineSerialized
        keepUpToDate: whether to update the remote status when the
        queue is empty. Defaults to False.
        """

        # This is the right way to do these tests rather than
        # IEngineCore in list(zi.providedBy(engine)), which only picks
        # up the interfaces that are directly declared by engine.
        assert IEngineBase.providedBy(engine), \
            "engine passed to QueuedEngine doesn't provide IEngineBase"

        self.engine = engine
        self.id = engine.id
        self.queued = []
        self.history = {}
        self.engineStatus = {}
        self.currentCommand = None
        self.failureObservers = []

    def _get_properties(self):
        return self.engine.properties

    properties = property(_get_properties, lambda self, _: None)

    # Queue management methods. You should not call these directly.

    def submitCommand(self, cmd):
        """Submit command to queue."""

        d = defer.Deferred()
        cmd.setDeferred(d)
        if self.currentCommand is not None:
            if self.currentCommand.finished:
                # log.msg("Running command immediately: %r" % cmd)
                self.currentCommand = cmd
                self.runCurrentCommand()
            else:  # command is still running
                # log.msg("Command is running: %r" % self.currentCommand)
                # log.msg("Queueing: %r" % cmd)
                self.queued.append(cmd)
        else:
            # log.msg("No current commands, running: %r" % cmd)
            self.currentCommand = cmd
            self.runCurrentCommand()
        return d

    def runCurrentCommand(self):
        """Run current command."""

        cmd = self.currentCommand
        f = getattr(self.engine, cmd.remoteMethod, None)
        if f:
            d = f(*cmd.args, **cmd.kwargs)
            if cmd.remoteMethod == 'execute':
                d.addCallback(self.saveResult)
            d.addCallback(self.finishCommand)
            d.addErrback(self.abortCommand)
        else:
            return defer.fail(AttributeError(cmd.remoteMethod))

    def _flushQueue(self):
        """Pop next command in queue and run it."""

        if len(self.queued) > 0:
            self.currentCommand = self.queued.pop(0)
            self.runCurrentCommand()

    def saveResult(self, result):
        """Put the result in the history."""
        self.history[result['number']] = result
        return result

    def finishCommand(self, result):
        """Finish current command."""

        # The order of these commands is absolutely critical.
        self.currentCommand.handleResult(result)
        self.currentCommand.finished = True
        self._flushQueue()
        return result

    def abortCommand(self, reason):
        """Abort current command.

        This eats the Failure but first passes it onto the Deferred that the
        user has.

        It also clears out the queue so subsequent commands don't run.
        """

        # The order of these 3 commands is absolutely critical. The currentCommand
        # must first be marked as finished BEFORE the queue is cleared and before
        # the current command is sent the failure.
        # Also, the queue must be cleared BEFORE the current command is sent the
        # Failure, otherwise the errback chain could trigger new commands to be
        # added to the queue before we clear it. We should clear ONLY the commands
        # that were in the queue when the error occurred.
        self.currentCommand.finished = True
        s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
        self.clear_queue(msg=s)
        self.currentCommand.handleError(reason)

        return None

    #---------------------------------------------------------------------------
    # IEngineCore methods
    #---------------------------------------------------------------------------

    @queue
    def execute(self, lines):
        pass

    @queue
    def push(self, namespace):
        pass

    @queue
    def pull(self, keys):
        pass

    @queue
    def push_function(self, namespace):
        pass

    @queue
    def pull_function(self, keys):
        pass

    def get_result(self, i=None):
        if i is None:
            i = max(self.history.keys()+[None])

        cmd = self.history.get(i, None)
        # Uncomment this line to disable caching of results
        #cmd = None
        if cmd is None:
            return self.submitCommand(Command('get_result', i))
        else:
            return defer.succeed(cmd)

    def reset(self):
        self.clear_queue()
        self.history = {}  # reset the cache - I am not sure we should do this
        return self.submitCommand(Command('reset'))

    def kill(self):
        self.clear_queue()
        return self.submitCommand(Command('kill'))

    @queue
    def keys(self):
        pass

    #---------------------------------------------------------------------------
    # IEngineSerialized methods
    #---------------------------------------------------------------------------

    @queue
    def push_serialized(self, namespace):
        pass

    @queue
    def pull_serialized(self, keys):
        pass

    #---------------------------------------------------------------------------
    # IEngineProperties methods
    #---------------------------------------------------------------------------

    @queue
    def set_properties(self, namespace):
        pass

    @queue
    def get_properties(self, keys=None):
        pass

    @queue
    def del_properties(self, keys):
        pass

    @queue
    def has_properties(self, keys):
        pass

    @queue
    def clear_properties(self):
        pass

    #---------------------------------------------------------------------------
    # IQueuedEngine methods
    #---------------------------------------------------------------------------

    def clear_queue(self, msg=''):
        """Clear the queue, but don't cancel the currently running command."""

        for cmd in self.queued:
            cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
        self.queued = []
        return defer.succeed(None)

    def queue_status(self):
        if self.currentCommand is not None:
            if self.currentCommand.finished:
                pending = repr(None)
            else:
                pending = repr(self.currentCommand)
        else:
            pending = repr(None)
        dikt = {'queue':map(repr,self.queued), 'pending':pending}
        return defer.succeed(dikt)

    def register_failure_observer(self, obs):
        self.failureObservers.append(obs)

    def unregister_failure_observer(self, obs):
        self.failureObservers.remove(obs)


# Now register QueuedEngine as an adapter class that makes an IEngineBase into
# an IEngineQueued.
components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)


class Command(object):
    """A command object that encapsulates queued commands.

    This class basically keeps track of a command that has been queued
    in a QueuedEngine. It manages the deferreds and holds the method to
    be called and the arguments to that method.
    """

    def __init__(self, remoteMethod, *args, **kwargs):
        """Build a new Command object."""

        self.remoteMethod = remoteMethod
        self.args = args
        self.kwargs = kwargs
        self.finished = False

    def setDeferred(self, d):
        """Sets the deferred attribute of the Command."""

        self.deferred = d

    def __repr__(self):
        # Render positional and keyword arguments as a call signature.
        args = ', '.join([repr(a) for a in self.args])
        for k,v in self.kwargs.iteritems():
            if args:
                args += ', '
            args += '%s=%r' % (k,v)
        return "%s(%s)" % (self.remoteMethod, args)

    def handleResult(self, result):
        """When the result is ready, relay it to self.deferred."""

        self.deferred.callback(result)

    def handleError(self, reason):
        """When an error has occurred, relay it to self.deferred."""

        self.deferred.errback(reason)

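A minimal sketch of the Command bookkeeping above: the holder of the deferred gets the result relayed to it when the engine finishes. ``FakeDeferred`` is a hypothetical stand-in so the relay logic can be shown without Twisted installed; the real class uses ``twisted.internet.defer.Deferred``:

```python
class FakeDeferred(object):
    # Stand-in for a Twisted Deferred: just stores the delivered result.
    def __init__(self):
        self.result = None

    def callback(self, result):
        self.result = result

class SketchCommand(object):
    # Same bookkeeping as Command above: method name, args, a finished
    # flag, and a deferred that results are relayed to.
    def __init__(self, remoteMethod, *args, **kwargs):
        self.remoteMethod = remoteMethod
        self.args = args
        self.kwargs = kwargs
        self.finished = False

    def setDeferred(self, d):
        self.deferred = d

    def handleResult(self, result):
        # Relay the engine's result to whoever holds the deferred.
        self.deferred.callback(result)

cmd = SketchCommand('execute', 'b = 2')
d = FakeDeferred()
cmd.setDeferred(d)
cmd.handleResult({'number': 0})
```

The QueuedEngine calls ``handleResult`` (or ``handleError``) exactly once per command, which is why ``finished`` must be flipped before the queue is flushed.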
class ThreadedEngineService(EngineService):
    """An EngineService subclass that defers execute commands to a separate
    thread.

    ThreadedEngineService uses twisted.internet.threads.deferToThread to
    defer execute requests to a separate thread. GUI frontends may want to
    use ThreadedEngineService as the engine in an
    IPython.frontend.frontendbase.FrontEndBase subclass to prevent
    blocking execution from blocking the GUI thread.
    """

    zi.implements(IEngineBase)

    def __init__(self, shellClass=Interpreter, mpi=None):
        EngineService.__init__(self, shellClass, mpi)

    def wrapped_execute(self, msg, lines):
        """Wrap self.shell.execute to add extra information to tracebacks"""

        try:
            result = self.shell.execute(lines)
        except Exception,e:
            # This gives the following:
            # et=exception class
            # ev=exception class instance
            # tb=traceback object
            et,ev,tb = sys.exc_info()
            # This call adds attributes to the exception value
            et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)

            # Create a new exception with the new attributes
            e = et(ev._ipython_traceback_text)
            e._ipython_engine_info = msg

            # Re-raise
            raise e

        return result


    def execute(self, lines):
        # Only import this if we are going to use this class
        from twisted.internet import threads

        msg = {'engineid':self.id,
               'method':'execute',
               'args':[lines]}

        d = threads.deferToThread(self.wrapped_execute, msg, lines)
        d.addCallback(self.addIDToResult)
        return d
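The pattern in ``wrapped_execute`` above (catch, attach engine information to a fresh exception of the same class, re-raise) can be sketched in plain Python without Twisted or the IPython shell. The names ``wrapped_run`` and ``_engine_info`` are illustrative, not the module's real API:

```python
import sys

def wrapped_run(msg, code):
    # Run code; on failure, re-raise a new exception of the same class
    # carrying extra context (here, which engine the error came from).
    try:
        exec(code, {})
    except Exception:
        et, ev, tb = sys.exc_info()
        e = et(str(ev))          # new instance of the original class
        e._engine_info = msg     # attach the extra context
        raise e

try:
    wrapped_run({'engineid': 0, 'method': 'execute'}, '1/0')
except ZeroDivisionError as err:
    info = err._engine_info
```

Because the re-raised exception keeps its original class, callers can still catch the specific error type while also reading the attached context.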
@@ -1,754 +1,753 b''
# encoding: utf-8
# -*- test-case-name: IPython.kernel.test.test_multiengine -*-

"""Adapt the IPython ControllerServer to IMultiEngine.

This module provides classes that adapt a ControllerService to the
IMultiEngine interface. This interface is a basic interactive interface
for working with a set of engines where it is desired to have explicit
access to each registered engine.

The classes here are exposed to the network in files like:

* multienginevanilla.py
* multienginepb.py
"""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

from new import instancemethod
from types import FunctionType

from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python import log, components, failure
from zope.interface import Interface, implements, Attribute

from IPython.tools import growl
from IPython.kernel.util import printer
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel import map as Map
from IPython.kernel import error
from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
from IPython.kernel.controllerservice import \
    ControllerAdapterBase, \
    ControllerService, \
    IControllerBase


#-------------------------------------------------------------------------------
# Interfaces for the MultiEngine representation of a controller
#-------------------------------------------------------------------------------

class IEngineMultiplexer(Interface):
    """Interface to multiple engines implementing IEngineCore/Serialized/Queued.

    This class simply acts as a multiplexer of methods that are in the
    various IEngines* interfaces. Thus the methods here are just like those
    in the IEngine* interfaces, but with an extra first argument, targets.
    The targets argument can have the following forms:

    * targets = 10          # Engines are indexed by ints
    * targets = [0,1,2,3]   # A list of ints
    * targets = 'all'       # A string to indicate all targets

    If targets is bad in any way, an InvalidEngineID will be raised. This
    includes engines not being registered.

    All IEngineMultiplexer multiplexer methods must return a Deferred to a list
    with length equal to the number of targets. The elements of the list will
    correspond to the return of the corresponding IEngine method.

    Failures are aggressive, meaning that if an action fails for any target,
    the overall action will fail immediately with that Failure.

    :Parameters:
        targets : int, list of ints, or 'all'
            Engine ids the action will apply to.

    :Returns: Deferred to a list of results for each engine.

    :Exception:
        InvalidEngineID
            If the targets argument is bad or engines aren't registered.
        NoEnginesRegistered
            If there are no engines registered and targets='all'
    """

    #---------------------------------------------------------------------------
    # Multiplexed methods
    #---------------------------------------------------------------------------

    def execute(lines, targets='all'):
        """Execute lines of Python code on targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            lines : str
                String of python code to be executed on targets.
        """

    def push(namespace, targets='all'):
        """Push dict namespace into the user's namespace on targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            namespace : dict
                Dict of key value pairs to be put into the user's namespace.
        """

    def pull(keys, targets='all'):
        """Pull values out of the user's namespace on targets by keys.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            keys : tuple of strings
                Sequence of keys to be pulled from user's namespace.
        """

    def push_function(namespace, targets='all'):
        """"""

    def pull_function(keys, targets='all'):
        """"""

    def get_result(i=None, targets='all'):
        """Get the result for command i from targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            i : int or None
                Command index or None to indicate most recent command.
        """

    def reset(targets='all'):
        """Reset targets.

        This clears the user's namespace of the Engines, but won't cause
        modules to be reloaded.
        """

    def keys(targets='all'):
        """Get variable names defined in user's namespace on targets."""

    def kill(controller=False, targets='all'):
        """Kill the targets Engines and possibly the controller.

        :Parameters:
            controller : boolean
                Should the controller be killed as well. If so all the
                engines will be killed first no matter what targets is.
        """

    def push_serialized(namespace, targets='all'):
        """Push a namespace of Serialized objects to targets.

        :Parameters:
            namespace : dict
                A dict whose keys are the variable names and whose values
                are serialized versions of the objects.
        """

    def pull_serialized(keys, targets='all'):
        """Pull Serialized objects by keys from targets.
173
173
174 :Parameters:
174 :Parameters:
175 keys : tuple of strings
175 keys : tuple of strings
176 Sequence of variable names to pull as serialized objects.
176 Sequence of variable names to pull as serialized objects.
177 """
177 """
178
178
179 def clear_queue(targets='all'):
179 def clear_queue(targets='all'):
180 """Clear the queue of pending command for targets."""
180 """Clear the queue of pending command for targets."""
181
181
182 def queue_status(targets='all'):
182 def queue_status(targets='all'):
183 """Get the status of the queue on the targets."""
183 """Get the status of the queue on the targets."""
184
184
185 def set_properties(properties, targets='all'):
185 def set_properties(properties, targets='all'):
186 """set properties by key and value"""
186 """set properties by key and value"""
187
187
188 def get_properties(keys=None, targets='all'):
188 def get_properties(keys=None, targets='all'):
189 """get a list of properties by `keys`, if no keys specified, get all"""
189 """get a list of properties by `keys`, if no keys specified, get all"""
190
190
191 def del_properties(keys, targets='all'):
191 def del_properties(keys, targets='all'):
192 """delete properties by `keys`"""
192 """delete properties by `keys`"""
193
193
194 def has_properties(keys, targets='all'):
194 def has_properties(keys, targets='all'):
195 """get a list of bool values for whether `properties` has `keys`"""
195 """get a list of bool values for whether `properties` has `keys`"""
196
196
197 def clear_properties(targets='all'):
197 def clear_properties(targets='all'):
198 """clear the properties dict"""
198 """clear the properties dict"""
199
199
200
200
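The push/pull pair above is the core data-movement API of the multiplexer interface. A minimal synchronous sketch of its semantics (the `FakeMultiEngine` class and its dict-per-engine model are hypothetical stand-ins for illustration; the real methods return Twisted deferreds):

```python
# Hypothetical in-memory model of the push/pull semantics described above.
class FakeMultiEngine(object):
    def __init__(self, ids):
        # One user-namespace dict per engine id.
        self.engines = dict((i, {}) for i in ids)

    def _targets(self, targets):
        # Resolve 'all', a single int, or a sequence of ints to engine ids.
        if targets == 'all':
            return list(self.engines)
        if isinstance(targets, int):
            return [targets]
        return list(targets)

    def push(self, namespace, targets='all'):
        # Update the user's namespace on each target engine.
        for i in self._targets(targets):
            self.engines[i].update(namespace)

    def pull(self, keys, targets='all'):
        # Return one list of values per target engine.
        return [[self.engines[i][k] for k in keys]
                for i in self._targets(targets)]

me = FakeMultiEngine([0, 1, 2])
me.push({'a': 5})                       # to all engines
me.push({'b': 7}, targets=0)            # to engine 0 only
print(me.pull(('a',)))                  # [[5], [5], [5]]
print(me.pull(('a', 'b'), targets=0))   # [[5, 7]]
```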
class IMultiEngine(IEngineMultiplexer):
    """A controller that exposes an explicit interface to all of its engines.

    This is the primary interface for interactive usage.
    """

    def get_ids():
        """Return a list of currently registered ids.

        :Returns: A Deferred to a list of registered engine ids.
        """


#-------------------------------------------------------------------------------
# Implementation of the core MultiEngine classes
#-------------------------------------------------------------------------------

class MultiEngine(ControllerAdapterBase):
    """The representation of a ControllerService as an IMultiEngine.

    Although it is not implemented currently, this class would be where a
    client/notification API is implemented.  It could inherit from something
    like results.NotifierParent and then use the notify method to send
    notifications.
    """

    implements(IMultiEngine)

    def __init__(self, controller):
        ControllerAdapterBase.__init__(self, controller)

    #---------------------------------------------------------------------------
    # Helper methods
    #---------------------------------------------------------------------------

    def engineList(self, targets):
        """Parse the targets argument into a list of valid engine objects.

        :Parameters:
            targets : int, list of ints or 'all'
                The targets argument to be parsed.

        :Returns: List of engine objects.

        :Exception:
            InvalidEngineID
                If targets is not valid or if an engine is not registered.
        """
        if isinstance(targets, int):
            if targets not in self.engines.keys():
                log.msg("Engine with id %i is not registered" % targets)
                raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
            else:
                return [self.engines[targets]]
        elif isinstance(targets, (list, tuple)):
            for id in targets:
                if id not in self.engines.keys():
                    log.msg("Engine with id %r is not registered" % id)
                    raise error.InvalidEngineID("Engine with id %r is not registered" % id)
            return map(self.engines.get, targets)
        elif targets == 'all':
            eList = self.engines.values()
            if len(eList) == 0:
                msg = """There are no engines registered.
Check the logs in ~/.ipython/log if you think there should have been."""
                raise error.NoEnginesRegistered(msg)
            else:
                return eList
        else:
            raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r" % targets)

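The same target-resolution logic can be sketched standalone. Here plain `ValueError` and `LookupError` stand in for `error.InvalidEngineID` and `error.NoEnginesRegistered`, and the `engines` dict stands in for the controller's registry:

```python
def engine_list(engines, targets):
    """Resolve targets (int, sequence of ints, or 'all') against a
    registry dict, mirroring MultiEngine.engineList."""
    if isinstance(targets, int):
        if targets not in engines:
            raise ValueError("Engine with id %i is not registered" % targets)
        return [engines[targets]]
    elif isinstance(targets, (list, tuple)):
        for i in targets:
            if i not in engines:
                raise ValueError("Engine with id %r is not registered" % i)
        return [engines[i] for i in targets]
    elif targets == 'all':
        if not engines:
            raise LookupError("There are no engines registered.")
        return list(engines.values())
    else:
        raise ValueError("targets is not an int, list of ints or 'all': %r" % targets)

engines = {0: 'e0', 1: 'e1'}
print(engine_list(engines, 'all'))     # ['e0', 'e1']
print(engine_list(engines, (1, 0)))    # ['e1', 'e0']
```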
    def _performOnEngines(self, methodName, *args, **kwargs):
        """Call a method on engines and return a list of deferreds to the results.

        :Parameters:
            methodName : str
                Name of the method to be called.
            targets : int, list of ints or 'all'
                Passed as a keyword argument; it is popped from kwargs and
                parsed into a list of engine objects.
            args
                The positional arguments to be passed to the method.
            kwargs
                The keyword arguments to be passed to the method.

        :Returns: List of deferreds to the results on each engine.

        :Exception:
            InvalidEngineID
                If the targets argument is bad in any way.
            AttributeError
                If the method doesn't exist on one of the engines.
        """
        targets = kwargs.pop('targets')
        log.msg("Performing %s on %r" % (methodName, targets))
        # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
        # This will and should raise if targets is not valid!
        engines = self.engineList(targets)
        dList = []
        for e in engines:
            meth = getattr(e, methodName, None)
            if meth is not None:
                dList.append(meth(*args, **kwargs))
            else:
                raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
        return dList

    def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
        """Call _performOnEngines and wrap the results/exceptions in a deferred."""
        try:
            dList = self._performOnEngines(methodName, *args, **kwargs)
        except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            # Having fireOnOneErrback=1 was causing problems with the determinacy
            # of the system.  Basically, once a single engine has errbacked, this
            # method returns.  In some cases, this will cause the client to submit
            # another command.  Because the previous command is still running
            # on some engines, this command will be queued.  When those commands
            # then errback, the second command will raise QueueCleared.  Ahhh!
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, methodName)
            return d

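The `gatherBoth` call with `fireOnOneErrback=0` and `consumeErrors=1` implements a wait-for-everyone policy: every engine's deferred is allowed to finish, and failures are collected and reported together at the end. A synchronous sketch of that aggregation policy (`perform_on_all` is illustrative, not the Twisted API):

```python
def perform_on_all(funcs):
    """Run every callable, collecting results and exceptions; raise
    only after all have finished (mirrors fireOnOneErrback=0)."""
    results, errors = [], []
    for f in funcs:
        try:
            results.append(f())
        except Exception as e:
            # Keep going: record the failure and continue with the rest.
            results.append(e)
            errors.append(e)
    if errors:
        raise RuntimeError("%i of %i calls failed" % (len(errors), len(funcs)))
    return results

print(perform_on_all([lambda: 1, lambda: 2]))  # [1, 2]
```

The key property is that a failure on one "engine" does not prevent the others from running, which is exactly the determinacy problem the comment above describes.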
    #---------------------------------------------------------------------------
    # General IMultiEngine methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        return defer.succeed(self.engines.keys())

    #---------------------------------------------------------------------------
    # IEngineMultiplexer methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all'):
        return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)

    def push(self, ns, targets='all'):
        return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)

    def pull(self, keys, targets='all'):
        return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)

    def push_function(self, ns, targets='all'):
        return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)

    def pull_function(self, keys, targets='all'):
        return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)

    def get_result(self, i=None, targets='all'):
        return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)

    def reset(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('reset', targets=targets)

    def keys(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('keys', targets=targets)

    def kill(self, controller=False, targets='all'):
        if controller:
            targets = 'all'
        d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
        if controller:
            log.msg("Killing controller")
            d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
            # Consume any weird stuff coming back
            d.addBoth(lambda _: None)
        return d

    def push_serialized(self, namespace, targets='all'):
        for k, v in namespace.iteritems():
            log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
        d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
        return d

    def pull_serialized(self, keys, targets='all'):
        try:
            dList = self._performOnEngines('pull_serialized', keys, targets=targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            for d in dList:
                d.addCallback(self._logSizes)
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'pull_serialized')
            return d

    def _logSizes(self, listOfSerialized):
        if isinstance(listOfSerialized, (list, tuple)):
            for s in listOfSerialized:
                log.msg("Pulled object is %f MB" % s.getDataSize())
        else:
            log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
        return listOfSerialized

    def clear_queue(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)

    def queue_status(self, targets='all'):
        log.msg("Getting queue status on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = []
            for e in engines:
                # Bind e.id via a default argument so each callback tags the
                # status with its own engine, not the last one in the loop.
                dList.append(e.queue_status().addCallback(lambda s, id=e.id: (id, s)))
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'queue_status')
            return d

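Callbacks created in a loop, like the per-engine lambda in `queue_status`, are a classic late-binding hazard: a plain `lambda s: (e.id, s)` captures the variable `e`, not its current value, so any callbacks that fire after the loop completes all see the last engine. Binding the value through a default argument captures it at definition time. The pitfall in isolation:

```python
# Late-binding pitfall: all three lambdas share the loop variable i.
late = [lambda: i for i in range(3)]
print([f() for f in late])           # [2, 2, 2]

# Binding the current value as a default argument captures it early.
early = [lambda i=i: i for i in range(3)]
print([f() for f in early])          # [0, 1, 2]
```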
    def get_properties(self, keys=None, targets='all'):
        log.msg("Getting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.get_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'get_properties')
            return d

    def set_properties(self, properties, targets='all'):
        log.msg("Setting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.set_properties(properties) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'set_properties')
            return d

    def has_properties(self, keys, targets='all'):
        log.msg("Checking properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.has_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'has_properties')
            return d

    def del_properties(self, keys, targets='all'):
        log.msg("Deleting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.del_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'del_properties')
            return d

    def clear_properties(self, targets='all'):
        log.msg("Clearing properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.clear_properties() for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'clear_properties')
            return d


components.registerAdapter(MultiEngine,
                           IControllerBase,
                           IMultiEngine)


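`components.registerAdapter` teaches Twisted's adapter registry how to wrap any `IControllerBase` provider in a `MultiEngine` whenever code asks for `IMultiEngine`. A minimal sketch of such a registry (a plain dict keyed by (class, interface) pairs; the names here are illustrative, not Twisted's machinery):

```python
# Hypothetical miniature adapter registry, for illustration only.
_adapters = {}

def register_adapter(factory, from_cls, to_iface):
    # Remember how to adapt instances of from_cls to to_iface.
    _adapters[(from_cls, to_iface)] = factory

def adapt(obj, to_iface):
    # Look up the factory for this (class, interface) pair and apply it.
    factory = _adapters[(type(obj), to_iface)]
    return factory(obj)

class Controller(object):
    pass

class MultiEngineView(object):
    def __init__(self, controller):
        self.controller = controller

register_adapter(MultiEngineView, Controller, 'IMultiEngine')
me = adapt(Controller(), 'IMultiEngine')
print(type(me).__name__)  # MultiEngineView
```

The real registry dispatches on declared interfaces rather than concrete classes, but the lookup-then-wrap shape is the same.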
#-------------------------------------------------------------------------------
# Interfaces for the Synchronous MultiEngine
#-------------------------------------------------------------------------------

class ISynchronousEngineMultiplexer(Interface):
    pass


class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
    """Synchronous, two-phase version of IMultiEngine.

    Methods in this interface are identical to those of IMultiEngine, but they
    take one additional argument:

    execute(lines, targets='all') -> execute(lines, targets='all', block=True)

    :Parameters:
        block : boolean
            Should the method return a deferred to a deferredID or the
            actual result?  If block=False, a deferred to a deferredID is
            returned and the user must call `get_pending_deferred` at a later
            point.  If block=True, a deferred to the actual result comes back.
    """

    def get_pending_deferred(deferredID, block=True):
        """Get the pending deferred registered under `deferredID`."""

    def clear_pending_deferreds():
        """Clear all pending deferreds."""

#-------------------------------------------------------------------------------
# Implementation of the Synchronous MultiEngine
#-------------------------------------------------------------------------------

class SynchronousMultiEngine(PendingDeferredManager):
    """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`.

    Warning: this class uses a decorator that currently relies on **kwargs.
    Because of this, `block` must be passed as a keyword argument, not
    positionally.
    """

    implements(ISynchronousMultiEngine)

    def __init__(self, multiengine):
        self.multiengine = multiengine
        PendingDeferredManager.__init__(self)

    #---------------------------------------------------------------------------
    # Decorated pending deferred methods
    #---------------------------------------------------------------------------

    @two_phase
    def execute(self, lines, targets='all'):
        d = self.multiengine.execute(lines, targets)
        return d

    @two_phase
    def push(self, namespace, targets='all'):
        return self.multiengine.push(namespace, targets)

    @two_phase
    def pull(self, keys, targets='all'):
        d = self.multiengine.pull(keys, targets)
        return d

    @two_phase
    def push_function(self, namespace, targets='all'):
        return self.multiengine.push_function(namespace, targets)

    @two_phase
    def pull_function(self, keys, targets='all'):
        d = self.multiengine.pull_function(keys, targets)
        return d

    @two_phase
    def get_result(self, i=None, targets='all'):
        # Pass targets through rather than hardcoding 'all'.
        return self.multiengine.get_result(i, targets=targets)

    @two_phase
    def reset(self, targets='all'):
        return self.multiengine.reset(targets)

    @two_phase
    def keys(self, targets='all'):
        return self.multiengine.keys(targets)

    @two_phase
    def kill(self, controller=False, targets='all'):
        return self.multiengine.kill(controller, targets)

    @two_phase
    def push_serialized(self, namespace, targets='all'):
        return self.multiengine.push_serialized(namespace, targets)

    @two_phase
    def pull_serialized(self, keys, targets='all'):
        return self.multiengine.pull_serialized(keys, targets)

    @two_phase
    def clear_queue(self, targets='all'):
        return self.multiengine.clear_queue(targets)

    @two_phase
    def queue_status(self, targets='all'):
        return self.multiengine.queue_status(targets)

    @two_phase
    def set_properties(self, properties, targets='all'):
        return self.multiengine.set_properties(properties, targets)

    @two_phase
    def get_properties(self, keys=None, targets='all'):
        return self.multiengine.get_properties(keys, targets)

    @two_phase
    def has_properties(self, keys, targets='all'):
        return self.multiengine.has_properties(keys, targets)

    @two_phase
    def del_properties(self, keys, targets='all'):
        return self.multiengine.del_properties(keys, targets)

    @two_phase
    def clear_properties(self, targets='all'):
        return self.multiengine.clear_properties(targets)

631 #---------------------------------------------------------------------------
630 #---------------------------------------------------------------------------
632 # IMultiEngine methods
631 # IMultiEngine methods
633 #---------------------------------------------------------------------------
632 #---------------------------------------------------------------------------
634
633
635 def get_ids(self):
634 def get_ids(self):
636 """Return a list of registered engine ids.
635 """Return a list of registered engine ids.
637
636
638 Never use the two phase block/non-block stuff for this.
637 Never use the two phase block/non-block stuff for this.
639 """
638 """
640 return self.multiengine.get_ids()
639 return self.multiengine.get_ids()
641
640
642
641
643 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
644
643
645
644
646 #-------------------------------------------------------------------------------
645 #-------------------------------------------------------------------------------
647 # Various high-level interfaces that can be used as MultiEngine mix-ins
646 # Various high-level interfaces that can be used as MultiEngine mix-ins
648 #-------------------------------------------------------------------------------
647 #-------------------------------------------------------------------------------
649
648
650 #-------------------------------------------------------------------------------
649 #-------------------------------------------------------------------------------
651 # IMultiEngineCoordinator
650 # IMultiEngineCoordinator
652 #-------------------------------------------------------------------------------
651 #-------------------------------------------------------------------------------
653
652
654 class IMultiEngineCoordinator(Interface):
653 class IMultiEngineCoordinator(Interface):
655 """Methods that work on multiple engines explicitly."""
654 """Methods that work on multiple engines explicitly."""
656
655
657 def scatter(key, seq, dist='b', flatten=False, targets='all'):
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
658 """Partition and distribute a sequence to targets."""
657 """Partition and distribute a sequence to targets."""
659
658
660 def gather(key, dist='b', targets='all'):
659 def gather(key, dist='b', targets='all'):
661 """Gather object key from targets."""
660 """Gather object key from targets."""
662
661
663 def raw_map(func, seqs, dist='b', targets='all'):
662 def raw_map(func, seqs, dist='b', targets='all'):
664 """
663 """
665 A parallelized version of Python's builtin `map` function.
664 A parallelized version of Python's builtin `map` function.
666
665
667 This has a slightly different syntax than the builtin `map`.
666 This has a slightly different syntax than the builtin `map`.
668 This is needed because we need to have keyword arguments and thus
667 This is needed because we need to have keyword arguments and thus
669 can't use *args to capture all the sequences. Instead, they must
668 can't use *args to capture all the sequences. Instead, they must
670 be passed in a list or tuple.
669 be passed in a list or tuple.
671
670
672 The equivalence is:
671 The equivalence is:
673
672
674 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
675
674
676 Most users will want to use parallel functions or the `mapper`
675 Most users will want to use parallel functions or the `mapper`
677 and `map` methods for an API that follows that of the builtin
676 and `map` methods for an API that follows that of the builtin
678 `map`.
677 `map`.
679 """
678 """
680
679
681
680
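The equivalence stated in the docstring can be illustrated with a plain, serial sketch (no engines involved; this `raw_map` is a hypothetical stand-in for the interface method, not the real parallel implementation):

```python
import operator

def raw_map(func, seqs):
    # Serial illustration of the documented equivalence:
    #   raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
    # The sequences travel in a single list/tuple so the real method can
    # still take keyword arguments such as dist= and targets=.
    return list(map(func, *seqs))

result = raw_map(operator.add, [[1, 2, 3], [10, 20, 30]])  # [11, 22, 33]
```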
class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
    """Methods that work on multiple engines explicitly."""

    def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
        """Partition and distribute a sequence to targets."""

    def gather(key, dist='b', targets='all', block=True):
        """Gather object key from targets."""

    def raw_map(func, seqs, dist='b', targets='all', block=True):
        """
        A parallelized version of Python's builtin `map`.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences. Instead, they must
        be passed in a list or tuple.

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.
        """

#-------------------------------------------------------------------------------
# IMultiEngineExtras
#-------------------------------------------------------------------------------

class IMultiEngineExtras(Interface):

    def zip_pull(targets, keys):
        """
        Pull, but return results in a different format from `pull`.

        This method basically returns zip(pull(targets, *keys)), with a few
        edge cases handled differently. Users of chainsaw will find this format
        familiar.
        """

    def run(targets, fname):
        """Run a .py file on targets."""


class ISynchronousMultiEngineExtras(IMultiEngineExtras):

    def zip_pull(targets, keys, block=True):
        """
        Pull, but return results in a different format from `pull`.

        This method basically returns zip(pull(targets, *keys)), with a few
        edge cases handled differently. Users of chainsaw will find this format
        familiar.
        """

    def run(targets, fname, block=True):
        """Run a .py file on targets."""


#-------------------------------------------------------------------------------
# The full MultiEngine interface
#-------------------------------------------------------------------------------

class IFullMultiEngine(IMultiEngine,
                       IMultiEngineCoordinator,
                       IMultiEngineExtras):
    pass


class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
                                  ISynchronousMultiEngineCoordinator,
                                  ISynchronousMultiEngineExtras):
    pass

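The `zip_pull` docstring describes the result as "basically zip(pull(targets, *keys))". A serial sketch of that transposition, with hypothetical stand-ins for `pull` (here the "engines" are just plain dicts; none of these names are the real API):

```python
def pull(namespaces, *keys):
    # Illustrative stand-in: for each key, collect its value on every engine.
    return [[ns[k] for ns in namespaces] for k in keys]

def zip_pull(namespaces, *keys):
    # zip_pull transposes pull's result: one tuple of values per engine
    # instead of one list of values per key.
    return list(zip(*pull(namespaces, *keys)))

engines = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
per_key = pull(engines, 'a', 'b')         # [[1, 10], [2, 20]]
per_engine = zip_pull(engines, 'a', 'b')  # [(1, 2), (10, 20)]
```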
@@ -1,658 +1,723 b''
#!/usr/bin/env python
# encoding: utf-8

"""Start an IPython cluster = (controller + engines)."""

#-----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

import os
import re
import sys
import signal
import tempfile
pjoin = os.path.join

from twisted.internet import reactor, defer
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.internet.utils import getProcessOutput
from twisted.python import failure, log

from IPython.external import argparse
from IPython.external import Itpl
from IPython.genutils import get_ipython_dir, num_cpus
from IPython.kernel.fcutil import have_crypto
from IPython.kernel.error import SecurityError
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.util import printer


#-----------------------------------------------------------------------------
# General process handling code
#-----------------------------------------------------------------------------

def find_exe(cmd):
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    else:
        try:
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
        except:
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
        return path
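find_exe is Win32-only (it needs pywin32's SearchPath and probes `.exe` then `.bat`). For illustration, the same PATH lookup can be sketched cross-platform with the stdlib; `find_exe_portable` is a hypothetical helper, not part of ipcluster:

```python
import shutil

def find_exe_portable(cmd):
    # Try the bare name first, then the Windows-style suffixes the
    # win32api version probes. shutil.which consults PATH, much like
    # win32api.SearchPath does in find_exe above.
    for candidate in (cmd, cmd + '.exe', cmd + '.bat'):
        path = shutil.which(candidate)
        if path is not None:
            return path
    return None
```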

class ProcessStateError(Exception):
    pass

class UnknownStatus(Exception):
    pass

class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        value = status.value
        if isinstance(value, ProcessDone):
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            self.process_launcher.fire_stop_deferred(
                {'exit_code': value.exitCode,
                 'signal': value.signal,
                 'status': value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        log.msg(data)

    def errReceived(self, data):
        log.err(data)

class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # before, running, or after

    @property
    def running(self):
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL', 'INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')

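ProcessLauncher.interrupt_then_kill sends INT and lets the reactor schedule a KILL shortly after. The same escalation policy can be sketched synchronously with the stdlib (POSIX assumed; this is an illustration with subprocess, not the Twisted implementation):

```python
import signal
import subprocess
import sys
import time

def interrupt_then_kill(proc, delay=1.0):
    # Send SIGINT first; if the process is still alive after `delay`
    # seconds, escalate to SIGKILL -- the same policy as the Twisted
    # version, minus the reactor scheduling.
    proc.send_signal(signal.SIGINT)
    try:
        proc.wait(timeout=delay)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    return proc.returncode

# A child that ignores SIGINT, forcing the KILL escalation:
child = subprocess.Popen([sys.executable, '-c',
    'import signal, time; signal.signal(signal.SIGINT, signal.SIG_IGN); time.sleep(30)'])
time.sleep(0.5)  # let the child install its handler
rc = interrupt_then_kill(child, delay=1.0)  # negative on POSIX: killed by a signal
```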
#-----------------------------------------------------------------------------
# Code for launching controller and engines
#-----------------------------------------------------------------------------


class ControllerLauncher(ProcessLauncher):

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            args = [find_exe('python'), '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)


class EngineLauncher(ProcessLauncher):

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            args = [find_exe('python'), '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)


class LocalEngineSet(object):

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        dlist = []
        for i in range(n):
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal


class BatchEngineSet(object):

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file + '-run'

    def parse_job_id(self, output):
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        self.context['n'] = n
        template = open(self.template_file, 'r').read()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        f.write(script_as_string)
        f.close()

    def handle_error(self, f):
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file], env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d

class PBSEngineSet(BatchEngineSet):

    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)

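PBSEngineSet only sets `job_id_regexp = r'\d+'`, which parse_job_id applies with re.match to qsub's stdout. A quick standalone check of that extraction (the sample qsub reply string is made up for illustration):

```python
import re

job_id_regexp = r'\d+'

def parse_job_id(output):
    # re.match anchors at the start of the string, so this pulls out the
    # leading digits of qsub's reply, e.g. "12345.pbs-server" -> "12345".
    m = re.match(job_id_regexp, output)
    if m is None:
        raise Exception("job id couldn't be determined: %s" % output)
    return m.group()

job_id = parse_job_id('12345.pbs-server\n')  # '12345'
```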
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""

class SSHEngineSet(object):
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engines (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to set up the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))

    def _killall(self, host_name):
383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 def _exec_err(d):
384 return d
385 if d.getErrorMessage()[-18:] != "No such process\\n\'":
386 raise d
387
385
388 def _exec_kill(d):
386 def _scp_furl(self, hostname):
389 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER'])
387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
390 kill_cmd = kill_cmd.split()
388 cmd_list = scp_cmd.split()
391 print kill_cmd
389 cmd_list[1] = os.path.expanduser(cmd_list[1])
392 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
390 log.msg('Copying furl file: %s' % scp_cmd)
393 d.addErrback(_exec_err)
391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
394 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER'])
392 return d
393
394 def _scp_sshx(self, hostname):
395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 self.sshx, hostname,
397 self.temp_dir, os.environ['USER']
398 )
399 print
400 log.msg("Copying sshx: %s" % scp_cmd)
401 sshx_scp = scp_cmd.split()
402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
403 return d
404
405 def _ssh_engine(self, hostname, count):
406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 hostname, self.temp_dir,
408 os.environ['USER'], self.engine_command
409 )
410 cmds = exec_engine.split()
411 dlist = []
412 log.msg("about to start engines...")
413 for i in range(count):
414 log.msg('Starting engines: %s' % exec_engine)
415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 dlist.append(d)
417 return gatherBoth(dlist, consumeErrors=True)
418
419 def kill(self):
420 dlist = []
421 for host in self.engine_hosts.keys():
422 d = self._killall(host)
423 dlist.append(d)
424 return gatherBoth(dlist, consumeErrors=True)
425
426 def _killall(self, hostname):
427 d = self._scp_engine_killer(hostname)
428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 # d.addErrback(self._exec_err)
430 return d
431
432 def _scp_engine_killer(self, hostname):
433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 self.engine_killer,
435 hostname,
436 self.temp_dir,
437 os.environ['USER']
438 )
395 cmds = scp_cmd.split()
439 cmds = scp_cmd.split()
440 log.msg('Copying engine_killer: %s' % scp_cmd)
396 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
397 d.addCallback(_exec_kill)
442 return d
398 d.addErrback(_exec_err)
443
444 def _ssh_kill(self, hostname):
445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 hostname,
447 self.temp_dir,
448 os.environ['USER']
449 )
450 log.msg('Killing engine: %s' % kill_cmd)
451 kill_cmd = kill_cmd.split()
452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 return d
399
454
455 def _exec_err(self, r):
456 log.msg(r)
400
457
401 #-----------------------------------------------------------------------------
458 #-----------------------------------------------------------------------------
402 # Main functions for the different types of clusters
459 # Main functions for the different types of clusters
403 #-----------------------------------------------------------------------------
460 #-----------------------------------------------------------------------------
404
461
405 # TODO:
462 # TODO:
406 # The logic in these codes should be moved into classes like LocalCluster
463 # The logic in these codes should be moved into classes like LocalCluster
407 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
464 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
408 # The main functions should then just parse the command line arguments, create
465 # The main functions should then just parse the command line arguments, create
409 # the appropriate class and call a 'start' method.
466 # the appropriate class and call a 'start' method.
410
467
411 def check_security(args, cont_args):
468 def check_security(args, cont_args):
412 if (not args.x or not args.y) and not have_crypto:
469 if (not args.x or not args.y) and not have_crypto:
413 log.err("""
470 log.err("""
414 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
471 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
415 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
472 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
416 reactor.stop()
473 reactor.stop()
417 return False
474 return False
418 if args.x:
475 if args.x:
419 cont_args.append('-x')
476 cont_args.append('-x')
420 if args.y:
477 if args.y:
421 cont_args.append('-y')
478 cont_args.append('-y')
422 return True
479 return True
423
480
424
481
425 def main_local(args):
482 def main_local(args):
426 cont_args = []
483 cont_args = []
427 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
484 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
428
485
429 # Check security settings before proceeding
486 # Check security settings before proceeding
430 if not check_security(args, cont_args):
487 if not check_security(args, cont_args):
431 return
488 return
432
489
433 cl = ControllerLauncher(extra_args=cont_args)
490 cl = ControllerLauncher(extra_args=cont_args)
434 dstart = cl.start()
491 dstart = cl.start()
435 def start_engines(cont_pid):
492 def start_engines(cont_pid):
436 engine_args = []
493 engine_args = []
437 engine_args.append('--logfile=%s' % \
494 engine_args.append('--logfile=%s' % \
438 pjoin(args.logdir,'ipengine%s-' % cont_pid))
495 pjoin(args.logdir,'ipengine%s-' % cont_pid))
439 eset = LocalEngineSet(extra_args=engine_args)
496 eset = LocalEngineSet(extra_args=engine_args)
440 def shutdown(signum, frame):
497 def shutdown(signum, frame):
441 log.msg('Stopping local cluster')
498 log.msg('Stopping local cluster')
442 # We are still playing with the times here, but these seem
499 # We are still playing with the times here, but these seem
443 # to be reliable in allowing everything to exit cleanly.
500 # to be reliable in allowing everything to exit cleanly.
444 eset.interrupt_then_kill(0.5)
501 eset.interrupt_then_kill(0.5)
445 cl.interrupt_then_kill(0.5)
502 cl.interrupt_then_kill(0.5)
446 reactor.callLater(1.0, reactor.stop)
503 reactor.callLater(1.0, reactor.stop)
447 signal.signal(signal.SIGINT,shutdown)
504 signal.signal(signal.SIGINT,shutdown)
448 d = eset.start(args.n)
505 d = eset.start(args.n)
449 return d
506 return d
450 def delay_start(cont_pid):
507 def delay_start(cont_pid):
451 # This is needed because the controller doesn't start listening
508 # This is needed because the controller doesn't start listening
452 # right when it starts and the controller needs to write
509 # right when it starts and the controller needs to write
453 # furl files for the engine to pick up
510 # furl files for the engine to pick up
454 reactor.callLater(1.0, start_engines, cont_pid)
511 reactor.callLater(1.0, start_engines, cont_pid)
455 dstart.addCallback(delay_start)
512 dstart.addCallback(delay_start)
456 dstart.addErrback(lambda f: f.raiseException())
513 dstart.addErrback(lambda f: f.raiseException())
457
514
458
515
459 def main_mpirun(args):
516 def main_mpirun(args):
460 cont_args = []
517 cont_args = []
461 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
518 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
462
519
463 # Check security settings before proceeding
520 # Check security settings before proceeding
464 if not check_security(args, cont_args):
521 if not check_security(args, cont_args):
465 return
522 return
466
523
467 cl = ControllerLauncher(extra_args=cont_args)
524 cl = ControllerLauncher(extra_args=cont_args)
468 dstart = cl.start()
525 dstart = cl.start()
469 def start_engines(cont_pid):
526 def start_engines(cont_pid):
470 raw_args = ['mpirun']
527 raw_args = ['mpirun']
471 raw_args.extend(['-n',str(args.n)])
528 raw_args.extend(['-n',str(args.n)])
472 raw_args.append('ipengine')
529 raw_args.append('ipengine')
473 raw_args.append('-l')
530 raw_args.append('-l')
474 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
531 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
475 if args.mpi:
532 if args.mpi:
476 raw_args.append('--mpi=%s' % args.mpi)
533 raw_args.append('--mpi=%s' % args.mpi)
477 eset = ProcessLauncher(raw_args)
534 eset = ProcessLauncher(raw_args)
478 def shutdown(signum, frame):
535 def shutdown(signum, frame):
479 log.msg('Stopping local cluster')
536 log.msg('Stopping local cluster')
480 # We are still playing with the times here, but these seem
537 # We are still playing with the times here, but these seem
481 # to be reliable in allowing everything to exit cleanly.
538 # to be reliable in allowing everything to exit cleanly.
482 eset.interrupt_then_kill(1.0)
539 eset.interrupt_then_kill(1.0)
483 cl.interrupt_then_kill(1.0)
540 cl.interrupt_then_kill(1.0)
484 reactor.callLater(2.0, reactor.stop)
541 reactor.callLater(2.0, reactor.stop)
485 signal.signal(signal.SIGINT,shutdown)
542 signal.signal(signal.SIGINT,shutdown)
486 d = eset.start()
543 d = eset.start()
487 return d
544 return d
488 def delay_start(cont_pid):
545 def delay_start(cont_pid):
489 # This is needed because the controller doesn't start listening
546 # This is needed because the controller doesn't start listening
490 # right when it starts and the controller needs to write
547 # right when it starts and the controller needs to write
491 # furl files for the engine to pick up
548 # furl files for the engine to pick up
492 reactor.callLater(1.0, start_engines, cont_pid)
549 reactor.callLater(1.0, start_engines, cont_pid)
493 dstart.addCallback(delay_start)
550 dstart.addCallback(delay_start)
494 dstart.addErrback(lambda f: f.raiseException())
551 dstart.addErrback(lambda f: f.raiseException())
495
552
496
553
497 def main_pbs(args):
554 def main_pbs(args):
498 cont_args = []
555 cont_args = []
499 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
556 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
500
557
501 # Check security settings before proceeding
558 # Check security settings before proceeding
502 if not check_security(args, cont_args):
559 if not check_security(args, cont_args):
503 return
560 return
504
561
505 cl = ControllerLauncher(extra_args=cont_args)
562 cl = ControllerLauncher(extra_args=cont_args)
506 dstart = cl.start()
563 dstart = cl.start()
507 def start_engines(r):
564 def start_engines(r):
508 pbs_set = PBSEngineSet(args.pbsscript)
565 pbs_set = PBSEngineSet(args.pbsscript)
509 def shutdown(signum, frame):
566 def shutdown(signum, frame):
510 log.msg('Stopping pbs cluster')
567 log.msg('Stopping pbs cluster')
511 d = pbs_set.kill()
568 d = pbs_set.kill()
512 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
569 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
513 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
570 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
514 signal.signal(signal.SIGINT,shutdown)
571 signal.signal(signal.SIGINT,shutdown)
515 d = pbs_set.start(args.n)
572 d = pbs_set.start(args.n)
516 return d
573 return d
517 dstart.addCallback(start_engines)
574 dstart.addCallback(start_engines)
518 dstart.addErrback(lambda f: f.raiseException())
575 dstart.addErrback(lambda f: f.raiseException())
519
576
520
577
521 # currently the ssh launcher only launches the controller on localhost.
522 def main_ssh(args):
578 def main_ssh(args):
523 # the clusterfile should look like:
579 """Start a controller on localhost and engines using ssh.
524 # send_furl = False # True, if you want
580
525 # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}
581 Your clusterfile should look like::
582
583 send_furl = False # True, if you want
584 engines = {
585 'engine_host1' : engine_count,
586 'engine_host2' : engine_count2
587 }
588 """
526 clusterfile = {}
589 clusterfile = {}
527 execfile(args.clusterfile, clusterfile)
590 execfile(args.clusterfile, clusterfile)
528 if not clusterfile.has_key('send_furl'):
591 if not clusterfile.has_key('send_furl'):
529 clusterfile['send_furl'] = False
592 clusterfile['send_furl'] = False
530
593
531 cont_args = []
594 cont_args = []
532 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
595 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
533 if args.x:
596
534 cont_args.append('-x')
597 # Check security settings before proceeding
535 if args.y:
598 if not check_security(args, cont_args):
536 cont_args.append('-y')
599 return
600
537 cl = ControllerLauncher(extra_args=cont_args)
601 cl = ControllerLauncher(extra_args=cont_args)
538 dstart = cl.start()
602 dstart = cl.start()
539 def start_engines(cont_pid):
603 def start_engines(cont_pid):
540 est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
604 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
541 est.start(clusterfile['send_furl'])
542 def shutdown(signum, frame):
605 def shutdown(signum, frame):
543 est.killall()
606 d = ssh_set.kill()
544 cl.interrupt_then_kill(0.5)
607 # d.addErrback(log.err)
608 cl.interrupt_then_kill(1.0)
545 reactor.callLater(2.0, reactor.stop)
609 reactor.callLater(2.0, reactor.stop)
546 signal.signal(signal.SIGINT,shutdown)
610 signal.signal(signal.SIGINT,shutdown)
547
611 d = ssh_set.start(clusterfile['send_furl'])
612 return d
613
548 def delay_start(cont_pid):
614 def delay_start(cont_pid):
549 reactor.callLater(1.0, start_engines, cont_pid)
615 reactor.callLater(1.0, start_engines, cont_pid)
550
616
551 dstart.addCallback(delay_start)
617 dstart.addCallback(delay_start)
552 dstart.addErrback(lambda f: f.raiseException())
618 dstart.addErrback(lambda f: f.raiseException())
553
619
554
620
555 def get_args():
621 def get_args():
556 base_parser = argparse.ArgumentParser(add_help=False)
622 base_parser = argparse.ArgumentParser(add_help=False)
557 base_parser.add_argument(
623 base_parser.add_argument(
558 '-x',
624 '-x',
559 action='store_true',
625 action='store_true',
560 dest='x',
626 dest='x',
561 help='turn off client security'
627 help='turn off client security'
562 )
628 )
563 base_parser.add_argument(
629 base_parser.add_argument(
564 '-y',
630 '-y',
565 action='store_true',
631 action='store_true',
566 dest='y',
632 dest='y',
567 help='turn off engine security'
633 help='turn off engine security'
568 )
634 )
569 base_parser.add_argument(
635 base_parser.add_argument(
570 "--logdir",
636 "--logdir",
571 type=str,
637 type=str,
572 dest="logdir",
638 dest="logdir",
573 help="directory to put log files (default=$IPYTHONDIR/log)",
639 help="directory to put log files (default=$IPYTHONDIR/log)",
574 default=pjoin(get_ipython_dir(),'log')
640 default=pjoin(get_ipython_dir(),'log')
575 )
641 )
576 base_parser.add_argument(
642 base_parser.add_argument(
577 "-n",
643 "-n",
578 "--num",
644 "--num",
579 type=int,
645 type=int,
580 dest="n",
646 dest="n",
581 default=2,
647 default=2,
582 help="the number of engines to start"
648 help="the number of engines to start"
583 )
649 )
584
650
585 parser = argparse.ArgumentParser(
651 parser = argparse.ArgumentParser(
586 description='IPython cluster startup. This starts a controller and\
652 description='IPython cluster startup. This starts a controller and\
587 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
653 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
588 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
654 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
589 )
655 )
590 subparsers = parser.add_subparsers(
656 subparsers = parser.add_subparsers(
591 help='available cluster types. For help, do "ipcluster TYPE --help"')
657 help='available cluster types. For help, do "ipcluster TYPE --help"')
592
658
593 parser_local = subparsers.add_parser(
659 parser_local = subparsers.add_parser(
594 'local',
660 'local',
595 help='run a local cluster',
661 help='run a local cluster',
596 parents=[base_parser]
662 parents=[base_parser]
597 )
663 )
598 parser_local.set_defaults(func=main_local)
664 parser_local.set_defaults(func=main_local)
599
665
600 parser_mpirun = subparsers.add_parser(
666 parser_mpirun = subparsers.add_parser(
601 'mpirun',
667 'mpirun',
602 help='run a cluster using mpirun',
668 help='run a cluster using mpirun',
603 parents=[base_parser]
669 parents=[base_parser]
604 )
670 )
605 parser_mpirun.add_argument(
671 parser_mpirun.add_argument(
606 "--mpi",
672 "--mpi",
607 type=str,
673 type=str,
608 dest="mpi", # Don't put a default here to allow no MPI support
674 dest="mpi", # Don't put a default here to allow no MPI support
609 help="how to call MPI_Init (default=mpi4py)"
675 help="how to call MPI_Init (default=mpi4py)"
610 )
676 )
611 parser_mpirun.set_defaults(func=main_mpirun)
677 parser_mpirun.set_defaults(func=main_mpirun)
612
678
613 parser_pbs = subparsers.add_parser(
679 parser_pbs = subparsers.add_parser(
614 'pbs',
680 'pbs',
615 help='run a pbs cluster',
681 help='run a pbs cluster',
616 parents=[base_parser]
682 parents=[base_parser]
617 )
683 )
618 parser_pbs.add_argument(
684 parser_pbs.add_argument(
619 '--pbs-script',
685 '--pbs-script',
620 type=str,
686 type=str,
621 dest='pbsscript',
687 dest='pbsscript',
622 help='PBS script template',
688 help='PBS script template',
623 default='pbs.template'
689 default='pbs.template'
624 )
690 )
625 parser_pbs.set_defaults(func=main_pbs)
691 parser_pbs.set_defaults(func=main_pbs)
626
692
627 parser_ssh = subparsers.add_parser(
693 parser_ssh = subparsers.add_parser(
628 'ssh',
694 'ssh',
629 help='run a cluster using ssh, should have ssh-keys setup',
695 help='run a cluster using ssh, should have ssh-keys setup',
630 parents=[base_parser]
696 parents=[base_parser]
631 )
697 )
632 parser_ssh.add_argument(
698 parser_ssh.add_argument(
633 '--clusterfile',
699 '--clusterfile',
634 type=str,
700 type=str,
635 dest='clusterfile',
701 dest='clusterfile',
636 help='python file describing the cluster',
702 help='python file describing the cluster',
637 default='clusterfile.py',
703 default='clusterfile.py',
638 )
704 )
639 parser_ssh.add_argument(
705 parser_ssh.add_argument(
640 '--sshx',
706 '--sshx',
641 type=str,
707 type=str,
642 dest='sshx',
708 dest='sshx',
643 help='sshx launcher helper',
709 help='sshx launcher helper'
644 default='sshx.sh',
645 )
710 )
646 parser_ssh.set_defaults(func=main_ssh)
711 parser_ssh.set_defaults(func=main_ssh)
647
712
648 args = parser.parse_args()
713 args = parser.parse_args()
649 return args
714 return args
650
715
651 def main():
716 def main():
652 args = get_args()
717 args = get_args()
653 reactor.callWhenRunning(args.func, args)
718 reactor.callWhenRunning(args.func, args)
654 log.startLogging(sys.stdout)
719 log.startLogging(sys.stdout)
655 reactor.run()
720 reactor.run()
656
721
657 if __name__ == '__main__':
722 if __name__ == '__main__':
658 main()
723 main()
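For reference, a minimal clusterfile matching the format documented in main_ssh's docstring might look like the sketch below. The hostnames and engine counts are placeholders, not part of the original source; any machines you list must be reachable over ssh with key-based login set up.

```python
# Hypothetical clusterfile.py for: ipcluster ssh --clusterfile clusterfile.py
# send_furl controls whether the controller's furl file is copied to each host.
send_furl = False
# engines maps each ssh-reachable hostname to the number of engines to start there.
engines = {
    'host1.example.com': 4,
    'host2.example.com': 2,
}
```

Since the file is read with execfile into a plain dict, any valid Python that defines `engines` (and optionally `send_furl`) works here.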
@@ -1,393 +1,398 b''
1 .. _changes:
1 .. _changes:
2
2
3 ==========
3 ==========
4 What's new
4 What's new
5 ==========
5 ==========
6
6
7 .. contents::
7 .. contents::
8 ..
8 ..
9 1 Release 0.9.1
9 1 Release 0.9.1
10 2 Release 0.9
10 2 Release 0.9
11 2.1 New features
11 2.1 New features
12 2.2 Bug fixes
12 2.2 Bug fixes
13 2.3 Backwards incompatible changes
13 2.3 Backwards incompatible changes
14 2.4 Changes merged in from IPython1
14 2.4 Changes merged in from IPython1
15 2.4.1 New features
15 2.4.1 New features
16 2.4.2 Bug fixes
16 2.4.2 Bug fixes
17 2.4.3 Backwards incompatible changes
17 2.4.3 Backwards incompatible changes
18 3 Release 0.8.4
18 3 Release 0.8.4
19 4 Release 0.8.3
19 4 Release 0.8.3
20 5 Release 0.8.2
20 5 Release 0.8.2
21 6 Older releases
21 6 Older releases
22 ..
22 ..
23
23
24 Release dev
24 Release dev
25 ===========
25 ===========
26
26
27 New features
27 New features
28 ------------
28 ------------
29
29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 to Matt Foster for this patch.
34 to Matt Foster for this patch.
32
35
33 * Fully refactored :command:`ipcluster` command line program for starting
36 * Fully refactored :command:`ipcluster` command line program for starting
34 IPython clusters. This new version is a complete rewrite and 1) is fully
37 IPython clusters. This new version is a complete rewrite and 1) is fully
35 cross platform (we now use Twisted's process management), 2) has much
38 cross platform (we now use Twisted's process management), 2) has much
36 improved performance, 3) uses subcommands for different types of clusters,
39 improved performance, 3) uses subcommands for different types of clusters,
37 4) uses argparse for parsing command line options, 5) has better support
40 4) uses argparse for parsing command line options, 5) has better support
38 for starting clusters using :command:`mpirun`, 6) has experimental support
41 for starting clusters using :command:`mpirun`, 6) has experimental support
39 for starting engines using PBS. However, this new version of ipcluster
42 for starting engines using PBS. However, this new version of ipcluster
40 should be considered a technology preview. We plan on changing the API
43 should be considered a technology preview. We plan on changing the API
41 in significant ways before it is final.
44 in significant ways before it is final.
42
45
43 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
46 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
44
47
45 * Fully description of the security model added to the docs.
48 * Fully description of the security model added to the docs.
46
49
47 * cd completer: show bookmarks if no other completions are available.
50 * cd completer: show bookmarks if no other completions are available.
48
51
49 * sh profile: easy way to give 'title' to prompt: assign to variable
52 * sh profile: easy way to give 'title' to prompt: assign to variable
50 '_prompt_title'. It looks like this::
53 '_prompt_title'. It looks like this::
51
54
52 [~]|1> _prompt_title = 'sudo!'
55 [~]|1> _prompt_title = 'sudo!'
53 sudo![~]|2>
56 sudo![~]|2>
54
57
55 * %edit: If you do '%edit pasted_block', pasted_block
58 * %edit: If you do '%edit pasted_block', pasted_block
56 variable gets updated with new data (so repeated
59 variable gets updated with new data (so repeated
57 editing makes sense)
60 editing makes sense)
58
61
59 Bug fixes
62 Bug fixes
60 ---------
63 ---------
61
64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 * The ipengine and ipcontroller scripts now handle missing furl files
67 * The ipengine and ipcontroller scripts now handle missing furl files
63 more gracefully by giving better error messages.
68 more gracefully by giving better error messages.
64
69
65 * %rehashx: Aliases no longer contain dots. python3.0 binary
70 * %rehashx: Aliases no longer contain dots. python3.0 binary
66 will create alias python30. Fixes:
71 will create alias python30. Fixes:
67 #259716 "commands with dots in them don't work"
72 #259716 "commands with dots in them don't work"
68
73
69 * %cpaste: %cpaste -r repeats the last pasted block.
74 * %cpaste: %cpaste -r repeats the last pasted block.
70 The block is assigned to pasted_block even if code
75 The block is assigned to pasted_block even if code
71 raises exception.
76 raises exception.
72
77
73 Backwards incompatible changes
78 Backwards incompatible changes
74 ------------------------------
79 ------------------------------
75
80
76 * The controller now has a ``-r`` flag that needs to be used if you want to
81 * The controller now has a ``-r`` flag that needs to be used if you want to
77 reuse existing furl files. Otherwise they are deleted (the default).
82 reuse existing furl files. Otherwise they are deleted (the default).
78
83
79 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
84 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
80 (done to decouple it from ipython release cycle)
85 (done to decouple it from ipython release cycle)
81
86
82
87
83
88
84 Release 0.9.1
89 Release 0.9.1
85 =============
90 =============
86
91
87 This release was quickly made to restore compatibility with Python 2.4, which
92 This release was quickly made to restore compatibility with Python 2.4, which
88 version 0.9 accidentally broke. No new features were introduced, other than
93 version 0.9 accidentally broke. No new features were introduced, other than
89 some additional testing support for internal use.
94 some additional testing support for internal use.
90
95
91
96
92 Release 0.9
97 Release 0.9
93 ===========
98 ===========
94
99
95 New features
100 New features
96 ------------
101 ------------
97
102
98 * All furl files and security certificates are now put in a read-only
103 * All furl files and security certificates are now put in a read-only
99 directory named ~./ipython/security.
104 directory named ~./ipython/security.
100
105
101 * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that
106 * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that
102 determines the user's IPython directory in a robust manner.
107 determines the user's IPython directory in a robust manner.
103
108
104 * Laurent's WX application has been given a top-level script called
109 * Laurent's WX application has been given a top-level script called
105 ipython-wx, and it has received numerous fixes. We expect this code to be
110 ipython-wx, and it has received numerous fixes. We expect this code to be
106 architecturally better integrated with Gael's WX 'ipython widget' over the
111 architecturally better integrated with Gael's WX 'ipython widget' over the
107 next few releases.
112 next few releases.
108
113
109 * The Editor synchronization work by Vivian De Smedt has been merged in. This
114 * The Editor synchronization work by Vivian De Smedt has been merged in. This
110 code adds a number of new editor hooks to synchronize with editors under
115 code adds a number of new editor hooks to synchronize with editors under
111 Windows.
116 Windows.
112
117
113 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
118 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
114 This work was sponsored by Enthought, and while it's still very new, it is
119 This work was sponsored by Enthought, and while it's still very new, it is
115 based on a more cleanly organized arhictecture of the various IPython
120 based on a more cleanly organized arhictecture of the various IPython
116 components. We will continue to develop this over the next few releases as a
121 components. We will continue to develop this over the next few releases as a
117 model for GUI components that use IPython.
122 model for GUI components that use IPython.
118
123
119 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
124 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
120 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
125 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
121 different internal organizations, but the whole team is working on finding
126 different internal organizations, but the whole team is working on finding
122 what the right abstraction points are for a unified codebase.
127 what the right abstraction points are for a unified codebase.
123
128
124 * As part of the frontend work, Barry Wark also implemented an experimental
129 * As part of the frontend work, Barry Wark also implemented an experimental
  event notification system that various IPython components can use. In the
  next release the implications and use patterns of this system regarding the
  various GUI options will be worked out.

* IPython finally has a full test system that can test docstrings with
  IPython-specific functionality. There are still a few pieces missing for it
  to be widely accessible to all users (so they can run the test suite at any
  time and report problems), but it now works for the developers. We are
  working hard on continuing to improve it, as this was probably IPython's
  major Achilles heel (the lack of proper test coverage made it effectively
  impossible to do large-scale refactoring). The full test suite can now
  be run using the :command:`iptest` command line program.

* The notion of a task has been completely reworked. An `ITask` interface has
  been created. This interface defines the methods that tasks need to
  implement. These methods are now responsible for things like submitting
  tasks and processing results. There are two basic task types:
  :class:`IPython.kernel.task.StringTask` (this is the old `Task` object, but
  renamed) and the new :class:`IPython.kernel.task.MapTask`, which is based on
  a function.

* A new interface, :class:`IPython.kernel.mapper.IMapper`, has been defined to
  standardize the idea of a `map` method. This interface has a single `map`
  method that has the same syntax as the built-in `map`. We have also defined
  a `mapper` factory interface that creates objects that implement
  :class:`IPython.kernel.mapper.IMapper` for different controllers. Both the
  multiengine and task controllers now have mapping capabilities.

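The call syntax the interface standardizes can be pictured with a plain-Python sketch. The class below is illustrative only, not the actual :class:`IPython.kernel.mapper.IMapper` definition; it just shows what "same syntax as the built-in `map`" means:

```python
# Illustrative sketch: a trivial "mapper" that runs locally rather than on
# engines, but exposes the map() signature the IMapper interface specifies.
class SerialMapper(object):
    """Minimal object satisfying the map-method contract."""

    def map(self, func, *sequences):
        # Same calling convention as the built-in map(func, seq1, seq2, ...)
        return list(map(func, *sequences))

mapper = SerialMapper()
result = mapper.map(lambda x, y: x + y, [1, 2, 3], [10, 20, 30])
```

A controller-backed mapper would distribute the calls across engines instead of running them in a local loop, but callers use the same `map` signature either way.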
* The parallel function capabilities have been reworked. The major changes are
  that i) there is now an `@parallel` magic that creates parallel functions,
  ii) the syntax for multiple variables follows that of `map`, and iii) both
  the multiengine and task controllers now have a parallel function
  implementation.

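As a local stand-in for the idea, the decorator below turns a scalar function into one that maps over sequences with `map`-like multi-variable syntax. This is a sketch only; the real `@parallel` magic dispatches the calls to engines through a controller:

```python
# Sketch of the @parallel idea: decorate f(x, y) and call it with sequences.
# The real implementation farms the calls out to engines; this one is serial.
def parallel(func):
    def mapped(*sequences):
        # map-like syntax: one sequence per positional argument of func
        return list(map(func, *sequences))
    return mapped

@parallel
def add(x, y):
    return x + y

sums = add([1, 2], [3, 4])  # called with sequences, like map(add, ...)
```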
* All of the parallel computing capabilities from `ipython1-dev` have been
  merged into IPython proper. This resulted in the following new subpackages:
  :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
  :mod:`IPython.tools` and :mod:`IPython.testing`.

* As part of merging in the `ipython1-dev` stuff, the `setup.py` script and
  friends have been completely refactored. Now we are checking for
  dependencies using the approach that matplotlib uses.

* The documentation has been completely reorganized to accept the
  documentation from `ipython1-dev`.

* We have switched to using Foolscap for all of our network protocols in
  :mod:`IPython.kernel`. This gives us secure connections that are both
  encrypted and authenticated.

* We have a brand new `COPYING.txt` file that describes the IPython license
  and copyright. The biggest change is that we are putting "The IPython
  Development Team" as the copyright holder. We give more details about
  exactly what this means in this file. All developers should read this and
  use the new banner in all IPython source code files.

* sh profile: ``./foo`` runs foo as a system command; there is no need to do
  ``!./foo`` anymore.

* String lists now support a ``sort(field, nums=True)`` method (to easily sort
  system command output). Try it with ``a = !ls -l ; a.sort(1, nums=1)``.

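The behaviour can be sketched in plain Python. The helper below is a hypothetical stand-in, assuming (as the ``ls -l`` example suggests) that `field` is a whitespace-split column index and ``nums=True`` compares that column numerically:

```python
# Hypothetical sketch of field-based sorting as done by IPython string lists.
# Assumption: field indexes whitespace-split columns; nums=True sorts that
# column as a number rather than as text.
def sort_by_field(lines, field, nums=False):
    def key(line):
        col = line.split()[field]
        return float(col) if nums else col
    return sorted(lines, key=key)

listing = ["-rw- 120 notes.txt", "-rw- 45 a.py", "-rw- 7 b.sh"]
by_size = sort_by_field(listing, 1, nums=True)  # numeric sort on column 1
```

Without ``nums=True`` the sizes would sort lexically ("120" before "45" before "7"), which is exactly the pitfall the keyword avoids.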
* ``%cpaste foo`` now assigns the pasted block as a string list, instead of a
  string.

* The ipcluster script now runs by default with no security. This is done
  because the main usage of the script is for starting things on localhost.
  Eventually, when ipcluster is able to start things on other hosts, we will
  put security back.

* ``cd --foo`` searches the directory history for the string foo and jumps to
  that directory. The last part of the directory name is checked first. If no
  matches are found there, the whole path is searched.


Bug fixes
---------

* The Windows installer has been fixed. Now all IPython scripts have ``.bat``
  versions created. Also, the Start Menu shortcuts have been updated.

* The color escapes in the multiengine client are now turned off on win32, as
  they don't print correctly.

* The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing
  mpi_import_statement incorrectly, which was causing the engine to crash
  when mpi was enabled.

* A few subpackages had missing ``__init__.py`` files.

* The documentation is only created if Sphinx is found. Previously, the
  ``setup.py`` script would fail if it was missing.

* Greedy ``cd`` completion has been disabled again (it was enabled in 0.8.4),
  as it caused problems on certain platforms.


Backwards incompatible changes
------------------------------

* The ``clusterfile`` option of the :command:`ipcluster` command has been
  removed, as it was not working; it will be replaced soon by something much
  more robust.

* The :mod:`IPython.kernel` configuration now properly finds the user's
  IPython directory.

* In ipapi, the :func:`make_user_ns` function has been replaced with
  :func:`make_user_namespaces`, to support dict subclasses in namespace
  creation.

* :class:`IPython.kernel.client.Task` has been renamed
  :class:`IPython.kernel.client.StringTask` to make way for new task types.

* The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
  and `map`.

* The values that the renamed `dist` keyword argument can take have been
  renamed from `'basic'` to `'b'`.

* IPython has a larger set of dependencies if you want all of its
  capabilities. See the `setup.py` script for details.

* The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
  :class:`IPython.kernel.client.TaskClient` no longer take the (ip, port)
  tuple. Instead they take the filename of a file that contains the FURL for
  that client. If the FURL file is in your IPYTHONDIR, it will be found
  automatically and the constructor can be left empty.

* The asynchronous clients in :mod:`IPython.kernel.asyncclient` are now
  created using the factory functions :func:`get_multiengine_client` and
  :func:`get_task_client`. These return a `Deferred` to the actual client.

* The command line options to `ipcontroller` and `ipengine` have changed to
  reflect the new Foolscap network protocol and the FURL files. Please see
  the help for these scripts for details.

* The configuration files for the kernel have changed because of the Foolscap
  stuff. If you were using custom config files before, you should delete them
  and regenerate new ones.

Changes merged in from IPython1
-------------------------------

New features
............

* Much improved ``setup.py`` and ``setupegg.py`` scripts. Because Twisted and
  zope.interface are now easily installable, we can declare them as
  dependencies in our setupegg.py script.

* IPython is now compatible with Twisted 2.5.0 and 8.x.

* Added a new example of how to use :mod:`ipython1.kernel.asynclient`.

* Initial draft of a process daemon in :mod:`ipython1.daemon`. This has not
  been merged into IPython and is still in `ipython1-dev`.

* The ``TaskController`` now has methods for getting the queue status.

* The ``TaskResult`` objects now have information about how long the task
  took to run.

* We are attaching additional attributes to exceptions ``(_ipython_*)`` that
  we use to carry additional info around.

* New top-level module :mod:`asyncclient` that has asynchronous versions
  (that return deferreds) of the client classes. This is designed for users
  who want to run their own Twisted reactor.

* All the clients in :mod:`client` are now based on Twisted. This is done by
  running the Twisted reactor in a separate thread and using the
  :func:`blockingCallFromThread` function that is in recent versions of
  Twisted.

* Functions can now be pushed/pulled to/from engines using
  :meth:`MultiEngineClient.push_function` and
  :meth:`MultiEngineClient.pull_function`.

* Gather/scatter are now implemented in the client to reduce the workload
  of the controller and improve performance.

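The client-side partitioning can be sketched in plain Python. This is an assumption-laden illustration of the idea (contiguous, roughly equal chunks), not the actual implementation, which also has to communicate with the controller:

```python
# Sketch of client-side scatter/gather: split a sequence into per-engine
# chunks, then reassemble results. Chunking scheme is an assumption here.
def scatter(seq, n):
    """Split seq into n roughly equal contiguous chunks, one per engine."""
    q, r = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        size = q + (1 if i < r else 0)  # spread the remainder over early chunks
        chunks.append(seq[start:start + size])
        start += size
    return chunks

def gather(chunks):
    """Reassemble the per-engine chunks into one flat sequence."""
    return [item for chunk in chunks for item in chunk]

parts = scatter(list(range(10)), 3)
```

Doing this in the client means the controller never has to hold or split the full sequence itself, which is where the performance win comes from.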
* Complete rewrite of the IPython documentation. All of the documentation
  from the IPython website has been moved into docs/source as restructured
  text documents. PDF and HTML documentation are being generated using
  Sphinx.

* New developer oriented documentation: development guidelines and roadmap.

* The traditional ``ChangeLog`` has been changed to a more useful
  ``changes.txt`` file that is organized by release and is meant to provide
  something more relevant for users.

Bug fixes
.........

* Created a proper ``MANIFEST.in`` file to create source distributions.

* Fixed a bug in the ``MultiEngine`` interface. Previously, multi-engine
  actions were being collected with a :class:`DeferredList` with
  ``fireononeerrback=1``. This meant that methods were returning
  before all engines had given their results. This was causing extremely odd
  bugs in certain cases. To fix this problem, we have 1) set
  ``fireononeerrback=0`` to make sure all results (or exceptions) are in
  before returning and 2) introduced a :exc:`CompositeError` exception
  that wraps all of the engine exceptions. This is a huge change, as it means
  that users will have to catch :exc:`CompositeError` rather than the actual
  exception.

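The catch pattern this implies can be sketched with a simplified stand-in. The class and helper below are illustrative only; the real :exc:`CompositeError` carries more metadata (engine ids, remote tracebacks):

```python
# Simplified stand-in for CompositeError: collect every failure before
# raising, so the caller sees all engine errors at once.
class CompositeError(Exception):
    def __init__(self, message, elist):
        super().__init__(message)
        self.elist = elist  # one wrapped exception per failing "engine"

def execute_on_all(func, inputs):
    """Run func on every input; raise one composite error if any fail."""
    results, errors = [], []
    for x in inputs:
        try:
            results.append(func(x))
        except Exception as e:
            errors.append(e)
    if errors:
        raise CompositeError("%d calls raised" % len(errors), errors)
    return results

# Callers now catch the composite wrapper, not the raw exception:
try:
    execute_on_all(lambda x: 1 // x, [1, 0, 2])
except CompositeError as err:
    failure_types = [type(e).__name__ for e in err.elist]
```

Note that a bare ``except ZeroDivisionError`` would no longer fire here; the division error is inside ``err.elist``, which is exactly the backwards-incompatibility the changelog warns about.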
Backwards incompatible changes
..............................

* All names have been renamed to conform to the lowercase_with_underscore
  convention. This will require users to change references to all names like
  ``queueStatus`` to ``queue_status``.

* Previously, methods like :meth:`MultiEngineClient.push` and
  :meth:`MultiEngineClient.pull` used ``*args`` and ``**kwargs``. This was
  becoming a problem, as we weren't able to introduce new keyword arguments
  into the API. Now these methods simply take a dict or sequence. This has
  also allowed us to get rid of the ``*All`` methods like :meth:`pushAll` and
  :meth:`pullAll`. These things are now handled with the ``targets`` keyword
  argument that defaults to ``'all'``.

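The shape of the new API can be sketched with a stub. This is not the real :class:`MultiEngineClient`; it is a hypothetical two-engine mock showing the dict-based signatures and the ``targets`` keyword that replaced the ``*All`` methods:

```python
# Hypothetical stub illustrating the dict-based push/pull signatures.
class StubMultiEngineClient(object):
    def __init__(self):
        self._namespaces = {0: {}, 1: {}}  # pretend we have two engines

    def push(self, ns, targets='all'):
        """Push a dict of names into the targeted engine namespaces."""
        ids = list(self._namespaces) if targets == 'all' else targets
        for i in ids:
            self._namespaces[i].update(ns)

    def pull(self, keys, targets='all'):
        """Pull a sequence of names back, one result list per engine."""
        ids = list(self._namespaces) if targets == 'all' else targets
        return [[self._namespaces[i][k] for k in keys] for i in ids]

mec = StubMultiEngineClient()
mec.push(dict(a=10, b=20))      # a dict argument, not push(a=10, b=20)
values = mec.pull(['a', 'b'])   # targets='all' replaces the old pullAll
```

Because ``ns`` is an ordinary dict and ``targets`` is an ordinary keyword, new options can be added later without breaking existing calls, which was the point of the change.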
* The :attr:`MultiEngineClient.magicTargets` attribute has been renamed to
  :attr:`MultiEngineClient.targets`.

* All methods in the MultiEngine interface now accept the optional keyword
  argument ``block``.

* Renamed :class:`RemoteController` to :class:`MultiEngineClient` and
  :class:`TaskController` to :class:`TaskClient`.

* Renamed the top-level module from :mod:`api` to :mod:`client`.

* Most methods in the multiengine interface now raise a :exc:`CompositeError`
  exception that wraps the user's exceptions, rather than just raising the
  raw user's exception.

* Changed the ``setupNS`` and ``resultNames`` in the ``Task`` class to
  ``push`` and ``pull``.


Release 0.8.4
=============

This was a quick release to fix an unfortunate bug that slipped into the 0.8.3
release. The ``--twisted`` option was disabled, as it turned out to be broken
across several platforms.


Release 0.8.3
=============

* pydb is now disabled by default (due to %run -d problems). You can enable
  it by passing the -pydb command line argument to IPython. Note that setting
  it in the config file won't work.


Release 0.8.2
=============

* %pushd/%popd behave differently; now "pushd /foo" pushes the CURRENT
  directory and jumps to /foo. The current behaviour is closer to the
  documented behaviour, and should not trip anyone up.


Older releases
==============

Changes in earlier releases of IPython are described in the older file
``ChangeLog``. Please refer to this document for details.
.. _parallel_process:

===========================================
Starting the IPython controller and engines
===========================================

To use IPython for parallel computing, you need to start one instance of
the controller and one or more instances of the engine. The controller
and each engine can run on different machines or on the same machine.
Because of this, there are many different possibilities.

Broadly speaking, there are two ways of going about starting a controller and engines:

* In an automated manner using the :command:`ipcluster` command.
* In a more manual way using the :command:`ipcontroller` and
  :command:`ipengine` commands.

This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command, as it simplifies many common usage cases.

General considerations
======================

Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.

Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:

1. Start the controller on ``host0`` by running :command:`ipcontroller` on
   ``host0``.
2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
   controller from ``host0`` to hosts ``host1``-``hostn``.
3. Start the engines on hosts ``host1``-``hostn`` by running
   :command:`ipengine`. This command has to be told where the FURL file
   (:file:`ipcontroller-engine.furl`) is located.

At this point, the controller and engines will be connected. By default, the
FURL files created by the controller are put into the
:file:`~/.ipython/security` directory. If the engines share a filesystem with
the controller, step 2 can be skipped as the engines will automatically look
at that location.

The final step required to actually use the running controller from a
client is to move the FURL files :file:`ipcontroller-mec.furl` and
:file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.

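The lookup order described above (explicit path first, then the security directory) can be sketched as a small helper. This is a hypothetical illustration of the behaviour, not the actual client code:

```python
import os

# Default location where the controller drops its FURL files.
SECURITY_DIR = os.path.expanduser("~/.ipython/security")

def find_furl_file(name, search_dir=SECURITY_DIR):
    """Resolve a FURL file: an explicit existing path wins; otherwise
    fall back to the security directory, as the clients do."""
    if os.path.isfile(name):
        return os.path.abspath(name)
    candidate = os.path.join(search_dir, name)
    if os.path.isfile(candidate):
        return candidate
    raise IOError("FURL file %r not found" % name)
```

With this behaviour, a client constructor can default its argument to a name like ``ipcontroller-mec.furl`` and be left empty whenever the file already sits in the security directory.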
45
46 Using :command:`ipcluster`
46 Using :command:`ipcluster`
47 ==========================
47 ==========================
48
48
49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50
50
51 1. When the controller and engines are all run on localhost. This is useful
51 1. When the controller and engines are all run on localhost. This is useful
52 for testing or running on a multicore computer.
52 for testing or running on a multicore computer.
53 2. When engines are started using the :command:`mpirun` command that comes
53 2. When engines are started using the :command:`mpirun` command that comes
54 with most MPI [MPI]_ implementations
54 with most MPI [MPI]_ implementations
55 3. When engines are started using the PBS [PBS]_ batch system.
55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
56 4. When the controller is started on localhost and the engines are started on
57 remote nodes using :command:`ssh`.
57 remote nodes using :command:`ssh`.
58
58
59 .. note::
59 .. note::
60
60
61 It is also possible for advanced users to add support to
61 It is also possible for advanced users to add support to
62 :command:`ipcluster` for starting controllers and engines using other
62 :command:`ipcluster` for starting controllers and engines using other
63 methods (like Sun's Grid Engine for example).
63 methods (like Sun's Grid Engine for example).
64
64
65 .. note::
65 .. note::
66
66
67 Currently :command:`ipcluster` requires that the
67 Currently :command:`ipcluster` requires that the
68 :file:`~/.ipython/security` directory live on a shared filesystem that is
68 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 seen by both the controller and engines. If you don't have a shared file
69 seen by both the controller and engines. If you don't have a shared file
70 system you will need to use :command:`ipcontroller` and
70 system you will need to use :command:`ipcontroller` and
71 :command:`ipengine` directly. This constraint can be relaxed if you are
71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 using the :command:`ssh` method to start the cluster.
72 using the :command:`ssh` method to start the cluster.
73
73
74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 and :command:`ipengine` to perform the steps described above.
75 and :command:`ipengine` to perform the steps described above.
76
76
77 Using :command:`ipcluster` in local mode
77 Using :command:`ipcluster` in local mode
78 ----------------------------------------
78 ----------------------------------------
79
79
80 To start one controller and 4 engines on localhost, just do::
80 To start one controller and 4 engines on localhost, just do::
81
81
82 $ ipcluster local -n 4
82 $ ipcluster local -n 4
83
83
84 To see other command line options for the local mode, do::
84 To see other command line options for the local mode, do::
85
85
86 $ ipcluster local -h
86 $ ipcluster local -h
87
87
88 Using :command:`ipcluster` in mpirun mode
88 Using :command:`ipcluster` in mpirun mode
89 -----------------------------------------
89 -----------------------------------------
90
90
91 The mpirun mode is useful if you:
91 The mpirun mode is useful if you:
92
92
93 1. Have MPI installed.
93 1. Have MPI installed.
94 2. Your systems are configured to use the :command:`mpirun` command to start
94 2. Your systems are configured to use the :command:`mpirun` command to start
95 processes.
95 processes.
96
96
97 If these are satisfied, you can start an IPython cluster using::
97 If these are satisfied, you can start an IPython cluster using::
98
98
99 $ ipcluster mpirun -n 4
99 $ ipcluster mpirun -n 4
100
100
101 This does the following:
101 This does the following:
102
102
103 1. Starts the IPython controller on current host.
103 1. Starts the IPython controller on current host.
104 2. Uses :command:`mpirun` to start 4 engines.
104 2. Uses :command:`mpirun` to start 4 engines.
105
105
106 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
106 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
107
107
108 $ ipcluster mpirun -n 4 --mpi=mpi4py
108 $ ipcluster mpirun -n 4 --mpi=mpi4py
109
109
110 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
110 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
111
111
112 Additional command line options for this mode can be found by doing::
112 Additional command line options for this mode can be found by doing::
113
113
114 $ ipcluster mpirun -h
114 $ ipcluster mpirun -h
115
115
116 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
116 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
117
117
118
118
119 Using :command:`ipcluster` in PBS mode
119 Using :command:`ipcluster` in PBS mode
120 --------------------------------------
120 --------------------------------------
121
121
122 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
122 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
123
123
124 .. sourcecode:: bash
124 .. sourcecode:: bash
125
125
126 #PBS -N ipython
126 #PBS -N ipython
127 #PBS -j oe
127 #PBS -j oe
128 #PBS -l walltime=00:10:00
128 #PBS -l walltime=00:10:00
129 #PBS -l nodes=${n/4}:ppn=4
129 #PBS -l nodes=${n/4}:ppn=4
130 #PBS -q parallel
130 #PBS -q parallel
131
131
132 cd $$PBS_O_WORKDIR
132 cd $$PBS_O_WORKDIR
133 export PATH=$$HOME/usr/local/bin
133 export PATH=$$HOME/usr/local/bin
134 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
134 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
135 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
135 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
136
136
137 There are a few important points about this template:
137 There are a few important points about this template:
138
138
139 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
139 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
140 template engine.
140 template engine.
141
141
142 2. Instead of putting in the actual number of engines, use the notation
142 2. Instead of putting in the actual number of engines, use the notation
143 ``${n}`` to indicate the number of engines to be started. You can also uses
143 ``${n}`` to indicate the number of engines to be started. You can also uses
144 expressions like ``${n/4}`` in the template to indicate the number of
144 expressions like ``${n/4}`` in the template to indicate the number of
145 nodes.
145 nodes.
146
146
147 3. Because ``$`` is a special character used by the template engine, you must
147 3. Because ``$`` is a special character used by the template engine, you must
148 escape any ``$`` by using ``$$``. This is important when referring to
148 escape any ``$`` by using ``$$``. This is important when referring to
149 environment variables in the template.
149 environment variables in the template.
150
150
151 4. Any options to :command:`ipengine` should be given in the batch script
151 4. Any options to :command:`ipengine` should be given in the batch script
152 template.
152 template.
153
153
154 5. Depending on the configuration of you system, you may have to set
154 5. Depending on the configuration of you system, you may have to set
155 environment variables in the script template.
155 environment variables in the script template.
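The effect of the ``${n}`` substitution and the ``$$`` escape can be sketched
with Python's standard :class:`string.Template`, which happens to use the same
placeholder and escaping conventions. Unlike :mod:`Itpl`, it cannot evaluate
expressions such as ``${n/4}``, so the node count is precomputed in this
illustration:

```python
from string import Template

# A stripped-down stand-in for the PBS template above. string.Template
# shares Itpl's ${name} substitution and $$ escaping, but it cannot
# evaluate expressions like ${n/4}, so the node count is passed in.
pbs = Template("""\
#PBS -l nodes=${nodes}:ppn=4
cd $$PBS_O_WORKDIR
mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine""")

n = 8
rendered = pbs.substitute(n=n, nodes=n // 4)
print(rendered)
# ${n} -> 8, ${nodes} -> 2, and every $$ collapses to a single literal $
```

Note how each ``$$`` survives rendering as a single ``$``, which is exactly
why environment variables must be written with doubled dollar signs in the
template.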

Once you have created such a script, save it with a name like
:file:`pbs.template`. Now you are ready to start your job::

    $ ipcluster pbs -n 128 --pbs-script=pbs.template

Additional command line options for this mode can be found by doing::

    $ ipcluster pbs -h

Using :command:`ipcluster` in SSH mode
--------------------------------------

The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
nodes and :command:`ipcontroller` on localhost.

When using this mode it is highly recommended that you have set up SSH keys
and are using ssh-agent [SSH]_ for password-less logins.

To use this mode you need a Python file describing the cluster. Here is an
example of such a "clusterfile":

.. sourcecode:: python

    send_furl = True
    engines = { 'host1.example.com' : 2,
                'host2.example.com' : 5,
                'host3.example.com' : 1,
                'host4.example.com' : 8 }

Since this is a regular Python file, the usual Python syntax applies. Things
to note:

* The `engines` dict, where the keys are the hosts we want to run engines on
  and the values are the number of engines to run on each host.
* `send_furl` can be either `True` or `False`; if `True` it will copy over
  the FURL needed for :command:`ipengine` to each host.
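Because the clusterfile is ordinary Python, :command:`ipcluster` can simply
execute it and read these two names out of the resulting namespace. A minimal
sketch of that idea (an illustration only; the actual loading is done inside
:command:`ipcluster`):

```python
# Illustration only: reading back a clusterfile's names. The variable
# names (send_furl, engines) follow the clusterfile convention shown
# above; the real parsing lives inside ipcluster itself.
clusterfile_source = """
send_furl = True
engines = { 'host1.example.com' : 2,
            'host2.example.com' : 5,
            'host3.example.com' : 1,
            'host4.example.com' : 8 }
"""

namespace = {}
exec(clusterfile_source, namespace)  # for a real file: exec(open(path).read(), namespace)

total = sum(namespace['engines'].values())
print(total)                   # 16 engines spread over 4 hosts
print(namespace['send_furl'])  # True
```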

The ``--clusterfile`` command line option lets you specify the file to use
for the cluster definition. Once you have your cluster file and you can
:command:`ssh` into the remote hosts without a password, you are ready to
start your cluster like so:

.. sourcecode:: bash

    $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py


Two helper shell scripts are used to start and stop :command:`ipengine` on
remote hosts:

* sshx.sh
* engine_killer.sh

Defaults for both of these are contained in the source code for
:command:`ipcluster`. The default scripts are written to a local file in a
temp directory and then copied to a temp directory on the remote host and
executed from there. On most Unix, Linux and OS X systems this is
:file:`/tmp`.

The default sshx.sh is the following:

.. sourcecode:: bash

    #!/bin/sh
    "$@" &> /dev/null &
    echo $!

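This one-liner runs whatever command it is handed in the background, discards
its output, and echoes the PID of the new process, presumably so the process
can be found again for later cleanup. The same behaviour can be sketched in
Python; this is an illustration of what the script does, not code from
IPython:

```python
import os
import subprocess

def launch_detached(argv):
    """Python sketch of sshx.sh:  "$@" &> /dev/null &  then  echo $!"""
    # Silence stdout/stderr (the &> /dev/null part), do not wait for
    # the child (the trailing &), and return its PID (the echo $!).
    with open(os.devnull, 'wb') as devnull:
        proc = subprocess.Popen(argv, stdout=devnull, stderr=devnull)
    return proc.pid

pid = launch_detached(['sleep', '1'])
print(pid)  # the PID that would be reported back for later cleanup
```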
If you want to use a custom sshx.sh script you need to use the ``--sshx``
option and specify the file to use. Using a custom sshx.sh file could be
helpful when you need to set up the environment on the remote host before
executing :command:`ipengine`.

For a detailed options list:

.. sourcecode:: bash

    $ ipcluster ssh -h

Current limitations of the SSH mode of :command:`ipcluster` are:

* Untested on Windows. Would require a working :command:`ssh` on Windows.
  Also, we are using shell scripts to set up and execute commands on remote
  hosts.
* :command:`ipcontroller` is started on localhost, with no option to start it
  on a remote node.

Using the :command:`ipcontroller` and :command:`ipengine` commands
==================================================================

It is also possible to use the :command:`ipcontroller` and :command:`ipengine`
commands to start your controller and engines. This approach gives you full
control over all aspects of the startup process.

Starting the controller and engine on your local machine
--------------------------------------------------------

To use :command:`ipcontroller` and :command:`ipengine` to start things on
your local machine, do the following.

First start the controller::

    $ ipcontroller

Next, start however many instances of the engine you want using (repeatedly)
the command::

    $ ipengine

The engines should start and automatically connect to the controller using
the FURL files in :file:`~/.ipython/security`. You are now ready to use the
controller and engines from IPython.

.. warning::

    The order of the above operations is very important. You *must*
    start the controller before the engines, since the engines connect
    to the controller as they get started.

.. note::

    On some platforms (OS X), to put the controller and engine into the
    background you may need to give these commands in the form ``(ipcontroller
    &)`` and ``(ipengine &)`` (with the parentheses) for them to work
    properly.

Starting the controller and engines on different hosts
------------------------------------------------------

When the controller and engines are running on different hosts, things are
slightly more complicated, but the underlying ideas are the same:

1. Start the controller on a host using :command:`ipcontroller`.
2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on
   the controller's host to the host where the engines will run.
3. Use :command:`ipengine` on the engines' hosts to start the engines.

The only thing you have to be careful of is to tell :command:`ipengine` where
the :file:`ipcontroller-engine.furl` file is located. There are two ways you
can do this:

* Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
  directory on the engine's host, where it will be found automatically.
* Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
  flag.

The ``--furl-file`` flag works like this::

    $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl

.. note::

    If the controller's and engines' hosts all have a shared file system
    (:file:`~/.ipython/security` is the same on all of them), then things
    will just work!

Make FURL files persistent
--------------------------

At first glance it may seem that managing the FURL files is a bit annoying.
Going back to the house and key analogy, copying the FURL around each time
you start the controller is like having to make a new key every time you
want to unlock the door and enter your house. As with your house, you want
to be able to create the key (or FURL file) once, and then simply use it at
any point in the future.

This is possible. The only thing you have to do is decide what ports the
controller will listen on for the engines and clients. This is done as
follows::

    $ ipcontroller -r --client-port=10101 --engine-port=10102

Then, just copy the FURL files over the first time and you are set. You can
start and stop the controller and engines as many times as you want in the
future; just make sure to tell the controller to use the *same* ports.

.. note::

    You may ask the question: what ports does the controller listen on if you
    don't tell it to use specific ones? The default is to use high random
    port numbers. We do this for two reasons: i) to increase security through
    obscurity and ii) to allow multiple controllers on a given host to start
    and automatically use different ports.

Log files
---------

All of the components of IPython have log files associated with them.
These log files can be extremely useful in debugging problems with
IPython and can be found in the directory :file:`~/.ipython/log`. Sending
the log files to us will often help us to debug any problems.


.. [PBS] Portable Batch System. http://www.openpbs.org/
.. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent