##// END OF EJS Templates
Merged a bug fix for ipcluster from the bg-trunk-dev branch.
Brian Granger -
r1828:febb92be merge
parent child Browse files
Show More
@@ -1,753 +1,753
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3 3
4 4 """Adapt the IPython ControllerServer to IMultiEngine.
5 5
6 6 This module provides classes that adapt a ControllerService to the
7 7 IMultiEngine interface. This interface is a basic interactive interface
8 8 for working with a set of engines where it is desired to have explicit
9 9 access to each registered engine.
10 10
11 11 The classes here are exposed to the network in files like:
12 12
13 13 * multienginevanilla.py
14 14 * multienginepb.py
15 15 """
16 16
17 17 __docformat__ = "restructuredtext en"
18 18
19 19 #-------------------------------------------------------------------------------
20 20 # Copyright (C) 2008 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-------------------------------------------------------------------------------
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Imports
28 28 #-------------------------------------------------------------------------------
29 29
30 30 from new import instancemethod
31 31 from types import FunctionType
32 32
33 33 from twisted.application import service
34 34 from twisted.internet import defer, reactor
35 35 from twisted.python import log, components, failure
36 36 from zope.interface import Interface, implements, Attribute
37 37
38 38 from IPython.tools import growl
39 39 from IPython.kernel.util import printer
40 40 from IPython.kernel.twistedutil import gatherBoth
41 41 from IPython.kernel import map as Map
42 42 from IPython.kernel import error
43 43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 44 from IPython.kernel.controllerservice import \
45 45 ControllerAdapterBase, \
46 46 ControllerService, \
47 47 IControllerBase
48 48
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Interfaces for the MultiEngine representation of a controller
52 52 #-------------------------------------------------------------------------------
53 53
class IEngineMultiplexer(Interface):
    """Interface to multiple engines implementing IEngineCore/Serialized/Queued.

    This class simply acts as a multiplexer of methods that are in the
    various IEngines* interfaces.  Thus the methods here are just like those
    in the IEngine* interfaces, but with an extra first argument, targets.
    The targets argument can have the following forms:

    * targets = 10            # Engines are indexed by ints
    * targets = [0,1,2,3]     # A list of ints
    * targets = 'all'         # A string to indicate all targets

    If targets is bad in any way, an InvalidEngineID will be raised.  This
    includes engines not being registered.

    All IEngineMultiplexer multiplexer methods must return a Deferred to a list
    with length equal to the number of targets.  The elements of the list will
    correspond to the return of the corresponding IEngine method.

    Failures are aggressive, meaning that if an action fails for any target,
    the overall action will fail immediately with that Failure.

    :Parameters:
        targets : int, list of ints, or 'all'
            Engine ids the action will apply to.

    :Returns: Deferred to a list of results for each engine.

    :Exception:
        InvalidEngineID
            If the targets argument is bad or engines aren't registered.
        NoEnginesRegistered
            If there are no engines registered and targets='all'
    """

    #---------------------------------------------------------------------------
    # Multiplexed methods
    #---------------------------------------------------------------------------

    def execute(lines, targets='all'):
        """Execute lines of Python code on targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            lines : str
                String of python code to be executed on targets.
        """

    def push(namespace, targets='all'):
        """Push dict namespace into the user's namespace on targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            namespace : dict
                Dict of key value pairs to be put into the user's namespace.
        """

    def pull(keys, targets='all'):
        """Pull values out of the user's namespace on targets by keys.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            keys : tuple of strings
                Sequence of keys to be pulled from user's namespace.
        """

    def push_function(namespace, targets='all'):
        """Push a dict of functions into the user's namespace on targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.
        """

    def pull_function(keys, targets='all'):
        """Pull functions out of the user's namespace on targets by keys.

        See the class docstring for information about targets and possible
        exceptions this method can raise.
        """

    def get_result(i=None, targets='all'):
        """Get the result for command i from targets.

        See the class docstring for information about targets and possible
        exceptions this method can raise.

        :Parameters:
            i : int or None
                Command index or None to indicate most recent command.
        """

    def reset(targets='all'):
        """Reset targets.

        This clears the users namespace of the Engines, but won't cause
        modules to be reloaded.
        """

    def keys(targets='all'):
        """Get variable names defined in user's namespace on targets."""

    def kill(controller=False, targets='all'):
        """Kill the targets Engines and possibly the controller.

        :Parameters:
            controller : boolean
                Should the controller be killed as well.  If so all the
                engines will be killed first no matter what targets is.
        """

    def push_serialized(namespace, targets='all'):
        """Push a namespace of Serialized objects to targets.

        :Parameters:
            namespace : dict
                A dict whose keys are the variable names and whose values
                are serialized version of the objects.
        """

    def pull_serialized(keys, targets='all'):
        """Pull Serialized objects by keys from targets.

        :Parameters:
            keys : tuple of strings
                Sequence of variable names to pull as serialized objects.
        """

    def clear_queue(targets='all'):
        """Clear the queue of pending commands for targets."""

    def queue_status(targets='all'):
        """Get the status of the queue on the targets."""

    def set_properties(properties, targets='all'):
        """Set properties by key and value."""

    def get_properties(keys=None, targets='all'):
        """Get a list of properties by `keys`; if no keys specified, get all."""

    def del_properties(keys, targets='all'):
        """Delete properties by `keys`."""

    def has_properties(keys, targets='all'):
        """Get a list of bool values for whether `properties` has `keys`."""

    def clear_properties(targets='all'):
        """Clear the properties dict."""
