##// END OF EJS Templates
Fixing small things in response to review.
Brian Granger -
Show More
@@ -1,753 +1,753 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3
3
4 """Adapt the IPython ControllerServer to IMultiEngine.
4 """Adapt the IPython ControllerServer to IMultiEngine.
5
5
6 This module provides classes that adapt a ControllerService to the
6 This module provides classes that adapt a ControllerService to the
7 IMultiEngine interface. This interface is a basic interactive interface
7 IMultiEngine interface. This interface is a basic interactive interface
8 for working with a set of engines where it is desired to have explicit
8 for working with a set of engines where it is desired to have explicit
9 access to each registered engine.
9 access to each registered engine.
10
10
11 The classes here are exposed to the network in files like:
11 The classes here are exposed to the network in files like:
12
12
13 * multienginevanilla.py
13 * multienginevanilla.py
14 * multienginepb.py
14 * multienginepb.py
15 """
15 """
16
16
17 __docformat__ = "restructuredtext en"
17 __docformat__ = "restructuredtext en"
18
18
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20 # Copyright (C) 2008 The IPython Development Team
20 # Copyright (C) 2008 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-------------------------------------------------------------------------------
24 #-------------------------------------------------------------------------------
25
25
26 #-------------------------------------------------------------------------------
26 #-------------------------------------------------------------------------------
27 # Imports
27 # Imports
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29
29
30 from new import instancemethod
30 from new import instancemethod
31 from types import FunctionType
31 from types import FunctionType
32
32
33 from twisted.application import service
33 from twisted.application import service
34 from twisted.internet import defer, reactor
34 from twisted.internet import defer, reactor
35 from twisted.python import log, components, failure
35 from twisted.python import log, components, failure
36 from zope.interface import Interface, implements, Attribute
36 from zope.interface import Interface, implements, Attribute
37
37
38 from IPython.tools import growl
38 from IPython.tools import growl
39 from IPython.kernel.util import printer
39 from IPython.kernel.util import printer
40 from IPython.kernel.twistedutil import gatherBoth
40 from IPython.kernel.twistedutil import gatherBoth
41 from IPython.kernel import map as Map
41 from IPython.kernel import map as Map
42 from IPython.kernel import error
42 from IPython.kernel import error
43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 from IPython.kernel.controllerservice import \
44 from IPython.kernel.controllerservice import \
45 ControllerAdapterBase, \
45 ControllerAdapterBase, \
46 ControllerService, \
46 ControllerService, \
47 IControllerBase
47 IControllerBase
48
48
49
49
50 #-------------------------------------------------------------------------------
50 #-------------------------------------------------------------------------------
51 # Interfaces for the MultiEngine representation of a controller
51 # Interfaces for the MultiEngine representation of a controller
52 #-------------------------------------------------------------------------------
52 #-------------------------------------------------------------------------------
53
53
54 class IEngineMultiplexer(Interface):
54 class IEngineMultiplexer(Interface):
55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56
56
57 This class simply acts as a multiplexer of methods that are in the
57 This class simply acts as a multiplexer of methods that are in the
58 various IEngines* interfaces. Thus the methods here are jut like those
58 various IEngines* interfaces. Thus the methods here are jut like those
59 in the IEngine* interfaces, but with an extra first argument, targets.
59 in the IEngine* interfaces, but with an extra first argument, targets.
60 The targets argument can have the following forms:
60 The targets argument can have the following forms:
61
61
62 * targets = 10 # Engines are indexed by ints
62 * targets = 10 # Engines are indexed by ints
63 * targets = [0,1,2,3] # A list of ints
63 * targets = [0,1,2,3] # A list of ints
64 * targets = 'all' # A string to indicate all targets
64 * targets = 'all' # A string to indicate all targets
65
65
66 If targets is bad in any way, an InvalidEngineID will be raised. This
66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 includes engines not being registered.
67 includes engines not being registered.
68
68
69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 with length equal to the number of targets. The elements of the list will
70 with length equal to the number of targets. The elements of the list will
71 correspond to the return of the corresponding IEngine method.
71 correspond to the return of the corresponding IEngine method.
72
72
73 Failures are aggressive, meaning that if an action fails for any target,
73 Failures are aggressive, meaning that if an action fails for any target,
74 the overall action will fail immediately with that Failure.
74 the overall action will fail immediately with that Failure.
75
75
76 :Parameters:
76 :Parameters:
77 targets : int, list of ints, or 'all'
77 targets : int, list of ints, or 'all'
78 Engine ids the action will apply to.
78 Engine ids the action will apply to.
79
79
80 :Returns: Deferred to a list of results for each engine.
80 :Returns: Deferred to a list of results for each engine.
81
81
82 :Exception:
82 :Exception:
83 InvalidEngineID
83 InvalidEngineID
84 If the targets argument is bad or engines aren't registered.
84 If the targets argument is bad or engines aren't registered.
85 NoEnginesRegistered
85 NoEnginesRegistered
86 If there are no engines registered and targets='all'
86 If there are no engines registered and targets='all'
87 """
87 """
88
88
89 #---------------------------------------------------------------------------
89 #---------------------------------------------------------------------------
90 # Mutiplexed methods
90 # Mutiplexed methods
91 #---------------------------------------------------------------------------
91 #---------------------------------------------------------------------------
92
92
93 def execute(lines, targets='all'):
93 def execute(lines, targets='all'):
94 """Execute lines of Python code on targets.
94 """Execute lines of Python code on targets.
95
95
96 See the class docstring for information about targets and possible
96 See the class docstring for information about targets and possible
97 exceptions this method can raise.
97 exceptions this method can raise.
98
98
99 :Parameters:
99 :Parameters:
100 lines : str
100 lines : str
101 String of python code to be executed on targets.
101 String of python code to be executed on targets.
102 """
102 """
103
103
104 def push(namespace, targets='all'):
104 def push(namespace, targets='all'):
105 """Push dict namespace into the user's namespace on targets.
105 """Push dict namespace into the user's namespace on targets.
106
106
107 See the class docstring for information about targets and possible
107 See the class docstring for information about targets and possible
108 exceptions this method can raise.
108 exceptions this method can raise.
109
109
110 :Parameters:
110 :Parameters:
111 namspace : dict
111 namspace : dict
112 Dict of key value pairs to be put into the users namspace.
112 Dict of key value pairs to be put into the users namspace.
113 """
113 """
114
114
115 def pull(keys, targets='all'):
115 def pull(keys, targets='all'):
116 """Pull values out of the user's namespace on targets by keys.
116 """Pull values out of the user's namespace on targets by keys.
117
117
118 See the class docstring for information about targets and possible
118 See the class docstring for information about targets and possible
119 exceptions this method can raise.
119 exceptions this method can raise.
120
120
121 :Parameters:
121 :Parameters:
122 keys : tuple of strings
122 keys : tuple of strings
123 Sequence of keys to be pulled from user's namespace.
123 Sequence of keys to be pulled from user's namespace.
124 """
124 """
125
125
126 def push_function(namespace, targets='all'):
126 def push_function(namespace, targets='all'):
127 """"""
127 """"""
128
128
129 def pull_function(keys, targets='all'):
129 def pull_function(keys, targets='all'):
130 """"""
130 """"""
131
131
132 def get_result(i=None, targets='all'):
132 def get_result(i=None, targets='all'):
133 """Get the result for command i from targets.
133 """Get the result for command i from targets.
134
134
135 See the class docstring for information about targets and possible
135 See the class docstring for information about targets and possible
136 exceptions this method can raise.
136 exceptions this method can raise.
137
137
138 :Parameters:
138 :Parameters:
139 i : int or None
139 i : int or None
140 Command index or None to indicate most recent command.
140 Command index or None to indicate most recent command.
141 """
141 """
142
142
143 def reset(targets='all'):
143 def reset(targets='all'):
144 """Reset targets.
144 """Reset targets.
145
145
146 This clears the users namespace of the Engines, but won't cause
146 This clears the users namespace of the Engines, but won't cause
147 modules to be reloaded.
147 modules to be reloaded.
148 """
148 """
149
149
150 def keys(targets='all'):
150 def keys(targets='all'):
151 """Get variable names defined in user's namespace on targets."""
151 """Get variable names defined in user's namespace on targets."""
152
152
153 def kill(controller=False, targets='all'):
153 def kill(controller=False, targets='all'):
154 """Kill the targets Engines and possibly the controller.
154 """Kill the targets Engines and possibly the controller.
155
155
156 :Parameters:
156 :Parameters:
157 controller : boolean
157 controller : boolean
158 Should the controller be killed as well. If so all the
158 Should the controller be killed as well. If so all the
159 engines will be killed first no matter what targets is.
159 engines will be killed first no matter what targets is.
160 """
160 """
161
161
162 def push_serialized(namespace, targets='all'):
162 def push_serialized(namespace, targets='all'):
163 """Push a namespace of Serialized objects to targets.
163 """Push a namespace of Serialized objects to targets.
164
164
165 :Parameters:
165 :Parameters:
166 namespace : dict
166 namespace : dict
167 A dict whose keys are the variable names and whose values
167 A dict whose keys are the variable names and whose values
168 are serialized version of the objects.
168 are serialized version of the objects.
169 """
169 """
170
170
171 def pull_serialized(keys, targets='all'):
171 def pull_serialized(keys, targets='all'):
172 """Pull Serialized objects by keys from targets.
172 """Pull Serialized objects by keys from targets.
173
173
174 :Parameters:
174 :Parameters:
175 keys : tuple of strings
175 keys : tuple of strings
176 Sequence of variable names to pull as serialized objects.
176 Sequence of variable names to pull as serialized objects.
177 """
177 """
178
178
179 def clear_queue(targets='all'):
179 def clear_queue(targets='all'):
180 """Clear the queue of pending command for targets."""
180 """Clear the queue of pending command for targets."""
181
181
182 def queue_status(targets='all'):
182 def queue_status(targets='all'):
183 """Get the status of the queue on the targets."""
183 """Get the status of the queue on the targets."""
184
184
185 def set_properties(properties, targets='all'):
185 def set_properties(properties, targets='all'):
186 """set properties by key and value"""
186 """set properties by key and value"""
187
187
188 def get_properties(keys=None, targets='all'):
188 def get_properties(keys=None, targets='all'):
189 """get a list of properties by `keys`, if no keys specified, get all"""
189 """get a list of properties by `keys`, if no keys specified, get all"""
190
190
191 def del_properties(keys, targets='all'):
191 def del_properties(keys, targets='all'):
192 """delete properties by `keys`"""
192 """delete properties by `keys`"""
193
193
194 def has_properties(keys, targets='all'):
194 def has_properties(keys, targets='all'):
195 """get a list of bool values for whether `properties` has `keys`"""
195 """get a list of bool values for whether `properties` has `keys`"""
196
196
197 def clear_properties(targets='all'):
197 def clear_properties(targets='all'):
198 """clear the properties dict"""
198 """clear the properties dict"""
199
199
200
200
201 class IMultiEngine(IEngineMultiplexer):
201 class IMultiEngine(IEngineMultiplexer):
202 """A controller that exposes an explicit interface to all of its engines.
202 """A controller that exposes an explicit interface to all of its engines.
203
203
204 This is the primary inteface for interactive usage.
204 This is the primary inteface for interactive usage.
205 """
205 """
206
206
207 def get_ids():
207 def get_ids():
208 """Return list of currently registered ids.
208 """Return list of currently registered ids.
209
209
210 :Returns: A Deferred to a list of registered engine ids.
210 :Returns: A Deferred to a list of registered engine ids.
211 """
211 """
212
212
213
213
214
214
215 #-------------------------------------------------------------------------------
215 #-------------------------------------------------------------------------------
216 # Implementation of the core MultiEngine classes
216 # Implementation of the core MultiEngine classes
217 #-------------------------------------------------------------------------------
217 #-------------------------------------------------------------------------------
218
218
219 class MultiEngine(ControllerAdapterBase):
219 class MultiEngine(ControllerAdapterBase):
220 """The representation of a ControllerService as a IMultiEngine.
220 """The representation of a ControllerService as a IMultiEngine.
221
221
222 Although it is not implemented currently, this class would be where a
222 Although it is not implemented currently, this class would be where a
223 client/notification API is implemented. It could inherit from something
223 client/notification API is implemented. It could inherit from something
224 like results.NotifierParent and then use the notify method to send
224 like results.NotifierParent and then use the notify method to send
225 notifications.
225 notifications.
226 """
226 """
227
227
228 implements(IMultiEngine)
228 implements(IMultiEngine)
229
229
230 def __init(self, controller):
230 def __init(self, controller):
231 ControllerAdapterBase.__init__(self, controller)
231 ControllerAdapterBase.__init__(self, controller)
232
232
233 #---------------------------------------------------------------------------
233 #---------------------------------------------------------------------------
234 # Helper methods
234 # Helper methods
235 #---------------------------------------------------------------------------
235 #---------------------------------------------------------------------------
236
236
237 def engineList(self, targets):
237 def engineList(self, targets):
238 """Parse the targets argument into a list of valid engine objects.
238 """Parse the targets argument into a list of valid engine objects.
239
239
240 :Parameters:
240 :Parameters:
241 targets : int, list of ints or 'all'
241 targets : int, list of ints or 'all'
242 The targets argument to be parsed.
242 The targets argument to be parsed.
243
243
244 :Returns: List of engine objects.
244 :Returns: List of engine objects.
245
245
246 :Exception:
246 :Exception:
247 InvalidEngineID
247 InvalidEngineID
248 If targets is not valid or if an engine is not registered.
248 If targets is not valid or if an engine is not registered.
249 """
249 """
250 if isinstance(targets, int):
250 if isinstance(targets, int):
251 if targets not in self.engines.keys():
251 if targets not in self.engines.keys():
252 log.msg("Engine with id %i is not registered" % targets)
252 log.msg("Engine with id %i is not registered" % targets)
253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 else:
254 else:
255 return [self.engines[targets]]
255 return [self.engines[targets]]
256 elif isinstance(targets, (list, tuple)):
256 elif isinstance(targets, (list, tuple)):
257 for id in targets:
257 for id in targets:
258 if id not in self.engines.keys():
258 if id not in self.engines.keys():
259 log.msg("Engine with id %r is not registered" % id)
259 log.msg("Engine with id %r is not registered" % id)
260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 return map(self.engines.get, targets)
261 return map(self.engines.get, targets)
262 elif targets == 'all':
262 elif targets == 'all':
263 eList = self.engines.values()
263 eList = self.engines.values()
264 if len(eList) == 0:
264 if len(eList) == 0:
265 msg = """There are no engines registered.
265 msg = """There are no engines registered.
266 Check the logs in ~/.ipython/log if you think there should have been."""
266 Check the logs in ~/.ipython/log if you think there should have been."""
267 raise error.NoEnginesRegistered(msg)
267 raise error.NoEnginesRegistered(msg)
268 else:
268 else:
269 return eList
269 return eList
270 else:
270 else:
271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
272
272
273 def _performOnEngines(self, methodName, *args, **kwargs):
273 def _performOnEngines(self, methodName, *args, **kwargs):
274 """Calls a method on engines and returns deferred to list of results.
274 """Calls a method on engines and returns deferred to list of results.
275
275
276 :Parameters:
276 :Parameters:
277 methodName : str
277 methodName : str
278 Name of the method to be called.
278 Name of the method to be called.
279 targets : int, list of ints, 'all'
279 targets : int, list of ints, 'all'
280 The targets argument to be parsed into a list of engine objects.
280 The targets argument to be parsed into a list of engine objects.
281 args
281 args
282 The positional keyword arguments to be passed to the engines.
282 The positional keyword arguments to be passed to the engines.
283 kwargs
283 kwargs
284 The keyword arguments passed to the method
284 The keyword arguments passed to the method
285
285
286 :Returns: List of deferreds to the results on each engine
286 :Returns: List of deferreds to the results on each engine
287
287
288 :Exception:
288 :Exception:
289 InvalidEngineID
289 InvalidEngineID
290 If the targets argument is bad in any way.
290 If the targets argument is bad in any way.
291 AttributeError
291 AttributeError
292 If the method doesn't exist on one of the engines.
292 If the method doesn't exist on one of the engines.
293 """
293 """
294 targets = kwargs.pop('targets')
294 targets = kwargs.pop('targets')
295 log.msg("Performing %s on %r" % (methodName, targets))
295 log.msg("Performing %s on %r" % (methodName, targets))
296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 # This will and should raise if targets is not valid!
297 # This will and should raise if targets is not valid!
298 engines = self.engineList(targets)
298 engines = self.engineList(targets)
299 dList = []
299 dList = []
300 for e in engines:
300 for e in engines:
301 meth = getattr(e, methodName, None)
301 meth = getattr(e, methodName, None)
302 if meth is not None:
302 if meth is not None:
303 dList.append(meth(*args, **kwargs))
303 dList.append(meth(*args, **kwargs))
304 else:
304 else:
305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 return dList
306 return dList
307
307
308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 """Called _performOnEngines and wraps result/exception into deferred."""
309 """Called _performOnEngines and wraps result/exception into deferred."""
310 try:
310 try:
311 dList = self._performOnEngines(methodName, *args, **kwargs)
311 dList = self._performOnEngines(methodName, *args, **kwargs)
312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 return defer.fail(failure.Failure())
313 return defer.fail(failure.Failure())
314 else:
314 else:
315 # Having fireOnOneErrback is causing problems with the determinacy
315 # Having fireOnOneErrback is causing problems with the determinacy
316 # of the system. Basically, once a single engine has errbacked, this
316 # of the system. Basically, once a single engine has errbacked, this
317 # method returns. In some cases, this will cause client to submit
317 # method returns. In some cases, this will cause client to submit
318 # another command. Because the previous command is still running
318 # another command. Because the previous command is still running
319 # on some engines, this command will be queued. When those commands
319 # on some engines, this command will be queued. When those commands
320 # then errback, the second command will raise QueueCleared. Ahhh!
320 # then errback, the second command will raise QueueCleared. Ahhh!
321 d = gatherBoth(dList,
321 d = gatherBoth(dList,
322 fireOnOneErrback=0,
322 fireOnOneErrback=0,
323 consumeErrors=1,
323 consumeErrors=1,
324 logErrors=0)
324 logErrors=0)
325 d.addCallback(error.collect_exceptions, methodName)
325 d.addCallback(error.collect_exceptions, methodName)
326 return d
326 return d
327
327
328 #---------------------------------------------------------------------------
328 #---------------------------------------------------------------------------
329 # General IMultiEngine methods
329 # General IMultiEngine methods
330 #---------------------------------------------------------------------------
330 #---------------------------------------------------------------------------
331
331
332 def get_ids(self):
332 def get_ids(self):
333 return defer.succeed(self.engines.keys())
333 return defer.succeed(self.engines.keys())
334
334
335 #---------------------------------------------------------------------------
335 #---------------------------------------------------------------------------
336 # IEngineMultiplexer methods
336 # IEngineMultiplexer methods
337 #---------------------------------------------------------------------------
337 #---------------------------------------------------------------------------
338
338
339 def execute(self, lines, targets='all'):
339 def execute(self, lines, targets='all'):
340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341
341
342 def push(self, ns, targets='all'):
342 def push(self, ns, targets='all'):
343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344
344
345 def pull(self, keys, targets='all'):
345 def pull(self, keys, targets='all'):
346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347
347
348 def push_function(self, ns, targets='all'):
348 def push_function(self, ns, targets='all'):
349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350
350
351 def pull_function(self, keys, targets='all'):
351 def pull_function(self, keys, targets='all'):
352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353
353
354 def get_result(self, i=None, targets='all'):
354 def get_result(self, i=None, targets='all'):
355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356
356
357 def reset(self, targets='all'):
357 def reset(self, targets='all'):
358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359
359
360 def keys(self, targets='all'):
360 def keys(self, targets='all'):
361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362
362
363 def kill(self, controller=False, targets='all'):
363 def kill(self, controller=False, targets='all'):
364 if controller:
364 if controller:
365 targets = 'all'
365 targets = 'all'
366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 if controller:
367 if controller:
368 log.msg("Killing controller")
368 log.msg("Killing controller")
369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 # Consume any weird stuff coming back
370 # Consume any weird stuff coming back
371 d.addBoth(lambda _: None)
371 d.addBoth(lambda _: None)
372 return d
372 return d
373
373
374 def push_serialized(self, namespace, targets='all'):
374 def push_serialized(self, namespace, targets='all'):
375 for k, v in namespace.iteritems():
375 for k, v in namespace.iteritems():
376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 return d
378 return d
379
379
380 def pull_serialized(self, keys, targets='all'):
380 def pull_serialized(self, keys, targets='all'):
381 try:
381 try:
382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 return defer.fail(failure.Failure())
384 return defer.fail(failure.Failure())
385 else:
385 else:
386 for d in dList:
386 for d in dList:
387 d.addCallback(self._logSizes)
387 d.addCallback(self._logSizes)
388 d = gatherBoth(dList,
388 d = gatherBoth(dList,
389 fireOnOneErrback=0,
389 fireOnOneErrback=0,
390 consumeErrors=1,
390 consumeErrors=1,
391 logErrors=0)
391 logErrors=0)
392 d.addCallback(error.collect_exceptions, 'pull_serialized')
392 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 return d
393 return d
394
394
395 def _logSizes(self, listOfSerialized):
395 def _logSizes(self, listOfSerialized):
396 if isinstance(listOfSerialized, (list, tuple)):
396 if isinstance(listOfSerialized, (list, tuple)):
397 for s in listOfSerialized:
397 for s in listOfSerialized:
398 log.msg("Pulled object is %f MB" % s.getDataSize())
398 log.msg("Pulled object is %f MB" % s.getDataSize())
399 else:
399 else:
400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 return listOfSerialized
401 return listOfSerialized
402
402
403 def clear_queue(self, targets='all'):
403 def clear_queue(self, targets='all'):
404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405
405
406 def queue_status(self, targets='all'):
406 def queue_status(self, targets='all'):
407 log.msg("Getting queue status on %r" % targets)
407 log.msg("Getting queue status on %r" % targets)
408 try:
408 try:
409 engines = self.engineList(targets)
409 engines = self.engineList(targets)
410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 return defer.fail(failure.Failure())
411 return defer.fail(failure.Failure())
412 else:
412 else:
413 dList = []
413 dList = []
414 for e in engines:
414 for e in engines:
415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
416 d = gatherBoth(dList,
416 d = gatherBoth(dList,
417 fireOnOneErrback=0,
417 fireOnOneErrback=0,
418 consumeErrors=1,
418 consumeErrors=1,
419 logErrors=0)
419 logErrors=0)
420 d.addCallback(error.collect_exceptions, 'queue_status')
420 d.addCallback(error.collect_exceptions, 'queue_status')
421 return d
421 return d
422
422
423 def get_properties(self, keys=None, targets='all'):
423 def get_properties(self, keys=None, targets='all'):
424 log.msg("Getting properties on %r" % targets)
424 log.msg("Getting properties on %r" % targets)
425 try:
425 try:
426 engines = self.engineList(targets)
426 engines = self.engineList(targets)
427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 return defer.fail(failure.Failure())
428 return defer.fail(failure.Failure())
429 else:
429 else:
430 dList = [e.get_properties(keys) for e in engines]
430 dList = [e.get_properties(keys) for e in engines]
431 d = gatherBoth(dList,
431 d = gatherBoth(dList,
432 fireOnOneErrback=0,
432 fireOnOneErrback=0,
433 consumeErrors=1,
433 consumeErrors=1,
434 logErrors=0)
434 logErrors=0)
435 d.addCallback(error.collect_exceptions, 'get_properties')
435 d.addCallback(error.collect_exceptions, 'get_properties')
436 return d
436 return d
437
437
438 def set_properties(self, properties, targets='all'):
438 def set_properties(self, properties, targets='all'):
439 log.msg("Setting properties on %r" % targets)
439 log.msg("Setting properties on %r" % targets)
440 try:
440 try:
441 engines = self.engineList(targets)
441 engines = self.engineList(targets)
442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 return defer.fail(failure.Failure())
443 return defer.fail(failure.Failure())
444 else:
444 else:
445 dList = [e.set_properties(properties) for e in engines]
445 dList = [e.set_properties(properties) for e in engines]
446 d = gatherBoth(dList,
446 d = gatherBoth(dList,
447 fireOnOneErrback=0,
447 fireOnOneErrback=0,
448 consumeErrors=1,
448 consumeErrors=1,
449 logErrors=0)
449 logErrors=0)
450 d.addCallback(error.collect_exceptions, 'set_properties')
450 d.addCallback(error.collect_exceptions, 'set_properties')
451 return d
451 return d
452
452
453 def has_properties(self, keys, targets='all'):
453 def has_properties(self, keys, targets='all'):
454 log.msg("Checking properties on %r" % targets)
454 log.msg("Checking properties on %r" % targets)
455 try:
455 try:
456 engines = self.engineList(targets)
456 engines = self.engineList(targets)
457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 return defer.fail(failure.Failure())
458 return defer.fail(failure.Failure())
459 else:
459 else:
460 dList = [e.has_properties(keys) for e in engines]
460 dList = [e.has_properties(keys) for e in engines]
461 d = gatherBoth(dList,
461 d = gatherBoth(dList,
462 fireOnOneErrback=0,
462 fireOnOneErrback=0,
463 consumeErrors=1,
463 consumeErrors=1,
464 logErrors=0)
464 logErrors=0)
465 d.addCallback(error.collect_exceptions, 'has_properties')
465 d.addCallback(error.collect_exceptions, 'has_properties')
466 return d
466 return d
467
467
468 def del_properties(self, keys, targets='all'):
468 def del_properties(self, keys, targets='all'):
469 log.msg("Deleting properties on %r" % targets)
469 log.msg("Deleting properties on %r" % targets)
470 try:
470 try:
471 engines = self.engineList(targets)
471 engines = self.engineList(targets)
472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 return defer.fail(failure.Failure())
473 return defer.fail(failure.Failure())
474 else:
474 else:
475 dList = [e.del_properties(keys) for e in engines]
475 dList = [e.del_properties(keys) for e in engines]
476 d = gatherBoth(dList,
476 d = gatherBoth(dList,
477 fireOnOneErrback=0,
477 fireOnOneErrback=0,
478 consumeErrors=1,
478 consumeErrors=1,
479 logErrors=0)
479 logErrors=0)
480 d.addCallback(error.collect_exceptions, 'del_properties')
480 d.addCallback(error.collect_exceptions, 'del_properties')
481 return d
481 return d
482
482
483 def clear_properties(self, targets='all'):
483 def clear_properties(self, targets='all'):
484 log.msg("Clearing properties on %r" % targets)
484 log.msg("Clearing properties on %r" % targets)
485 try:
485 try:
486 engines = self.engineList(targets)
486 engines = self.engineList(targets)
487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 return defer.fail(failure.Failure())
488 return defer.fail(failure.Failure())
489 else:
489 else:
490 dList = [e.clear_properties() for e in engines]
490 dList = [e.clear_properties() for e in engines]
491 d = gatherBoth(dList,
491 d = gatherBoth(dList,
492 fireOnOneErrback=0,
492 fireOnOneErrback=0,
493 consumeErrors=1,
493 consumeErrors=1,
494 logErrors=0)
494 logErrors=0)
495 d.addCallback(error.collect_exceptions, 'clear_properties')
495 d.addCallback(error.collect_exceptions, 'clear_properties')
496 return d
496 return d
497
497
498
498
499 components.registerAdapter(MultiEngine,
499 components.registerAdapter(MultiEngine,
500 IControllerBase,
500 IControllerBase,
501 IMultiEngine)
501 IMultiEngine)
502
502
503
503
504 #-------------------------------------------------------------------------------
504 #-------------------------------------------------------------------------------
505 # Interfaces for the Synchronous MultiEngine
505 # Interfaces for the Synchronous MultiEngine
506 #-------------------------------------------------------------------------------
506 #-------------------------------------------------------------------------------
507
507
508 class ISynchronousEngineMultiplexer(Interface):
508 class ISynchronousEngineMultiplexer(Interface):
509 pass
509 pass
510
510
511
511
512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 """Synchronous, two-phase version of IMultiEngine.
513 """Synchronous, two-phase version of IMultiEngine.
514
514
515 Methods in this interface are identical to those of IMultiEngine, but they
515 Methods in this interface are identical to those of IMultiEngine, but they
516 take one additional argument:
516 take one additional argument:
517
517
518 execute(lines, targets='all') -> execute(lines, targets='all, block=True)
518 execute(lines, targets='all') -> execute(lines, targets='all, block=True)
519
519
520 :Parameters:
520 :Parameters:
521 block : boolean
521 block : boolean
522 Should the method return a deferred to a deferredID or the
522 Should the method return a deferred to a deferredID or the
523 actual result. If block=False a deferred to a deferredID is
523 actual result. If block=False a deferred to a deferredID is
524 returned and the user must call `get_pending_deferred` at a later
524 returned and the user must call `get_pending_deferred` at a later
525 point. If block=True, a deferred to the actual result comes back.
525 point. If block=True, a deferred to the actual result comes back.
526 """
526 """
527 def get_pending_deferred(deferredID, block=True):
527 def get_pending_deferred(deferredID, block=True):
528 """"""
528 """"""
529
529
530 def clear_pending_deferreds():
530 def clear_pending_deferreds():
531 """"""
531 """"""
532
532
533
533
534 #-------------------------------------------------------------------------------
534 #-------------------------------------------------------------------------------
535 # Implementation of the Synchronous MultiEngine
535 # Implementation of the Synchronous MultiEngine
536 #-------------------------------------------------------------------------------
536 #-------------------------------------------------------------------------------
537
537
538 class SynchronousMultiEngine(PendingDeferredManager):
538 class SynchronousMultiEngine(PendingDeferredManager):
539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540
540
541 Warning, this class uses a decorator that currently uses **kwargs.
541 Warning, this class uses a decorator that currently uses **kwargs.
542 Because of this block must be passed as a kwarg, not positionally.
542 Because of this block must be passed as a kwarg, not positionally.
543 """
543 """
544
544
545 implements(ISynchronousMultiEngine)
545 implements(ISynchronousMultiEngine)
546
546
547 def __init__(self, multiengine):
547 def __init__(self, multiengine):
548 self.multiengine = multiengine
548 self.multiengine = multiengine
549 PendingDeferredManager.__init__(self)
549 PendingDeferredManager.__init__(self)
550
550
551 #---------------------------------------------------------------------------
551 #---------------------------------------------------------------------------
552 # Decorated pending deferred methods
552 # Decorated pending deferred methods
553 #---------------------------------------------------------------------------
553 #---------------------------------------------------------------------------
554
554
555 @two_phase
555 @two_phase
556 def execute(self, lines, targets='all'):
556 def execute(self, lines, targets='all'):
557 d = self.multiengine.execute(lines, targets)
557 d = self.multiengine.execute(lines, targets)
558 return d
558 return d
559
559
560 @two_phase
560 @two_phase
561 def push(self, namespace, targets='all'):
561 def push(self, namespace, targets='all'):
562 return self.multiengine.push(namespace, targets)
562 return self.multiengine.push(namespace, targets)
563
563
564 @two_phase
564 @two_phase
565 def pull(self, keys, targets='all'):
565 def pull(self, keys, targets='all'):
566 d = self.multiengine.pull(keys, targets)
566 d = self.multiengine.pull(keys, targets)
567 return d
567 return d
568
568
569 @two_phase
569 @two_phase
570 def push_function(self, namespace, targets='all'):
570 def push_function(self, namespace, targets='all'):
571 return self.multiengine.push_function(namespace, targets)
571 return self.multiengine.push_function(namespace, targets)
572
572
573 @two_phase
573 @two_phase
574 def pull_function(self, keys, targets='all'):
574 def pull_function(self, keys, targets='all'):
575 d = self.multiengine.pull_function(keys, targets)
575 d = self.multiengine.pull_function(keys, targets)
576 return d
576 return d
577
577
578 @two_phase
578 @two_phase
579 def get_result(self, i=None, targets='all'):
579 def get_result(self, i=None, targets='all'):
580 return self.multiengine.get_result(i, targets='all')
580 return self.multiengine.get_result(i, targets='all')
581
581
582 @two_phase
582 @two_phase
583 def reset(self, targets='all'):
583 def reset(self, targets='all'):
584 return self.multiengine.reset(targets)
584 return self.multiengine.reset(targets)
585
585
586 @two_phase
586 @two_phase
587 def keys(self, targets='all'):
587 def keys(self, targets='all'):
588 return self.multiengine.keys(targets)
588 return self.multiengine.keys(targets)
589
589
590 @two_phase
590 @two_phase
591 def kill(self, controller=False, targets='all'):
591 def kill(self, controller=False, targets='all'):
592 return self.multiengine.kill(controller, targets)
592 return self.multiengine.kill(controller, targets)
593
593
594 @two_phase
594 @two_phase
595 def push_serialized(self, namespace, targets='all'):
595 def push_serialized(self, namespace, targets='all'):
596 return self.multiengine.push_serialized(namespace, targets)
596 return self.multiengine.push_serialized(namespace, targets)
597
597
598 @two_phase
598 @two_phase
599 def pull_serialized(self, keys, targets='all'):
599 def pull_serialized(self, keys, targets='all'):
600 return self.multiengine.pull_serialized(keys, targets)
600 return self.multiengine.pull_serialized(keys, targets)
601
601
602 @two_phase
602 @two_phase
603 def clear_queue(self, targets='all'):
603 def clear_queue(self, targets='all'):
604 return self.multiengine.clear_queue(targets)
604 return self.multiengine.clear_queue(targets)
605
605
606 @two_phase
606 @two_phase
607 def queue_status(self, targets='all'):
607 def queue_status(self, targets='all'):
608 return self.multiengine.queue_status(targets)
608 return self.multiengine.queue_status(targets)
609
609
610 @two_phase
610 @two_phase
611 def set_properties(self, properties, targets='all'):
611 def set_properties(self, properties, targets='all'):
612 return self.multiengine.set_properties(properties, targets)
612 return self.multiengine.set_properties(properties, targets)
613
613
614 @two_phase
614 @two_phase
615 def get_properties(self, keys=None, targets='all'):
615 def get_properties(self, keys=None, targets='all'):
616 return self.multiengine.get_properties(keys, targets)
616 return self.multiengine.get_properties(keys, targets)
617
617
618 @two_phase
618 @two_phase
619 def has_properties(self, keys, targets='all'):
619 def has_properties(self, keys, targets='all'):
620 return self.multiengine.has_properties(keys, targets)
620 return self.multiengine.has_properties(keys, targets)
621
621
622 @two_phase
622 @two_phase
623 def del_properties(self, keys, targets='all'):
623 def del_properties(self, keys, targets='all'):
624 return self.multiengine.del_properties(keys, targets)
624 return self.multiengine.del_properties(keys, targets)
625
625
626 @two_phase
626 @two_phase
627 def clear_properties(self, targets='all'):
627 def clear_properties(self, targets='all'):
628 return self.multiengine.clear_properties(targets)
628 return self.multiengine.clear_properties(targets)
629
629
630 #---------------------------------------------------------------------------
630 #---------------------------------------------------------------------------
631 # IMultiEngine methods
631 # IMultiEngine methods
632 #---------------------------------------------------------------------------
632 #---------------------------------------------------------------------------
633
633
634 def get_ids(self):
634 def get_ids(self):
635 """Return a list of registered engine ids.
635 """Return a list of registered engine ids.
636
636
637 Never use the two phase block/non-block stuff for this.
637 Never use the two phase block/non-block stuff for this.
638 """
638 """
639 return self.multiengine.get_ids()
639 return self.multiengine.get_ids()
640
640
641
641
642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643
643
644
644
645 #-------------------------------------------------------------------------------
645 #-------------------------------------------------------------------------------
646 # Various high-level interfaces that can be used as MultiEngine mix-ins
646 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 #-------------------------------------------------------------------------------
647 #-------------------------------------------------------------------------------
648
648
649 #-------------------------------------------------------------------------------
649 #-------------------------------------------------------------------------------
650 # IMultiEngineCoordinator
650 # IMultiEngineCoordinator
651 #-------------------------------------------------------------------------------
651 #-------------------------------------------------------------------------------
652
652
653 class IMultiEngineCoordinator(Interface):
653 class IMultiEngineCoordinator(Interface):
654 """Methods that work on multiple engines explicitly."""
654 """Methods that work on multiple engines explicitly."""
655
655
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 """Partition and distribute a sequence to targets."""
657 """Partition and distribute a sequence to targets."""
658
658
659 def gather(key, dist='b', targets='all'):
659 def gather(key, dist='b', targets='all'):
660 """Gather object key from targets."""
660 """Gather object key from targets."""
661
661
662 def raw_map(func, seqs, dist='b', targets='all'):
662 def raw_map(func, seqs, dist='b', targets='all'):
663 """
663 """
664 A parallelized version of Python's builtin `map` function.
664 A parallelized version of Python's builtin `map` function.
665
665
666 This has a slightly different syntax than the builtin `map`.
666 This has a slightly different syntax than the builtin `map`.
667 This is needed because we need to have keyword arguments and thus
667 This is needed because we need to have keyword arguments and thus
668 can't use *args to capture all the sequences. Instead, they must
668 can't use *args to capture all the sequences. Instead, they must
669 be passed in a list or tuple.
669 be passed in a list or tuple.
670
670
671 The equivalence is:
671 The equivalence is:
672
672
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674
674
675 Most users will want to use parallel functions or the `mapper`
675 Most users will want to use parallel functions or the `mapper`
676 and `map` methods for an API that follows that of the builtin
676 and `map` methods for an API that follows that of the builtin
677 `map`.
677 `map`.
678 """
678 """
679
679
680
680
681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
682 """Methods that work on multiple engines explicitly."""
682 """Methods that work on multiple engines explicitly."""
683
683
684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 """Partition and distribute a sequence to targets."""
685 """Partition and distribute a sequence to targets."""
686
686
687 def gather(key, dist='b', targets='all', block=True):
687 def gather(key, dist='b', targets='all', block=True):
688 """Gather object key from targets"""
688 """Gather object key from targets"""
689
689
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 """
691 """
692 A parallelized version of Python's builtin map.
692 A parallelized version of Python's builtin map.
693
693
694 This has a slightly different syntax than the builtin `map`.
694 This has a slightly different syntax than the builtin `map`.
695 This is needed because we need to have keyword arguments and thus
695 This is needed because we need to have keyword arguments and thus
696 can't use *args to capture all the sequences. Instead, they must
696 can't use *args to capture all the sequences. Instead, they must
697 be passed in a list or tuple.
697 be passed in a list or tuple.
698
698
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700
700
701 Most users will want to use parallel functions or the `mapper`
701 Most users will want to use parallel functions or the `mapper`
702 and `map` methods for an API that follows that of the builtin
702 and `map` methods for an API that follows that of the builtin
703 `map`.
703 `map`.
704 """
704 """
705
705
706
706
707 #-------------------------------------------------------------------------------
707 #-------------------------------------------------------------------------------
708 # IMultiEngineExtras
708 # IMultiEngineExtras
709 #-------------------------------------------------------------------------------
709 #-------------------------------------------------------------------------------
710
710
711 class IMultiEngineExtras(Interface):
711 class IMultiEngineExtras(Interface):
712
712
713 def zip_pull(targets, keys):
713 def zip_pull(targets, keys):
714 """
714 """
715 Pull, but return results in a different format from `pull`.
715 Pull, but return results in a different format from `pull`.
716
716
717 This method basically returns zip(pull(targets, *keys)), with a few
717 This method basically returns zip(pull(targets, *keys)), with a few
718 edge cases handled differently. Users of chainsaw will find this format
718 edge cases handled differently. Users of chainsaw will find this format
719 familiar.
719 familiar.
720 """
720 """
721
721
722 def run(targets, fname):
722 def run(targets, fname):
723 """Run a .py file on targets."""
723 """Run a .py file on targets."""
724
724
725
725
726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
727 def zip_pull(targets, keys, block=True):
727 def zip_pull(targets, keys, block=True):
728 """
728 """
729 Pull, but return results in a different format from `pull`.
729 Pull, but return results in a different format from `pull`.
730
730
731 This method basically returns zip(pull(targets, *keys)), with a few
731 This method basically returns zip(pull(targets, *keys)), with a few
732 edge cases handled differently. Users of chainsaw will find this format
732 edge cases handled differently. Users of chainsaw will find this format
733 familiar.
733 familiar.
734 """
734 """
735
735
736 def run(targets, fname, block=True):
736 def run(targets, fname, block=True):
737 """Run a .py file on targets."""
737 """Run a .py file on targets."""
738
738
739 #-------------------------------------------------------------------------------
739 #-------------------------------------------------------------------------------
740 # The full MultiEngine interface
740 # The full MultiEngine interface
741 #-------------------------------------------------------------------------------
741 #-------------------------------------------------------------------------------
742
742
743 class IFullMultiEngine(IMultiEngine,
743 class IFullMultiEngine(IMultiEngine,
744 IMultiEngineCoordinator,
744 IMultiEngineCoordinator,
745 IMultiEngineExtras):
745 IMultiEngineExtras):
746 pass
746 pass
747
747
748
748
749 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
749 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
750 ISynchronousMultiEngineCoordinator,
750 ISynchronousMultiEngineCoordinator,
751 ISynchronousMultiEngineExtras):
751 ISynchronousMultiEngineExtras):
752 pass
752 pass
753
753
@@ -1,519 +1,521 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.internet.protocol import ProcessProtocol
24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.python import failure, log
26 from twisted.internet.error import ProcessDone, ProcessTerminated
25 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
26 from twisted.internet.utils import getProcessOutput
27 from twisted.python import failure, log
28
28
29 from IPython.external import argparse
29 from IPython.external import argparse
30 from IPython.external import Itpl
30 from IPython.external import Itpl
31 from IPython.kernel.twistedutil import gatherBoth
32 from IPython.kernel.util import printer
33 from IPython.genutils import get_ipython_dir, num_cpus
31 from IPython.genutils import get_ipython_dir, num_cpus
34 from IPython.kernel.fcutil import have_crypto
32 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.error import SecurityError
33 from IPython.kernel.error import SecurityError
34 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.twistedutil import gatherBoth
36 from IPython.kernel.util import printer
37
36
38
37 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
38 # General process handling code
40 # General process handling code
39 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
40
42
41 def find_exe(cmd):
43 def find_exe(cmd):
42 try:
44 try:
43 import win32api
45 import win32api
44 except ImportError:
46 except ImportError:
45 raise ImportError('you need to have pywin32 installed for this to work')
47 raise ImportError('you need to have pywin32 installed for this to work')
46 else:
48 else:
47 try:
49 try:
48 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
50 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
49 except:
51 except:
50 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
52 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
51 return path
53 return path
52
54
53 class ProcessStateError(Exception):
55 class ProcessStateError(Exception):
54 pass
56 pass
55
57
56 class UnknownStatus(Exception):
58 class UnknownStatus(Exception):
57 pass
59 pass
58
60
59 class LauncherProcessProtocol(ProcessProtocol):
61 class LauncherProcessProtocol(ProcessProtocol):
60 """
62 """
61 A ProcessProtocol to go with the ProcessLauncher.
63 A ProcessProtocol to go with the ProcessLauncher.
62 """
64 """
63 def __init__(self, process_launcher):
65 def __init__(self, process_launcher):
64 self.process_launcher = process_launcher
66 self.process_launcher = process_launcher
65
67
66 def connectionMade(self):
68 def connectionMade(self):
67 self.process_launcher.fire_start_deferred(self.transport.pid)
69 self.process_launcher.fire_start_deferred(self.transport.pid)
68
70
69 def processEnded(self, status):
71 def processEnded(self, status):
70 value = status.value
72 value = status.value
71 if isinstance(value, ProcessDone):
73 if isinstance(value, ProcessDone):
72 self.process_launcher.fire_stop_deferred(0)
74 self.process_launcher.fire_stop_deferred(0)
73 elif isinstance(value, ProcessTerminated):
75 elif isinstance(value, ProcessTerminated):
74 self.process_launcher.fire_stop_deferred(
76 self.process_launcher.fire_stop_deferred(
75 {'exit_code':value.exitCode,
77 {'exit_code':value.exitCode,
76 'signal':value.signal,
78 'signal':value.signal,
77 'status':value.status
79 'status':value.status
78 }
80 }
79 )
81 )
80 else:
82 else:
81 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
83 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
82
84
83 def outReceived(self, data):
85 def outReceived(self, data):
84 log.msg(data)
86 log.msg(data)
85
87
86 def errReceived(self, data):
88 def errReceived(self, data):
87 log.err(data)
89 log.err(data)
88
90
89 class ProcessLauncher(object):
91 class ProcessLauncher(object):
90 """
92 """
91 Start and stop an external process in an asynchronous manner.
93 Start and stop an external process in an asynchronous manner.
92
94
93 Currently this uses deferreds to notify other parties of process state
95 Currently this uses deferreds to notify other parties of process state
94 changes. This is an awkward design and should be moved to using
96 changes. This is an awkward design and should be moved to using
95 a formal NotificationCenter.
97 a formal NotificationCenter.
96 """
98 """
97 def __init__(self, cmd_and_args):
99 def __init__(self, cmd_and_args):
98 self.cmd = cmd_and_args[0]
100 self.cmd = cmd_and_args[0]
99 self.args = cmd_and_args
101 self.args = cmd_and_args
100 self._reset()
102 self._reset()
101
103
102 def _reset(self):
104 def _reset(self):
103 self.process_protocol = None
105 self.process_protocol = None
104 self.pid = None
106 self.pid = None
105 self.start_deferred = None
107 self.start_deferred = None
106 self.stop_deferreds = []
108 self.stop_deferreds = []
107 self.state = 'before' # before, running, or after
109 self.state = 'before' # before, running, or after
108
110
109 @property
111 @property
110 def running(self):
112 def running(self):
111 if self.state == 'running':
113 if self.state == 'running':
112 return True
114 return True
113 else:
115 else:
114 return False
116 return False
115
117
116 def fire_start_deferred(self, pid):
118 def fire_start_deferred(self, pid):
117 self.pid = pid
119 self.pid = pid
118 self.state = 'running'
120 self.state = 'running'
119 log.msg('Process %r has started with pid=%i' % (self.args, pid))
121 log.msg('Process %r has started with pid=%i' % (self.args, pid))
120 self.start_deferred.callback(pid)
122 self.start_deferred.callback(pid)
121
123
122 def start(self):
124 def start(self):
123 if self.state == 'before':
125 if self.state == 'before':
124 self.process_protocol = LauncherProcessProtocol(self)
126 self.process_protocol = LauncherProcessProtocol(self)
125 self.start_deferred = defer.Deferred()
127 self.start_deferred = defer.Deferred()
126 self.process_transport = reactor.spawnProcess(
128 self.process_transport = reactor.spawnProcess(
127 self.process_protocol,
129 self.process_protocol,
128 self.cmd,
130 self.cmd,
129 self.args,
131 self.args,
130 env=os.environ
132 env=os.environ
131 )
133 )
132 return self.start_deferred
134 return self.start_deferred
133 else:
135 else:
134 s = 'the process has already been started and has state: %r' % \
136 s = 'the process has already been started and has state: %r' % \
135 self.state
137 self.state
136 return defer.fail(ProcessStateError(s))
138 return defer.fail(ProcessStateError(s))
137
139
138 def get_stop_deferred(self):
140 def get_stop_deferred(self):
139 if self.state == 'running' or self.state == 'before':
141 if self.state == 'running' or self.state == 'before':
140 d = defer.Deferred()
142 d = defer.Deferred()
141 self.stop_deferreds.append(d)
143 self.stop_deferreds.append(d)
142 return d
144 return d
143 else:
145 else:
144 s = 'this process is already complete'
146 s = 'this process is already complete'
145 return defer.fail(ProcessStateError(s))
147 return defer.fail(ProcessStateError(s))
146
148
147 def fire_stop_deferred(self, exit_code):
149 def fire_stop_deferred(self, exit_code):
148 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
150 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
149 self.state = 'after'
151 self.state = 'after'
150 for d in self.stop_deferreds:
152 for d in self.stop_deferreds:
151 d.callback(exit_code)
153 d.callback(exit_code)
152
154
153 def signal(self, sig):
155 def signal(self, sig):
154 """
156 """
155 Send a signal to the process.
157 Send a signal to the process.
156
158
157 The argument sig can be ('KILL','INT', etc.) or any signal number.
159 The argument sig can be ('KILL','INT', etc.) or any signal number.
158 """
160 """
159 if self.state == 'running':
161 if self.state == 'running':
160 self.process_transport.signalProcess(sig)
162 self.process_transport.signalProcess(sig)
161
163
162 # def __del__(self):
164 # def __del__(self):
163 # self.signal('KILL')
165 # self.signal('KILL')
164
166
165 def interrupt_then_kill(self, delay=1.0):
167 def interrupt_then_kill(self, delay=1.0):
166 self.signal('INT')
168 self.signal('INT')
167 reactor.callLater(delay, self.signal, 'KILL')
169 reactor.callLater(delay, self.signal, 'KILL')
168
170
169
171
170 #-----------------------------------------------------------------------------
172 #-----------------------------------------------------------------------------
171 # Code for launching controller and engines
173 # Code for launching controller and engines
172 #-----------------------------------------------------------------------------
174 #-----------------------------------------------------------------------------
173
175
174
176
175 class ControllerLauncher(ProcessLauncher):
177 class ControllerLauncher(ProcessLauncher):
176
178
177 def __init__(self, extra_args=None):
179 def __init__(self, extra_args=None):
178 if sys.platform == 'win32':
180 if sys.platform == 'win32':
179 # This logic is needed because the ipcontroller script doesn't
181 # This logic is needed because the ipcontroller script doesn't
180 # always get installed in the same way or in the same location.
182 # always get installed in the same way or in the same location.
181 from IPython.kernel.scripts import ipcontroller
183 from IPython.kernel.scripts import ipcontroller
182 script_location = ipcontroller.__file__.replace('.pyc', '.py')
184 script_location = ipcontroller.__file__.replace('.pyc', '.py')
183 # The -u option here turns on unbuffered output, which is required
185 # The -u option here turns on unbuffered output, which is required
184 # on Win32 to prevent wierd conflict and problems with Twisted
186 # on Win32 to prevent wierd conflict and problems with Twisted
185 args = [find_exe('python'), '-u', script_location]
187 args = [find_exe('python'), '-u', script_location]
186 else:
188 else:
187 args = ['ipcontroller']
189 args = ['ipcontroller']
188 self.extra_args = extra_args
190 self.extra_args = extra_args
189 if extra_args is not None:
191 if extra_args is not None:
190 args.extend(extra_args)
192 args.extend(extra_args)
191
193
192 ProcessLauncher.__init__(self, args)
194 ProcessLauncher.__init__(self, args)
193
195
194
196
195 class EngineLauncher(ProcessLauncher):
197 class EngineLauncher(ProcessLauncher):
196
198
197 def __init__(self, extra_args=None):
199 def __init__(self, extra_args=None):
198 if sys.platform == 'win32':
200 if sys.platform == 'win32':
199 # This logic is needed because the ipcontroller script doesn't
201 # This logic is needed because the ipcontroller script doesn't
200 # always get installed in the same way or in the same location.
202 # always get installed in the same way or in the same location.
201 from IPython.kernel.scripts import ipengine
203 from IPython.kernel.scripts import ipengine
202 script_location = ipengine.__file__.replace('.pyc', '.py')
204 script_location = ipengine.__file__.replace('.pyc', '.py')
203 # The -u option here turns on unbuffered output, which is required
205 # The -u option here turns on unbuffered output, which is required
204 # on Win32 to prevent wierd conflict and problems with Twisted
206 # on Win32 to prevent wierd conflict and problems with Twisted
205 args = [find_exe('python'), '-u', script_location]
207 args = [find_exe('python'), '-u', script_location]
206 else:
208 else:
207 args = ['ipengine']
209 args = ['ipengine']
208 self.extra_args = extra_args
210 self.extra_args = extra_args
209 if extra_args is not None:
211 if extra_args is not None:
210 args.extend(extra_args)
212 args.extend(extra_args)
211
213
212 ProcessLauncher.__init__(self, args)
214 ProcessLauncher.__init__(self, args)
213
215
214
216
215 class LocalEngineSet(object):
217 class LocalEngineSet(object):
216
218
217 def __init__(self, extra_args=None):
219 def __init__(self, extra_args=None):
218 self.extra_args = extra_args
220 self.extra_args = extra_args
219 self.launchers = []
221 self.launchers = []
220
222
221 def start(self, n):
223 def start(self, n):
222 dlist = []
224 dlist = []
223 for i in range(n):
225 for i in range(n):
224 el = EngineLauncher(extra_args=self.extra_args)
226 el = EngineLauncher(extra_args=self.extra_args)
225 d = el.start()
227 d = el.start()
226 self.launchers.append(el)
228 self.launchers.append(el)
227 dlist.append(d)
229 dlist.append(d)
228 dfinal = gatherBoth(dlist, consumeErrors=True)
230 dfinal = gatherBoth(dlist, consumeErrors=True)
229 dfinal.addCallback(self._handle_start)
231 dfinal.addCallback(self._handle_start)
230 return dfinal
232 return dfinal
231
233
232 def _handle_start(self, r):
234 def _handle_start(self, r):
233 log.msg('Engines started with pids: %r' % r)
235 log.msg('Engines started with pids: %r' % r)
234 return r
236 return r
235
237
236 def _handle_stop(self, r):
238 def _handle_stop(self, r):
237 log.msg('Engines received signal: %r' % r)
239 log.msg('Engines received signal: %r' % r)
238 return r
240 return r
239
241
240 def signal(self, sig):
242 def signal(self, sig):
241 dlist = []
243 dlist = []
242 for el in self.launchers:
244 for el in self.launchers:
243 d = el.get_stop_deferred()
245 d = el.get_stop_deferred()
244 dlist.append(d)
246 dlist.append(d)
245 el.signal(sig)
247 el.signal(sig)
246 dfinal = gatherBoth(dlist, consumeErrors=True)
248 dfinal = gatherBoth(dlist, consumeErrors=True)
247 dfinal.addCallback(self._handle_stop)
249 dfinal.addCallback(self._handle_stop)
248 return dfinal
250 return dfinal
249
251
250 def interrupt_then_kill(self, delay=1.0):
252 def interrupt_then_kill(self, delay=1.0):
251 dlist = []
253 dlist = []
252 for el in self.launchers:
254 for el in self.launchers:
253 d = el.get_stop_deferred()
255 d = el.get_stop_deferred()
254 dlist.append(d)
256 dlist.append(d)
255 el.interrupt_then_kill(delay)
257 el.interrupt_then_kill(delay)
256 dfinal = gatherBoth(dlist, consumeErrors=True)
258 dfinal = gatherBoth(dlist, consumeErrors=True)
257 dfinal.addCallback(self._handle_stop)
259 dfinal.addCallback(self._handle_stop)
258 return dfinal
260 return dfinal
259
261
260
262
261 class BatchEngineSet(object):
263 class BatchEngineSet(object):
262
264
263 # Subclasses must fill these in. See PBSEngineSet
265 # Subclasses must fill these in. See PBSEngineSet
264 submit_command = ''
266 submit_command = ''
265 delete_command = ''
267 delete_command = ''
266 job_id_regexp = ''
268 job_id_regexp = ''
267
269
268 def __init__(self, template_file, **kwargs):
270 def __init__(self, template_file, **kwargs):
269 self.template_file = template_file
271 self.template_file = template_file
270 self.context = {}
272 self.context = {}
271 self.context.update(kwargs)
273 self.context.update(kwargs)
272 self.batch_file = self.template_file+'-run'
274 self.batch_file = self.template_file+'-run'
273
275
274 def parse_job_id(self, output):
276 def parse_job_id(self, output):
275 m = re.match(self.job_id_regexp, output)
277 m = re.match(self.job_id_regexp, output)
276 if m is not None:
278 if m is not None:
277 job_id = m.group()
279 job_id = m.group()
278 else:
280 else:
279 raise Exception("job id couldn't be determined: %s" % output)
281 raise Exception("job id couldn't be determined: %s" % output)
280 self.job_id = job_id
282 self.job_id = job_id
281 log.msg('Job started with job id: %r' % job_id)
283 log.msg('Job started with job id: %r' % job_id)
282 return job_id
284 return job_id
283
285
284 def write_batch_script(self, n):
286 def write_batch_script(self, n):
285 self.context['n'] = n
287 self.context['n'] = n
286 template = open(self.template_file, 'r').read()
288 template = open(self.template_file, 'r').read()
287 log.msg('Using template for batch script: %s' % self.template_file)
289 log.msg('Using template for batch script: %s' % self.template_file)
288 script_as_string = Itpl.itplns(template, self.context)
290 script_as_string = Itpl.itplns(template, self.context)
289 log.msg('Writing instantiated batch script: %s' % self.batch_file)
291 log.msg('Writing instantiated batch script: %s' % self.batch_file)
290 f = open(self.batch_file,'w')
292 f = open(self.batch_file,'w')
291 f.write(script_as_string)
293 f.write(script_as_string)
292 f.close()
294 f.close()
293
295
294 def handle_error(self, f):
296 def handle_error(self, f):
295 f.printTraceback()
297 f.printTraceback()
296 f.raiseException()
298 f.raiseException()
297
299
298 def start(self, n):
300 def start(self, n):
299 self.write_batch_script(n)
301 self.write_batch_script(n)
300 d = getProcessOutput(self.submit_command,
302 d = getProcessOutput(self.submit_command,
301 [self.batch_file],env=os.environ)
303 [self.batch_file],env=os.environ)
302 d.addCallback(self.parse_job_id)
304 d.addCallback(self.parse_job_id)
303 d.addErrback(self.handle_error)
305 d.addErrback(self.handle_error)
304 return d
306 return d
305
307
306 def kill(self):
308 def kill(self):
307 d = getProcessOutput(self.delete_command,
309 d = getProcessOutput(self.delete_command,
308 [self.job_id],env=os.environ)
310 [self.job_id],env=os.environ)
309 return d
311 return d
310
312
311 class PBSEngineSet(BatchEngineSet):
313 class PBSEngineSet(BatchEngineSet):
312
314
313 submit_command = 'qsub'
315 submit_command = 'qsub'
314 delete_command = 'qdel'
316 delete_command = 'qdel'
315 job_id_regexp = '\d+'
317 job_id_regexp = '\d+'
316
318
317 def __init__(self, template_file, **kwargs):
319 def __init__(self, template_file, **kwargs):
318 BatchEngineSet.__init__(self, template_file, **kwargs)
320 BatchEngineSet.__init__(self, template_file, **kwargs)
319
321
320
322
321 #-----------------------------------------------------------------------------
323 #-----------------------------------------------------------------------------
322 # Main functions for the different types of clusters
324 # Main functions for the different types of clusters
323 #-----------------------------------------------------------------------------
325 #-----------------------------------------------------------------------------
324
326
325 # TODO:
327 # TODO:
326 # The logic in these codes should be moved into classes like LocalCluster
328 # The logic in these codes should be moved into classes like LocalCluster
327 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
329 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
328 # The main functions should then just parse the command line arguments, create
330 # The main functions should then just parse the command line arguments, create
329 # the appropriate class and call a 'start' method.
331 # the appropriate class and call a 'start' method.
330
332
331 def check_security(args, cont_args):
333 def check_security(args, cont_args):
332 if (not args.x or not args.y) and not have_crypto:
334 if (not args.x or not args.y) and not have_crypto:
333 log.err("""
335 log.err("""
334 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
336 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
335 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
337 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
336 reactor.stop()
338 reactor.stop()
337 return False
339 return False
338 if args.x:
340 if args.x:
339 cont_args.append('-x')
341 cont_args.append('-x')
340 if args.y:
342 if args.y:
341 cont_args.append('-y')
343 cont_args.append('-y')
342 return True
344 return True
343
345
344 def main_local(args):
346 def main_local(args):
345 cont_args = []
347 cont_args = []
346 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
348 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
347
349
348 # Check security settings before proceeding
350 # Check security settings before proceeding
349 keep_going = check_security(args, cont_args)
351 if not check_security(args, cont_args):
350 if not keep_going: return
352 return
351
353
352 cl = ControllerLauncher(extra_args=cont_args)
354 cl = ControllerLauncher(extra_args=cont_args)
353 dstart = cl.start()
355 dstart = cl.start()
354 def start_engines(cont_pid):
356 def start_engines(cont_pid):
355 engine_args = []
357 engine_args = []
356 engine_args.append('--logfile=%s' % \
358 engine_args.append('--logfile=%s' % \
357 pjoin(args.logdir,'ipengine%s-' % cont_pid))
359 pjoin(args.logdir,'ipengine%s-' % cont_pid))
358 eset = LocalEngineSet(extra_args=engine_args)
360 eset = LocalEngineSet(extra_args=engine_args)
359 def shutdown(signum, frame):
361 def shutdown(signum, frame):
360 log.msg('Stopping local cluster')
362 log.msg('Stopping local cluster')
361 # We are still playing with the times here, but these seem
363 # We are still playing with the times here, but these seem
362 # to be reliable in allowing everything to exit cleanly.
364 # to be reliable in allowing everything to exit cleanly.
363 eset.interrupt_then_kill(0.5)
365 eset.interrupt_then_kill(0.5)
364 cl.interrupt_then_kill(0.5)
366 cl.interrupt_then_kill(0.5)
365 reactor.callLater(1.0, reactor.stop)
367 reactor.callLater(1.0, reactor.stop)
366 signal.signal(signal.SIGINT,shutdown)
368 signal.signal(signal.SIGINT,shutdown)
367 d = eset.start(args.n)
369 d = eset.start(args.n)
368 return d
370 return d
369 def delay_start(cont_pid):
371 def delay_start(cont_pid):
370 # This is needed because the controller doesn't start listening
372 # This is needed because the controller doesn't start listening
371 # right when it starts and the controller needs to write
373 # right when it starts and the controller needs to write
372 # furl files for the engine to pick up
374 # furl files for the engine to pick up
373 reactor.callLater(1.0, start_engines, cont_pid)
375 reactor.callLater(1.0, start_engines, cont_pid)
374 dstart.addCallback(delay_start)
376 dstart.addCallback(delay_start)
375 dstart.addErrback(lambda f: f.raiseException())
377 dstart.addErrback(lambda f: f.raiseException())
376
378
377 def main_mpirun(args):
379 def main_mpirun(args):
378 cont_args = []
380 cont_args = []
379 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
381 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
380
382
381 # Check security settings before proceeding
383 # Check security settings before proceeding
382 keep_going = check_security(args, cont_args)
384 if not check_security(args, cont_args):
383 if not keep_going: return
385 return
384
386
385 cl = ControllerLauncher(extra_args=cont_args)
387 cl = ControllerLauncher(extra_args=cont_args)
386 dstart = cl.start()
388 dstart = cl.start()
387 def start_engines(cont_pid):
389 def start_engines(cont_pid):
388 raw_args = ['mpirun']
390 raw_args = ['mpirun']
389 raw_args.extend(['-n',str(args.n)])
391 raw_args.extend(['-n',str(args.n)])
390 raw_args.append('ipengine')
392 raw_args.append('ipengine')
391 raw_args.append('-l')
393 raw_args.append('-l')
392 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
394 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
393 if args.mpi:
395 if args.mpi:
394 raw_args.append('--mpi=%s' % args.mpi)
396 raw_args.append('--mpi=%s' % args.mpi)
395 eset = ProcessLauncher(raw_args)
397 eset = ProcessLauncher(raw_args)
396 def shutdown(signum, frame):
398 def shutdown(signum, frame):
397 log.msg('Stopping local cluster')
399 log.msg('Stopping local cluster')
398 # We are still playing with the times here, but these seem
400 # We are still playing with the times here, but these seem
399 # to be reliable in allowing everything to exit cleanly.
401 # to be reliable in allowing everything to exit cleanly.
400 eset.interrupt_then_kill(1.0)
402 eset.interrupt_then_kill(1.0)
401 cl.interrupt_then_kill(1.0)
403 cl.interrupt_then_kill(1.0)
402 reactor.callLater(2.0, reactor.stop)
404 reactor.callLater(2.0, reactor.stop)
403 signal.signal(signal.SIGINT,shutdown)
405 signal.signal(signal.SIGINT,shutdown)
404 d = eset.start()
406 d = eset.start()
405 return d
407 return d
406 def delay_start(cont_pid):
408 def delay_start(cont_pid):
407 # This is needed because the controller doesn't start listening
409 # This is needed because the controller doesn't start listening
408 # right when it starts and the controller needs to write
410 # right when it starts and the controller needs to write
409 # furl files for the engine to pick up
411 # furl files for the engine to pick up
410 reactor.callLater(1.0, start_engines, cont_pid)
412 reactor.callLater(1.0, start_engines, cont_pid)
411 dstart.addCallback(delay_start)
413 dstart.addCallback(delay_start)
412 dstart.addErrback(lambda f: f.raiseException())
414 dstart.addErrback(lambda f: f.raiseException())
413
415
414 def main_pbs(args):
416 def main_pbs(args):
415 cont_args = []
417 cont_args = []
416 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
418 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
417
419
418 # Check security settings before proceeding
420 # Check security settings before proceeding
419 keep_going = check_security(args, cont_args)
421 if not check_security(args, cont_args):
420 if not keep_going: return
422 return
421
423
422 cl = ControllerLauncher(extra_args=cont_args)
424 cl = ControllerLauncher(extra_args=cont_args)
423 dstart = cl.start()
425 dstart = cl.start()
424 def start_engines(r):
426 def start_engines(r):
425 pbs_set = PBSEngineSet(args.pbsscript)
427 pbs_set = PBSEngineSet(args.pbsscript)
426 def shutdown(signum, frame):
428 def shutdown(signum, frame):
427 log.msg('Stopping pbs cluster')
429 log.msg('Stopping pbs cluster')
428 d = pbs_set.kill()
430 d = pbs_set.kill()
429 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
431 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
430 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
432 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
431 signal.signal(signal.SIGINT,shutdown)
433 signal.signal(signal.SIGINT,shutdown)
432 d = pbs_set.start(args.n)
434 d = pbs_set.start(args.n)
433 return d
435 return d
434 dstart.addCallback(start_engines)
436 dstart.addCallback(start_engines)
435 dstart.addErrback(lambda f: f.raiseException())
437 dstart.addErrback(lambda f: f.raiseException())
436
438
437
439
438 def get_args():
440 def get_args():
439 base_parser = argparse.ArgumentParser(add_help=False)
441 base_parser = argparse.ArgumentParser(add_help=False)
440 base_parser.add_argument(
442 base_parser.add_argument(
441 '-x',
443 '-x',
442 action='store_true',
444 action='store_true',
443 dest='x',
445 dest='x',
444 help='turn off client security'
446 help='turn off client security'
445 )
447 )
446 base_parser.add_argument(
448 base_parser.add_argument(
447 '-y',
449 '-y',
448 action='store_true',
450 action='store_true',
449 dest='y',
451 dest='y',
450 help='turn off engine security'
452 help='turn off engine security'
451 )
453 )
452 base_parser.add_argument(
454 base_parser.add_argument(
453 "--logdir",
455 "--logdir",
454 type=str,
456 type=str,
455 dest="logdir",
457 dest="logdir",
456 help="directory to put log files (default=$IPYTHONDIR/log)",
458 help="directory to put log files (default=$IPYTHONDIR/log)",
457 default=pjoin(get_ipython_dir(),'log')
459 default=pjoin(get_ipython_dir(),'log')
458 )
460 )
459 base_parser.add_argument(
461 base_parser.add_argument(
460 "-n",
462 "-n",
461 "--num",
463 "--num",
462 type=int,
464 type=int,
463 dest="n",
465 dest="n",
464 default=2,
466 default=2,
465 help="the number of engines to start"
467 help="the number of engines to start"
466 )
468 )
467
469
468 parser = argparse.ArgumentParser(
470 parser = argparse.ArgumentParser(
469 description='IPython cluster startup. This starts a controller and\
471 description='IPython cluster startup. This starts a controller and\
470 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
472 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
471 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
473 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
472 )
474 )
473 subparsers = parser.add_subparsers(
475 subparsers = parser.add_subparsers(
474 help='available cluster types. For help, do "ipcluster TYPE --help"')
476 help='available cluster types. For help, do "ipcluster TYPE --help"')
475
477
476 parser_local = subparsers.add_parser(
478 parser_local = subparsers.add_parser(
477 'local',
479 'local',
478 help='run a local cluster',
480 help='run a local cluster',
479 parents=[base_parser]
481 parents=[base_parser]
480 )
482 )
481 parser_local.set_defaults(func=main_local)
483 parser_local.set_defaults(func=main_local)
482
484
483 parser_mpirun = subparsers.add_parser(
485 parser_mpirun = subparsers.add_parser(
484 'mpirun',
486 'mpirun',
485 help='run a cluster using mpirun',
487 help='run a cluster using mpirun',
486 parents=[base_parser]
488 parents=[base_parser]
487 )
489 )
488 parser_mpirun.add_argument(
490 parser_mpirun.add_argument(
489 "--mpi",
491 "--mpi",
490 type=str,
492 type=str,
491 dest="mpi", # Don't put a default here to allow no MPI support
493 dest="mpi", # Don't put a default here to allow no MPI support
492 help="how to call MPI_Init (default=mpi4py)"
494 help="how to call MPI_Init (default=mpi4py)"
493 )
495 )
494 parser_mpirun.set_defaults(func=main_mpirun)
496 parser_mpirun.set_defaults(func=main_mpirun)
495
497
496 parser_pbs = subparsers.add_parser(
498 parser_pbs = subparsers.add_parser(
497 'pbs',
499 'pbs',
498 help='run a pbs cluster',
500 help='run a pbs cluster',
499 parents=[base_parser]
501 parents=[base_parser]
500 )
502 )
501 parser_pbs.add_argument(
503 parser_pbs.add_argument(
502 '--pbs-script',
504 '--pbs-script',
503 type=str,
505 type=str,
504 dest='pbsscript',
506 dest='pbsscript',
505 help='PBS script template',
507 help='PBS script template',
506 default='pbs.template'
508 default='pbs.template'
507 )
509 )
508 parser_pbs.set_defaults(func=main_pbs)
510 parser_pbs.set_defaults(func=main_pbs)
509 args = parser.parse_args()
511 args = parser.parse_args()
510 return args
512 return args
511
513
512 def main():
514 def main():
513 args = get_args()
515 args = get_args()
514 reactor.callWhenRunning(args.func, args)
516 reactor.callWhenRunning(args.func, args)
515 log.startLogging(sys.stdout)
517 log.startLogging(sys.stdout)
516 reactor.run()
518 reactor.run()
517
519
518 if __name__ == '__main__':
520 if __name__ == '__main__':
519 main()
521 main()
General Comments 0
You need to be logged in to leave comments. Login now