200 200
class IMultiEngine(IEngineMultiplexer):
    """A controller that exposes an explicit interface to all of its engines.

    This is the primary interface for interactive usage.
    """

    def get_ids():
        """Return list of currently registered ids.

        :Returns: A Deferred to a list of registered engine ids.
        """
212 212
213 213
214 214
215 215 #-------------------------------------------------------------------------------
216 216 # Implementation of the core MultiEngine classes
217 217 #-------------------------------------------------------------------------------
218 218
class MultiEngine(ControllerAdapterBase):
    """The representation of a ControllerService as a IMultiEngine.

    Although it is not implemented currently, this class would be where a
    client/notification API is implemented.  It could inherit from something
    like results.NotifierParent and then use the notify method to send
    notifications.
    """

    implements(IMultiEngine)

    def __init__(self, controller):
        # BUG FIX: this method was previously misspelled ``__init``, so the
        # explicit delegation below was dead code (the inherited base-class
        # ``__init__`` happened to run instead, which hid the typo).
        ControllerAdapterBase.__init__(self, controller)

    #---------------------------------------------------------------------------
    # Helper methods
    #---------------------------------------------------------------------------

    def engineList(self, targets):
        """Parse the targets argument into a list of valid engine objects.

        :Parameters:
            targets : int, list of ints or 'all'
                The targets argument to be parsed.

        :Returns: List of engine objects.

        :Exception:
            InvalidEngineID
                If targets is not valid or if an engine is not registered.
            NoEnginesRegistered
                If targets is 'all' and no engines are registered.
        """
        if isinstance(targets, int):
            if targets not in self.engines.keys():
                log.msg("Engine with id %i is not registered" % targets)
                raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
            else:
                return [self.engines[targets]]
        elif isinstance(targets, (list, tuple)):
            # Validate every id before resolving any, so a bad id fails fast.
            for id in targets:
                if id not in self.engines.keys():
                    log.msg("Engine with id %r is not registered" % id)
                    raise error.InvalidEngineID("Engine with id %r is not registered" % id)
            return map(self.engines.get, targets)
        elif targets == 'all':
            eList = self.engines.values()
            if len(eList) == 0:
                msg = """There are no engines registered.
Check the logs in ~/.ipython/log if you think there should have been."""
                raise error.NoEnginesRegistered(msg)
            else:
                return eList
        else:
            raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)

    def _performOnEngines(self, methodName, *args, **kwargs):
        """Calls a method on engines and returns deferred to list of results.

        :Parameters:
            methodName : str
                Name of the method to be called.
            targets : int, list of ints, 'all'
                Keyword-only.  Parsed into a list of engine objects.
            args
                The positional arguments to be passed to the engines.
            kwargs
                The keyword arguments passed to the method.

        :Returns: List of deferreds to the results on each engine.

        :Exception:
            InvalidEngineID
                If the targets argument is bad in any way.
            AttributeError
                If the method doesn't exist on one of the engines.
        """
        targets = kwargs.pop('targets')
        log.msg("Performing %s on %r" % (methodName, targets))
        # This will and should raise if targets is not valid!
        engines = self.engineList(targets)
        dList = []
        for e in engines:
            meth = getattr(e, methodName, None)
            if meth is not None:
                dList.append(meth(*args, **kwargs))
            else:
                raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
        return dList

    def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
        """Calls _performOnEngines and wraps result/exception into a deferred."""
        try:
            dList = self._performOnEngines(methodName, *args, **kwargs)
        except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            # Having fireOnOneErrback is causing problems with the determinacy
            # of the system.  Basically, once a single engine has errbacked, this
            # method returns.  In some cases, this will cause client to submit
            # another command.  Because the previous command is still running
            # on some engines, this command will be queued.  When those commands
            # then errback, the second command will raise QueueCleared.  Ahhh!
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, methodName)
            return d

    #---------------------------------------------------------------------------
    # General IMultiEngine methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        """Return a deferred to the list of registered engine ids."""
        return defer.succeed(self.engines.keys())

    #---------------------------------------------------------------------------
    # IEngineMultiplexer methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all'):
        return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)

    def push(self, ns, targets='all'):
        return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)

    def pull(self, keys, targets='all'):
        return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)

    def push_function(self, ns, targets='all'):
        return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)

    def pull_function(self, keys, targets='all'):
        return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)

    def get_result(self, i=None, targets='all'):
        return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)

    def reset(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('reset', targets=targets)

    def keys(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('keys', targets=targets)

    def kill(self, controller=False, targets='all'):
        """Kill the target engines, then optionally stop the controller."""
        if controller:
            # Killing the controller implies killing every engine first.
            targets = 'all'
        d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
        if controller:
            log.msg("Killing controller")
            # Delay the stop so in-flight engine kills can complete.
            d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
            # Consume any weird stuff coming back
            d.addBoth(lambda _: None)
        return d

    def push_serialized(self, namespace, targets='all'):
        for k, v in namespace.iteritems():
            log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
        d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
        return d

    def pull_serialized(self, keys, targets='all'):
        try:
            dList = self._performOnEngines('pull_serialized', keys, targets=targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            for d in dList:
                d.addCallback(self._logSizes)
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'pull_serialized')
            return d

    def _logSizes(self, listOfSerialized):
        """Log the size of each pulled Serialized; pass the value through."""
        if isinstance(listOfSerialized, (list, tuple)):
            for s in listOfSerialized:
                log.msg("Pulled object is %f MB" % s.getDataSize())
        else:
            log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
        return listOfSerialized

    def clear_queue(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)

    def queue_status(self, targets='all'):
        """Get each target's queue status as a list of (engine id, status)."""
        log.msg("Getting queue status on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = []
            for e in engines:
                # BUG FIX: bind e.id as a default argument.  A plain closure
                # over ``e`` is late-bound, so once the deferreds fired every
                # result was tagged with the *last* engine's id.
                dList.append(e.queue_status().addCallback(lambda s, eid=e.id: (eid, s)))
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'queue_status')
            return d

    def get_properties(self, keys=None, targets='all'):
        log.msg("Getting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.get_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'get_properties')
            return d

    def set_properties(self, properties, targets='all'):
        log.msg("Setting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.set_properties(properties) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'set_properties')
            return d

    def has_properties(self, keys, targets='all'):
        log.msg("Checking properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.has_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'has_properties')
            return d

    def del_properties(self, keys, targets='all'):
        log.msg("Deleting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.del_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'del_properties')
            return d

    def clear_properties(self, targets='all'):
        log.msg("Clearing properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.clear_properties() for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'clear_properties')
            return d
497 497
498 498
# Allow any IControllerBase provider to be adapted to IMultiEngine
# via the MultiEngine wrapper.
components.registerAdapter(MultiEngine,
                           IControllerBase,
                           IMultiEngine)
502 502
503 503
504 504 #-------------------------------------------------------------------------------
505 505 # Interfaces for the Synchronous MultiEngine
506 506 #-------------------------------------------------------------------------------
507 507
class ISynchronousEngineMultiplexer(Interface):
    """Marker base interface for the synchronous (two-phase) multiplexer API."""
    pass
510 510
511 511
class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
    """Synchronous, two-phase version of IMultiEngine.

    Methods in this interface are identical to those of IMultiEngine, but they
    take one additional argument:

    execute(lines, targets='all') -> execute(lines, targets='all', block=True)

    :Parameters:
        block : boolean
            Should the method return a deferred to a deferredID or the
            actual result.  If block=False a deferred to a deferredID is
            returned and the user must call `get_pending_deferred` at a later
            point.  If block=True, a deferred to the actual result comes back.
    """
    def get_pending_deferred(deferredID, block=True):
        """Get a deferred to the result registered under `deferredID`."""

    def clear_pending_deferreds():
        """Discard all pending deferreds."""
532 532
533 533
534 534 #-------------------------------------------------------------------------------
535 535 # Implementation of the Synchronous MultiEngine
536 536 #-------------------------------------------------------------------------------
537 537
class SynchronousMultiEngine(PendingDeferredManager):
    """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`

    Warning, this class uses a decorator that currently uses **kwargs.
    Because of this block must be passed as a kwarg, not positionally.

    Every public multiengine method below simply forwards to the wrapped
    `multiengine`, with `two_phase` providing the block/deferredID plumbing.
    """

    implements(ISynchronousMultiEngine)

    def __init__(self, multiengine):
        self.multiengine = multiengine
        PendingDeferredManager.__init__(self)

    #---------------------------------------------------------------------------
    # Decorated pending deferred methods
    #---------------------------------------------------------------------------

    @two_phase
    def execute(self, lines, targets='all'):
        return self.multiengine.execute(lines, targets)

    @two_phase
    def push(self, namespace, targets='all'):
        return self.multiengine.push(namespace, targets)

    @two_phase
    def pull(self, keys, targets='all'):
        return self.multiengine.pull(keys, targets)

    @two_phase
    def push_function(self, namespace, targets='all'):
        return self.multiengine.push_function(namespace, targets)

    @two_phase
    def pull_function(self, keys, targets='all'):
        return self.multiengine.pull_function(keys, targets)

    @two_phase
    def get_result(self, i=None, targets='all'):
        # BUG FIX: previously forwarded targets='all' unconditionally,
        # silently ignoring the caller's `targets` argument.
        return self.multiengine.get_result(i, targets=targets)

    @two_phase
    def reset(self, targets='all'):
        return self.multiengine.reset(targets)

    @two_phase
    def keys(self, targets='all'):
        return self.multiengine.keys(targets)

    @two_phase
    def kill(self, controller=False, targets='all'):
        return self.multiengine.kill(controller, targets)

    @two_phase
    def push_serialized(self, namespace, targets='all'):
        return self.multiengine.push_serialized(namespace, targets)

    @two_phase
    def pull_serialized(self, keys, targets='all'):
        return self.multiengine.pull_serialized(keys, targets)

    @two_phase
    def clear_queue(self, targets='all'):
        return self.multiengine.clear_queue(targets)

    @two_phase
    def queue_status(self, targets='all'):
        return self.multiengine.queue_status(targets)

    @two_phase
    def set_properties(self, properties, targets='all'):
        return self.multiengine.set_properties(properties, targets)

    @two_phase
    def get_properties(self, keys=None, targets='all'):
        return self.multiengine.get_properties(keys, targets)

    @two_phase
    def has_properties(self, keys, targets='all'):
        return self.multiengine.has_properties(keys, targets)

    @two_phase
    def del_properties(self, keys, targets='all'):
        return self.multiengine.del_properties(keys, targets)

    @two_phase
    def clear_properties(self, targets='all'):
        return self.multiengine.clear_properties(targets)

    #---------------------------------------------------------------------------
    # IMultiEngine methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        """Return a list of registered engine ids.

        Never use the two phase block/non-block stuff for this.
        """
        return self.multiengine.get_ids()
640 640
641 641
# Allow any IMultiEngine provider to be adapted to ISynchronousMultiEngine.
components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643 643
644 644
645 645 #-------------------------------------------------------------------------------
646 646 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 647 #-------------------------------------------------------------------------------
648 648
649 649 #-------------------------------------------------------------------------------
650 650 # IMultiEngineCoordinator
651 651 #-------------------------------------------------------------------------------
652 652
class IMultiEngineCoordinator(Interface):
    """Methods that work on multiple engines explicitly."""

    def scatter(key, seq, dist='b', flatten=False, targets='all'):
        """Partition and distribute a sequence to targets.

        NOTE(review): `dist` selects the partitioning scheme; 'b' appears to
        be the default block distribution -- confirm against IPython.kernel.map.
        """

    def gather(key, dist='b', targets='all'):
        """Gather object key from targets."""

    def raw_map(func, seqs, dist='b', targets='all'):
        """
        A parallelized version of Python's builtin `map` function.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        The equivalence is:

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.
        """
679 679
680 680
class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
    """Methods that work on multiple engines explicitly.

    Identical to `IMultiEngineCoordinator`, but each method takes an
    additional `block` keyword (see `ISynchronousMultiEngine`).
    """

    def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
        """Partition and distribute a sequence to targets."""

    def gather(key, dist='b', targets='all', block=True):
        """Gather object key from targets."""

    def raw_map(func, seqs, dist='b', targets='all', block=True):
        """
        A parallelized version of Python's builtin map.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.
        """
705 705
706 706
707 707 #-------------------------------------------------------------------------------
708 708 # IMultiEngineExtras
709 709 #-------------------------------------------------------------------------------
710 710
class IMultiEngineExtras(Interface):
    """Extra convenience methods layered on top of the multiengine API."""

    def zip_pull(targets, keys):
        """
        Pull, but return results in a different format from `pull`.

        This method basically returns zip(pull(targets, *keys)), with a few
        edge cases handled differently.  Users of chainsaw will find this format
        familiar.
        """

    def run(targets, fname):
        """Run a .py file on targets."""
724 724
725 725
class ISynchronousMultiEngineExtras(IMultiEngineExtras):
    """Two-phase version of `IMultiEngineExtras`; adds the `block` keyword."""

    def zip_pull(targets, keys, block=True):
        """
        Pull, but return results in a different format from `pull`.

        This method basically returns zip(pull(targets, *keys)), with a few
        edge cases handled differently.  Users of chainsaw will find this format
        familiar.
        """

    def run(targets, fname, block=True):
        """Run a .py file on targets."""
738 738
739 739 #-------------------------------------------------------------------------------
740 740 # The full MultiEngine interface
741 741 #-------------------------------------------------------------------------------
742 742
class IFullMultiEngine(IMultiEngine,
                       IMultiEngineCoordinator,
                       IMultiEngineExtras):
    """The complete asynchronous MultiEngine interface:
    multiplexing, coordination and extras combined."""
    pass
747 747
748 748
class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
                                  ISynchronousMultiEngineCoordinator,
                                  ISynchronousMultiEngineExtras):
    """The complete synchronous (two-phase) MultiEngine interface."""
    pass
753 753
@@ -1,486 +1,521
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 pjoin = os.path.join
22 22
23 23 from twisted.internet import reactor, defer
24 24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.python import failure, log
26 25 from twisted.internet.error import ProcessDone, ProcessTerminated
27 26 from twisted.internet.utils import getProcessOutput
27 from twisted.python import failure, log
28 28
29 29 from IPython.external import argparse
30 30 from IPython.external import Itpl
31 from IPython.genutils import get_ipython_dir, num_cpus
32 from IPython.kernel.fcutil import have_crypto
33 from IPython.kernel.error import SecurityError
34 from IPython.kernel.fcutil import have_crypto
31 35 from IPython.kernel.twistedutil import gatherBoth
32 36 from IPython.kernel.util import printer
33 from IPython.genutils import get_ipython_dir, num_cpus
37
34 38
35 39 #-----------------------------------------------------------------------------
36 40 # General process handling code
37 41 #-----------------------------------------------------------------------------
38 42
39 43 def find_exe(cmd):
40 44 try:
41 45 import win32api
42 46 except ImportError:
43 47 raise ImportError('you need to have pywin32 installed for this to work')
44 48 else:
45 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd)
46 return path
49 try:
50 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
51 except:
52 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
53 return path
47 54
48 55 class ProcessStateError(Exception):
49 56 pass
50 57
51 58 class UnknownStatus(Exception):
52 59 pass
53 60
54 61 class LauncherProcessProtocol(ProcessProtocol):
55 62 """
56 63 A ProcessProtocol to go with the ProcessLauncher.
57 64 """
58 65 def __init__(self, process_launcher):
59 66 self.process_launcher = process_launcher
60 67
61 68 def connectionMade(self):
62 69 self.process_launcher.fire_start_deferred(self.transport.pid)
63 70
64 71 def processEnded(self, status):
65 72 value = status.value
66 73 if isinstance(value, ProcessDone):
67 74 self.process_launcher.fire_stop_deferred(0)
68 75 elif isinstance(value, ProcessTerminated):
69 76 self.process_launcher.fire_stop_deferred(
70 77 {'exit_code':value.exitCode,
71 78 'signal':value.signal,
72 79 'status':value.status
73 80 }
74 81 )
75 82 else:
76 83 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
77 84
78 85 def outReceived(self, data):
79 86 log.msg(data)
80 87
81 88 def errReceived(self, data):
82 89 log.err(data)
83 90
84 91 class ProcessLauncher(object):
85 92 """
86 93 Start and stop an external process in an asynchronous manner.
87 94
88 95 Currently this uses deferreds to notify other parties of process state
89 96 changes. This is an awkward design and should be moved to using
90 97 a formal NotificationCenter.
91 98 """
92 99 def __init__(self, cmd_and_args):
93 100 self.cmd = cmd_and_args[0]
94 101 self.args = cmd_and_args
95 102 self._reset()
96 103
97 104 def _reset(self):
98 105 self.process_protocol = None
99 106 self.pid = None
100 107 self.start_deferred = None
101 108 self.stop_deferreds = []
102 109 self.state = 'before' # before, running, or after
103 110
104 111 @property
105 112 def running(self):
106 113 if self.state == 'running':
107 114 return True
108 115 else:
109 116 return False
110 117
111 118 def fire_start_deferred(self, pid):
112 119 self.pid = pid
113 120 self.state = 'running'
114 121 log.msg('Process %r has started with pid=%i' % (self.args, pid))
115 122 self.start_deferred.callback(pid)
116 123
117 124 def start(self):
118 125 if self.state == 'before':
119 126 self.process_protocol = LauncherProcessProtocol(self)
120 127 self.start_deferred = defer.Deferred()
121 128 self.process_transport = reactor.spawnProcess(
122 129 self.process_protocol,
123 130 self.cmd,
124 131 self.args,
125 132 env=os.environ
126 133 )
127 134 return self.start_deferred
128 135 else:
129 136 s = 'the process has already been started and has state: %r' % \
130 137 self.state
131 138 return defer.fail(ProcessStateError(s))
132 139
133 140 def get_stop_deferred(self):
134 141 if self.state == 'running' or self.state == 'before':
135 142 d = defer.Deferred()
136 143 self.stop_deferreds.append(d)
137 144 return d
138 145 else:
139 146 s = 'this process is already complete'
140 147 return defer.fail(ProcessStateError(s))
141 148
142 149 def fire_stop_deferred(self, exit_code):
143 150 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
144 151 self.state = 'after'
145 152 for d in self.stop_deferreds:
146 153 d.callback(exit_code)
147 154
148 155 def signal(self, sig):
149 156 """
150 157 Send a signal to the process.
151 158
152 159 The argument sig can be ('KILL','INT', etc.) or any signal number.
153 160 """
154 161 if self.state == 'running':
155 162 self.process_transport.signalProcess(sig)
156 163
157 164 # def __del__(self):
158 165 # self.signal('KILL')
159 166
160 167 def interrupt_then_kill(self, delay=1.0):
161 168 self.signal('INT')
162 169 reactor.callLater(delay, self.signal, 'KILL')
163 170
164 171
165 172 #-----------------------------------------------------------------------------
166 173 # Code for launching controller and engines
167 174 #-----------------------------------------------------------------------------
168 175
169 176
170 177 class ControllerLauncher(ProcessLauncher):
171 178
172 179 def __init__(self, extra_args=None):
173 180 if sys.platform == 'win32':
174 args = [find_exe('ipcontroller.bat')]
181 # This logic is needed because the ipcontroller script doesn't
182 # always get installed in the same way or in the same location.
183 from IPython.kernel.scripts import ipcontroller
184 script_location = ipcontroller.__file__.replace('.pyc', '.py')
185 # The -u option here turns on unbuffered output, which is required
186 # on Win32 to prevent wierd conflict and problems with Twisted
187 args = [find_exe('python'), '-u', script_location]
175 188 else:
176 189 args = ['ipcontroller']
177 190 self.extra_args = extra_args
178 191 if extra_args is not None:
179 192 args.extend(extra_args)
180 193
181 194 ProcessLauncher.__init__(self, args)
182 195
183 196
184 197 class EngineLauncher(ProcessLauncher):
185 198
186 199 def __init__(self, extra_args=None):
187 200 if sys.platform == 'win32':
188 args = [find_exe('ipengine.bat')]
201 # This logic is needed because the ipengine script doesn't
202 # always get installed in the same way or in the same location.
203 from IPython.kernel.scripts import ipengine
204 script_location = ipengine.__file__.replace('.pyc', '.py')
205 # The -u option here turns on unbuffered output, which is required
206 # on Win32 to prevent weird conflicts and problems with Twisted
207 args = [find_exe('python'), '-u', script_location]
189 208 else:
190 209 args = ['ipengine']
191 210 self.extra_args = extra_args
192 211 if extra_args is not None:
193 212 args.extend(extra_args)
194 213
195 214 ProcessLauncher.__init__(self, args)
196 215
197 216
198 217 class LocalEngineSet(object):
199 218
200 219 def __init__(self, extra_args=None):
201 220 self.extra_args = extra_args
202 221 self.launchers = []
203 222
204 223 def start(self, n):
205 224 dlist = []
206 225 for i in range(n):
207 226 el = EngineLauncher(extra_args=self.extra_args)
208 227 d = el.start()
209 228 self.launchers.append(el)
210 229 dlist.append(d)
211 230 dfinal = gatherBoth(dlist, consumeErrors=True)
212 231 dfinal.addCallback(self._handle_start)
213 232 return dfinal
214 233
215 234 def _handle_start(self, r):
216 235 log.msg('Engines started with pids: %r' % r)
217 236 return r
218 237
219 238 def _handle_stop(self, r):
220 239 log.msg('Engines received signal: %r' % r)
221 240 return r
222 241
223 242 def signal(self, sig):
224 243 dlist = []
225 244 for el in self.launchers:
226 245 d = el.get_stop_deferred()
227 246 dlist.append(d)
228 247 el.signal(sig)
229 248 dfinal = gatherBoth(dlist, consumeErrors=True)
230 249 dfinal.addCallback(self._handle_stop)
231 250 return dfinal
232 251
233 252 def interrupt_then_kill(self, delay=1.0):
234 253 dlist = []
235 254 for el in self.launchers:
236 255 d = el.get_stop_deferred()
237 256 dlist.append(d)
238 257 el.interrupt_then_kill(delay)
239 258 dfinal = gatherBoth(dlist, consumeErrors=True)
240 259 dfinal.addCallback(self._handle_stop)
241 260 return dfinal
242 261
243 262
244 263 class BatchEngineSet(object):
245 264
246 265 # Subclasses must fill these in. See PBSEngineSet
247 266 submit_command = ''
248 267 delete_command = ''
249 268 job_id_regexp = ''
250 269
251 270 def __init__(self, template_file, **kwargs):
252 271 self.template_file = template_file
253 272 self.context = {}
254 273 self.context.update(kwargs)
255 274 self.batch_file = self.template_file+'-run'
256 275
257 276 def parse_job_id(self, output):
258 277 m = re.match(self.job_id_regexp, output)
259 278 if m is not None:
260 279 job_id = m.group()
261 280 else:
262 281 raise Exception("job id couldn't be determined: %s" % output)
263 282 self.job_id = job_id
264 283 log.msg('Job started with job id: %r' % job_id)
265 284 return job_id
266 285
267 286 def write_batch_script(self, n):
268 287 self.context['n'] = n
269 288 template = open(self.template_file, 'r').read()
270 289 log.msg('Using template for batch script: %s' % self.template_file)
271 290 script_as_string = Itpl.itplns(template, self.context)
272 291 log.msg('Writing instantiated batch script: %s' % self.batch_file)
273 292 f = open(self.batch_file,'w')
274 293 f.write(script_as_string)
275 294 f.close()
276
295
277 296 def handle_error(self, f):
278 297 f.printTraceback()
279 298 f.raiseException()
280 299
281 300 def start(self, n):
282 301 self.write_batch_script(n)
283 302 d = getProcessOutput(self.submit_command,
284 303 [self.batch_file],env=os.environ)
285 304 d.addCallback(self.parse_job_id)
286 305 d.addErrback(self.handle_error)
287 306 return d
288
307
289 308 def kill(self):
290 309 d = getProcessOutput(self.delete_command,
291 310 [self.job_id],env=os.environ)
292 311 return d
293 312
294 313 class PBSEngineSet(BatchEngineSet):
295 314
296 315 submit_command = 'qsub'
297 316 delete_command = 'qdel'
298 317 job_id_regexp = '\d+'
299 318
300 319 def __init__(self, template_file, **kwargs):
301 320 BatchEngineSet.__init__(self, template_file, **kwargs)
302 321
303 322
304 323 #-----------------------------------------------------------------------------
305 324 # Main functions for the different types of clusters
306 325 #-----------------------------------------------------------------------------
307 326
308 327 # TODO:
309 328 # The logic in these codes should be moved into classes like LocalCluster
310 329 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
311 330 # The main functions should then just parse the command line arguments, create
312 331 # the appropriate class and call a 'start' method.
313 332
314 def main_local(args):
315 cont_args = []
316 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
333 def check_security(args, cont_args):
334 if (not args.x or not args.y) and not have_crypto:
335 log.err("""
336 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
337 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
338 reactor.stop()
339 return False
317 340 if args.x:
318 341 cont_args.append('-x')
319 342 if args.y:
320 343 cont_args.append('-y')
344 return True
345
346 def main_local(args):
347 cont_args = []
348 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
349
350 # Check security settings before proceeding
351 if not check_security(args, cont_args):
352 return
353
321 354 cl = ControllerLauncher(extra_args=cont_args)
322 355 dstart = cl.start()
323 356 def start_engines(cont_pid):
324 357 engine_args = []
325 358 engine_args.append('--logfile=%s' % \
326 359 pjoin(args.logdir,'ipengine%s-' % cont_pid))
327 360 eset = LocalEngineSet(extra_args=engine_args)
328 361 def shutdown(signum, frame):
329 362 log.msg('Stopping local cluster')
330 363 # We are still playing with the times here, but these seem
331 364 # to be reliable in allowing everything to exit cleanly.
332 365 eset.interrupt_then_kill(0.5)
333 366 cl.interrupt_then_kill(0.5)
334 367 reactor.callLater(1.0, reactor.stop)
335 368 signal.signal(signal.SIGINT,shutdown)
336 369 d = eset.start(args.n)
337 370 return d
338 371 def delay_start(cont_pid):
339 372 # This is needed because the controller doesn't start listening
340 373 # right when it starts and the controller needs to write
341 374 # furl files for the engine to pick up
342 375 reactor.callLater(1.0, start_engines, cont_pid)
343 376 dstart.addCallback(delay_start)
344 377 dstart.addErrback(lambda f: f.raiseException())
345 378
346 379 def main_mpirun(args):
347 380 cont_args = []
348 381 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
349 if args.x:
350 cont_args.append('-x')
351 if args.y:
352 cont_args.append('-y')
382
383 # Check security settings before proceeding
384 if not check_security(args, cont_args):
385 return
386
353 387 cl = ControllerLauncher(extra_args=cont_args)
354 388 dstart = cl.start()
355 389 def start_engines(cont_pid):
356 390 raw_args = ['mpirun']
357 391 raw_args.extend(['-n',str(args.n)])
358 392 raw_args.append('ipengine')
359 393 raw_args.append('-l')
360 394 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
361 395 if args.mpi:
362 396 raw_args.append('--mpi=%s' % args.mpi)
363 397 eset = ProcessLauncher(raw_args)
364 398 def shutdown(signum, frame):
365 399 log.msg('Stopping local cluster')
366 400 # We are still playing with the times here, but these seem
367 401 # to be reliable in allowing everything to exit cleanly.
368 402 eset.interrupt_then_kill(1.0)
369 403 cl.interrupt_then_kill(1.0)
370 404 reactor.callLater(2.0, reactor.stop)
371 405 signal.signal(signal.SIGINT,shutdown)
372 406 d = eset.start()
373 407 return d
374 408 def delay_start(cont_pid):
375 409 # This is needed because the controller doesn't start listening
376 410 # right when it starts and the controller needs to write
377 411 # furl files for the engine to pick up
378 412 reactor.callLater(1.0, start_engines, cont_pid)
379 413 dstart.addCallback(delay_start)
380 414 dstart.addErrback(lambda f: f.raiseException())
381 415
382 416 def main_pbs(args):
383 417 cont_args = []
384 418 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
385 if args.x:
386 cont_args.append('-x')
387 if args.y:
388 cont_args.append('-y')
419
420 # Check security settings before proceeding
421 if not check_security(args, cont_args):
422 return
423
389 424 cl = ControllerLauncher(extra_args=cont_args)
390 425 dstart = cl.start()
391 426 def start_engines(r):
392 427 pbs_set = PBSEngineSet(args.pbsscript)
393 428 def shutdown(signum, frame):
394 429 log.msg('Stopping pbs cluster')
395 430 d = pbs_set.kill()
396 431 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
397 432 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
398 433 signal.signal(signal.SIGINT,shutdown)
399 434 d = pbs_set.start(args.n)
400 435 return d
401 436 dstart.addCallback(start_engines)
402 437 dstart.addErrback(lambda f: f.raiseException())
403 438
404 439
405 440 def get_args():
406 441 base_parser = argparse.ArgumentParser(add_help=False)
407 442 base_parser.add_argument(
408 443 '-x',
409 444 action='store_true',
410 445 dest='x',
411 446 help='turn off client security'
412 447 )
413 448 base_parser.add_argument(
414 449 '-y',
415 450 action='store_true',
416 451 dest='y',
417 452 help='turn off engine security'
418 453 )
419 454 base_parser.add_argument(
420 455 "--logdir",
421 456 type=str,
422 457 dest="logdir",
423 458 help="directory to put log files (default=$IPYTHONDIR/log)",
424 459 default=pjoin(get_ipython_dir(),'log')
425 460 )
426 461 base_parser.add_argument(
427 462 "-n",
428 463 "--num",
429 464 type=int,
430 465 dest="n",
431 466 default=2,
432 467 help="the number of engines to start"
433 468 )
434 469
435 470 parser = argparse.ArgumentParser(
436 471 description='IPython cluster startup. This starts a controller and\
437 472 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
438 473 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
439 474 )
440 475 subparsers = parser.add_subparsers(
441 476 help='available cluster types. For help, do "ipcluster TYPE --help"')
442 477
443 478 parser_local = subparsers.add_parser(
444 479 'local',
445 480 help='run a local cluster',
446 481 parents=[base_parser]
447 482 )
448 483 parser_local.set_defaults(func=main_local)
449 484
450 485 parser_mpirun = subparsers.add_parser(
451 486 'mpirun',
452 487 help='run a cluster using mpirun',
453 488 parents=[base_parser]
454 489 )
455 490 parser_mpirun.add_argument(
456 491 "--mpi",
457 492 type=str,
458 493 dest="mpi", # Don't put a default here to allow no MPI support
459 494 help="how to call MPI_Init (default=mpi4py)"
460 495 )
461 496 parser_mpirun.set_defaults(func=main_mpirun)
462 497
463 498 parser_pbs = subparsers.add_parser(
464 499 'pbs',
465 500 help='run a pbs cluster',
466 501 parents=[base_parser]
467 502 )
468 503 parser_pbs.add_argument(
469 504 '--pbs-script',
470 505 type=str,
471 506 dest='pbsscript',
472 507 help='PBS script template',
473 508 default='pbs.template'
474 509 )
475 510 parser_pbs.set_defaults(func=main_pbs)
476 511 args = parser.parse_args()
477 512 return args
478 513
479 514 def main():
480 515 args = get_args()
481 516 reactor.callWhenRunning(args.func, args)
482 517 log.startLogging(sys.stdout)
483 518 reactor.run()
484 519
485 520 if __name__ == '__main__':
486 521 main()
General Comments 0
You need to be logged in to leave comments. Login now