##// END OF EJS Templates
Fixing small things in response to review.
Brian Granger -
Show More
@@ -1,753 +1,753 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3 3
4 4 """Adapt the IPython ControllerServer to IMultiEngine.
5 5
6 6 This module provides classes that adapt a ControllerService to the
7 7 IMultiEngine interface. This interface is a basic interactive interface
8 8 for working with a set of engines where it is desired to have explicit
9 9 access to each registered engine.
10 10
11 11 The classes here are exposed to the network in files like:
12 12
13 13 * multienginevanilla.py
14 14 * multienginepb.py
15 15 """
16 16
17 17 __docformat__ = "restructuredtext en"
18 18
19 19 #-------------------------------------------------------------------------------
20 20 # Copyright (C) 2008 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-------------------------------------------------------------------------------
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Imports
28 28 #-------------------------------------------------------------------------------
29 29
30 30 from new import instancemethod
31 31 from types import FunctionType
32 32
33 33 from twisted.application import service
34 34 from twisted.internet import defer, reactor
35 35 from twisted.python import log, components, failure
36 36 from zope.interface import Interface, implements, Attribute
37 37
38 38 from IPython.tools import growl
39 39 from IPython.kernel.util import printer
40 40 from IPython.kernel.twistedutil import gatherBoth
41 41 from IPython.kernel import map as Map
42 42 from IPython.kernel import error
43 43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 44 from IPython.kernel.controllerservice import \
45 45 ControllerAdapterBase, \
46 46 ControllerService, \
47 47 IControllerBase
48 48
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Interfaces for the MultiEngine representation of a controller
52 52 #-------------------------------------------------------------------------------
53 53
54 54 class IEngineMultiplexer(Interface):
55 55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56 56
57 57 This class simply acts as a multiplexer of methods that are in the
58 58 various IEngines* interfaces. Thus the methods here are just like those
59 59 in the IEngine* interfaces, but with an extra first argument, targets.
60 60 The targets argument can have the following forms:
61 61
62 62 * targets = 10 # Engines are indexed by ints
63 63 * targets = [0,1,2,3] # A list of ints
64 64 * targets = 'all' # A string to indicate all targets
65 65
66 66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 67 includes engines not being registered.
68 68
69 69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 70 with length equal to the number of targets. The elements of the list will
71 71 correspond to the return of the corresponding IEngine method.
72 72
73 73 Failures are aggressive, meaning that if an action fails for any target,
74 74 the overall action will fail immediately with that Failure.
75 75
76 76 :Parameters:
77 77 targets : int, list of ints, or 'all'
78 78 Engine ids the action will apply to.
79 79
80 80 :Returns: Deferred to a list of results for each engine.
81 81
82 82 :Exception:
83 83 InvalidEngineID
84 84 If the targets argument is bad or engines aren't registered.
85 85 NoEnginesRegistered
86 86 If there are no engines registered and targets='all'
87 87 """
88 88
89 89 #---------------------------------------------------------------------------
90 90 # Multiplexed methods
91 91 #---------------------------------------------------------------------------
92 92
93 93 def execute(lines, targets='all'):
94 94 """Execute lines of Python code on targets.
95 95
96 96 See the class docstring for information about targets and possible
97 97 exceptions this method can raise.
98 98
99 99 :Parameters:
100 100 lines : str
101 101 String of python code to be executed on targets.
102 102 """
103 103
104 104 def push(namespace, targets='all'):
105 105 """Push dict namespace into the user's namespace on targets.
106 106
107 107 See the class docstring for information about targets and possible
108 108 exceptions this method can raise.
109 109
110 110 :Parameters:
111 111 namespace : dict
112 112 Dict of key value pairs to be put into the user's namespace.
113 113 """
114 114
115 115 def pull(keys, targets='all'):
116 116 """Pull values out of the user's namespace on targets by keys.
117 117
118 118 See the class docstring for information about targets and possible
119 119 exceptions this method can raise.
120 120
121 121 :Parameters:
122 122 keys : tuple of strings
123 123 Sequence of keys to be pulled from user's namespace.
124 124 """
125 125
126 126 def push_function(namespace, targets='all'):
127 127 """"""
128 128
129 129 def pull_function(keys, targets='all'):
130 130 """"""
131 131
132 132 def get_result(i=None, targets='all'):
133 133 """Get the result for command i from targets.
134 134
135 135 See the class docstring for information about targets and possible
136 136 exceptions this method can raise.
137 137
138 138 :Parameters:
139 139 i : int or None
140 140 Command index or None to indicate most recent command.
141 141 """
142 142
143 143 def reset(targets='all'):
144 144 """Reset targets.
145 145
146 146 This clears the users namespace of the Engines, but won't cause
147 147 modules to be reloaded.
148 148 """
149 149
150 150 def keys(targets='all'):
151 151 """Get variable names defined in user's namespace on targets."""
152 152
153 153 def kill(controller=False, targets='all'):
154 154 """Kill the targets Engines and possibly the controller.
155 155
156 156 :Parameters:
157 157 controller : boolean
158 158 Should the controller be killed as well. If so all the
159 159 engines will be killed first no matter what targets is.
160 160 """
161 161
162 162 def push_serialized(namespace, targets='all'):
163 163 """Push a namespace of Serialized objects to targets.
164 164
165 165 :Parameters:
166 166 namespace : dict
167 167 A dict whose keys are the variable names and whose values
168 168 are serialized version of the objects.
169 169 """
170 170
171 171 def pull_serialized(keys, targets='all'):
172 172 """Pull Serialized objects by keys from targets.
173 173
174 174 :Parameters:
175 175 keys : tuple of strings
176 176 Sequence of variable names to pull as serialized objects.
177 177 """
178 178
179 179 def clear_queue(targets='all'):
180 180 """Clear the queue of pending command for targets."""
181 181
182 182 def queue_status(targets='all'):
183 183 """Get the status of the queue on the targets."""
184 184
185 185 def set_properties(properties, targets='all'):
186 186 """set properties by key and value"""
187 187
188 188 def get_properties(keys=None, targets='all'):
189 189 """get a list of properties by `keys`; if no keys are specified, get all"""
190 190
191 191 def del_properties(keys, targets='all'):
192 192 """delete properties by `keys`"""
193 193
194 194 def has_properties(keys, targets='all'):
195 195 """get a list of bool values for whether `properties` has `keys`"""
196 196
197 197 def clear_properties(targets='all'):
198 198 """clear the properties dict"""
199 199
200 200
201 201 class IMultiEngine(IEngineMultiplexer):
202 202 """A controller that exposes an explicit interface to all of its engines.
203 203
204 204 This is the primary interface for interactive usage.
205 205 """
206 206
207 207 def get_ids():
208 208 """Return list of currently registered ids.
209 209
210 210 :Returns: A Deferred to a list of registered engine ids.
211 211 """
212 212
213 213
214 214
215 215 #-------------------------------------------------------------------------------
216 216 # Implementation of the core MultiEngine classes
217 217 #-------------------------------------------------------------------------------
218 218
219 219 class MultiEngine(ControllerAdapterBase):
220 220 """The representation of a ControllerService as a IMultiEngine.
221 221
222 222 Although it is not implemented currently, this class would be where a
223 223 client/notification API is implemented. It could inherit from something
224 224 like results.NotifierParent and then use the notify method to send
225 225 notifications.
226 226 """
227 227
228 228 implements(IMultiEngine)
229 229
230 230 def __init__(self, controller):
231 231 ControllerAdapterBase.__init__(self, controller)
232 232
233 233 #---------------------------------------------------------------------------
234 234 # Helper methods
235 235 #---------------------------------------------------------------------------
236 236
237 237 def engineList(self, targets):
238 238 """Parse the targets argument into a list of valid engine objects.
239 239
240 240 :Parameters:
241 241 targets : int, list of ints or 'all'
242 242 The targets argument to be parsed.
243 243
244 244 :Returns: List of engine objects.
245 245
246 246 :Exception:
247 247 InvalidEngineID
248 248 If targets is not valid or if an engine is not registered.
249 249 """
250 250 if isinstance(targets, int):
251 251 if targets not in self.engines.keys():
252 252 log.msg("Engine with id %i is not registered" % targets)
253 253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 254 else:
255 255 return [self.engines[targets]]
256 256 elif isinstance(targets, (list, tuple)):
257 257 for id in targets:
258 258 if id not in self.engines.keys():
259 259 log.msg("Engine with id %r is not registered" % id)
260 260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 261 return map(self.engines.get, targets)
262 262 elif targets == 'all':
263 263 eList = self.engines.values()
264 264 if len(eList) == 0:
265 265 msg = """There are no engines registered.
266 266 Check the logs in ~/.ipython/log if you think there should have been."""
267 267 raise error.NoEnginesRegistered(msg)
268 268 else:
269 269 return eList
270 270 else:
271 271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
272 272
273 273 def _performOnEngines(self, methodName, *args, **kwargs):
274 274 """Calls a method on engines and returns deferred to list of results.
275 275
276 276 :Parameters:
277 277 methodName : str
278 278 Name of the method to be called.
279 279 targets : int, list of ints, 'all'
280 280 The targets argument to be parsed into a list of engine objects.
281 281 args
282 282 The positional keyword arguments to be passed to the engines.
283 283 kwargs
284 284 The keyword arguments passed to the method
285 285
286 286 :Returns: List of deferreds to the results on each engine
287 287
288 288 :Exception:
289 289 InvalidEngineID
290 290 If the targets argument is bad in any way.
291 291 AttributeError
292 292 If the method doesn't exist on one of the engines.
293 293 """
294 294 targets = kwargs.pop('targets')
295 295 log.msg("Performing %s on %r" % (methodName, targets))
296 296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 297 # This will and should raise if targets is not valid!
298 298 engines = self.engineList(targets)
299 299 dList = []
300 300 for e in engines:
301 301 meth = getattr(e, methodName, None)
302 302 if meth is not None:
303 303 dList.append(meth(*args, **kwargs))
304 304 else:
305 305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 306 return dList
307 307
308 308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 309 """Calls _performOnEngines and wraps the result/exception in a deferred."""
310 310 try:
311 311 dList = self._performOnEngines(methodName, *args, **kwargs)
312 312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 313 return defer.fail(failure.Failure())
314 314 else:
315 315 # Having fireOnOneErrback is causing problems with the determinacy
316 316 # of the system. Basically, once a single engine has errbacked, this
317 317 # method returns. In some cases, this will cause client to submit
318 318 # another command. Because the previous command is still running
319 319 # on some engines, this command will be queued. When those commands
320 320 # then errback, the second command will raise QueueCleared. Ahhh!
321 321 d = gatherBoth(dList,
322 322 fireOnOneErrback=0,
323 323 consumeErrors=1,
324 324 logErrors=0)
325 325 d.addCallback(error.collect_exceptions, methodName)
326 326 return d
327 327
328 328 #---------------------------------------------------------------------------
329 329 # General IMultiEngine methods
330 330 #---------------------------------------------------------------------------
331 331
332 332 def get_ids(self):
333 333 return defer.succeed(self.engines.keys())
334 334
335 335 #---------------------------------------------------------------------------
336 336 # IEngineMultiplexer methods
337 337 #---------------------------------------------------------------------------
338
338
339 339 def execute(self, lines, targets='all'):
340 340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341 341
342 342 def push(self, ns, targets='all'):
343 343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344 344
345 345 def pull(self, keys, targets='all'):
346 346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347 347
348 348 def push_function(self, ns, targets='all'):
349 349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350 350
351 351 def pull_function(self, keys, targets='all'):
352 352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353 353
354 354 def get_result(self, i=None, targets='all'):
355 355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356 356
357 357 def reset(self, targets='all'):
358 358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359 359
360 360 def keys(self, targets='all'):
361 361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362 362
363 363 def kill(self, controller=False, targets='all'):
364 364 if controller:
365 365 targets = 'all'
366 366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 367 if controller:
368 368 log.msg("Killing controller")
369 369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 370 # Consume any weird stuff coming back
371 371 d.addBoth(lambda _: None)
372 372 return d
373 373
374 374 def push_serialized(self, namespace, targets='all'):
375 375 for k, v in namespace.iteritems():
376 376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 378 return d
379 379
380 380 def pull_serialized(self, keys, targets='all'):
381 381 try:
382 382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 384 return defer.fail(failure.Failure())
385 385 else:
386 386 for d in dList:
387 387 d.addCallback(self._logSizes)
388 388 d = gatherBoth(dList,
389 389 fireOnOneErrback=0,
390 390 consumeErrors=1,
391 391 logErrors=0)
392 392 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 393 return d
394 394
395 395 def _logSizes(self, listOfSerialized):
396 396 if isinstance(listOfSerialized, (list, tuple)):
397 397 for s in listOfSerialized:
398 398 log.msg("Pulled object is %f MB" % s.getDataSize())
399 399 else:
400 400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 401 return listOfSerialized
402 402
403 403 def clear_queue(self, targets='all'):
404 404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405 405
406 406 def queue_status(self, targets='all'):
407 407 log.msg("Getting queue status on %r" % targets)
408 408 try:
409 409 engines = self.engineList(targets)
410 410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 411 return defer.fail(failure.Failure())
412 412 else:
413 413 dList = []
414 414 for e in engines:
415 415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
416 416 d = gatherBoth(dList,
417 417 fireOnOneErrback=0,
418 418 consumeErrors=1,
419 419 logErrors=0)
420 420 d.addCallback(error.collect_exceptions, 'queue_status')
421 421 return d
422 422
423 423 def get_properties(self, keys=None, targets='all'):
424 424 log.msg("Getting properties on %r" % targets)
425 425 try:
426 426 engines = self.engineList(targets)
427 427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 428 return defer.fail(failure.Failure())
429 429 else:
430 430 dList = [e.get_properties(keys) for e in engines]
431 431 d = gatherBoth(dList,
432 432 fireOnOneErrback=0,
433 433 consumeErrors=1,
434 434 logErrors=0)
435 435 d.addCallback(error.collect_exceptions, 'get_properties')
436 436 return d
437 437
438 438 def set_properties(self, properties, targets='all'):
439 439 log.msg("Setting properties on %r" % targets)
440 440 try:
441 441 engines = self.engineList(targets)
442 442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 443 return defer.fail(failure.Failure())
444 444 else:
445 445 dList = [e.set_properties(properties) for e in engines]
446 446 d = gatherBoth(dList,
447 447 fireOnOneErrback=0,
448 448 consumeErrors=1,
449 449 logErrors=0)
450 450 d.addCallback(error.collect_exceptions, 'set_properties')
451 451 return d
452 452
453 453 def has_properties(self, keys, targets='all'):
454 454 log.msg("Checking properties on %r" % targets)
455 455 try:
456 456 engines = self.engineList(targets)
457 457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 458 return defer.fail(failure.Failure())
459 459 else:
460 460 dList = [e.has_properties(keys) for e in engines]
461 461 d = gatherBoth(dList,
462 462 fireOnOneErrback=0,
463 463 consumeErrors=1,
464 464 logErrors=0)
465 465 d.addCallback(error.collect_exceptions, 'has_properties')
466 466 return d
467 467
468 468 def del_properties(self, keys, targets='all'):
469 469 log.msg("Deleting properties on %r" % targets)
470 470 try:
471 471 engines = self.engineList(targets)
472 472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 473 return defer.fail(failure.Failure())
474 474 else:
475 475 dList = [e.del_properties(keys) for e in engines]
476 476 d = gatherBoth(dList,
477 477 fireOnOneErrback=0,
478 478 consumeErrors=1,
479 479 logErrors=0)
480 480 d.addCallback(error.collect_exceptions, 'del_properties')
481 481 return d
482 482
483 483 def clear_properties(self, targets='all'):
484 484 log.msg("Clearing properties on %r" % targets)
485 485 try:
486 486 engines = self.engineList(targets)
487 487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 488 return defer.fail(failure.Failure())
489 489 else:
490 490 dList = [e.clear_properties() for e in engines]
491 491 d = gatherBoth(dList,
492 492 fireOnOneErrback=0,
493 493 consumeErrors=1,
494 494 logErrors=0)
495 495 d.addCallback(error.collect_exceptions, 'clear_properties')
496 496 return d
497 497
498 498
499 499 components.registerAdapter(MultiEngine,
500 500 IControllerBase,
501 501 IMultiEngine)
502 502
503 503
504 504 #-------------------------------------------------------------------------------
505 505 # Interfaces for the Synchronous MultiEngine
506 506 #-------------------------------------------------------------------------------
507 507
508 508 class ISynchronousEngineMultiplexer(Interface):
509 509 pass
510 510
511 511
512 512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 513 """Synchronous, two-phase version of IMultiEngine.
514 514
515 515 Methods in this interface are identical to those of IMultiEngine, but they
516 516 take one additional argument:
517 517
518 518 execute(lines, targets='all') -> execute(lines, targets='all', block=True)
519 519
520 520 :Parameters:
521 521 block : boolean
522 522 Should the method return a deferred to a deferredID or the
523 523 actual result. If block=False a deferred to a deferredID is
524 524 returned and the user must call `get_pending_deferred` at a later
525 525 point. If block=True, a deferred to the actual result comes back.
526 526 """
527 527 def get_pending_deferred(deferredID, block=True):
528 528 """"""
529 529
530 530 def clear_pending_deferreds():
531 531 """"""
532 532
533 533
534 534 #-------------------------------------------------------------------------------
535 535 # Implementation of the Synchronous MultiEngine
536 536 #-------------------------------------------------------------------------------
537 537
538 538 class SynchronousMultiEngine(PendingDeferredManager):
539 539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540 540
541 541 Warning, this class uses a decorator that currently uses **kwargs.
542 542 Because of this block must be passed as a kwarg, not positionally.
543 543 """
544 544
545 545 implements(ISynchronousMultiEngine)
546 546
547 547 def __init__(self, multiengine):
548 548 self.multiengine = multiengine
549 549 PendingDeferredManager.__init__(self)
550 550
551 551 #---------------------------------------------------------------------------
552 552 # Decorated pending deferred methods
553 553 #---------------------------------------------------------------------------
554 554
555 555 @two_phase
556 556 def execute(self, lines, targets='all'):
557 557 d = self.multiengine.execute(lines, targets)
558 558 return d
559 559
560 560 @two_phase
561 561 def push(self, namespace, targets='all'):
562 562 return self.multiengine.push(namespace, targets)
563 563
564 564 @two_phase
565 565 def pull(self, keys, targets='all'):
566 566 d = self.multiengine.pull(keys, targets)
567 567 return d
568 568
569 569 @two_phase
570 570 def push_function(self, namespace, targets='all'):
571 571 return self.multiengine.push_function(namespace, targets)
572 572
573 573 @two_phase
574 574 def pull_function(self, keys, targets='all'):
575 575 d = self.multiengine.pull_function(keys, targets)
576 576 return d
577 577
578 578 @two_phase
579 579 def get_result(self, i=None, targets='all'):
580 580 return self.multiengine.get_result(i, targets=targets)
581 581
582 582 @two_phase
583 583 def reset(self, targets='all'):
584 584 return self.multiengine.reset(targets)
585 585
586 586 @two_phase
587 587 def keys(self, targets='all'):
588 588 return self.multiengine.keys(targets)
589 589
590 590 @two_phase
591 591 def kill(self, controller=False, targets='all'):
592 592 return self.multiengine.kill(controller, targets)
593 593
594 594 @two_phase
595 595 def push_serialized(self, namespace, targets='all'):
596 596 return self.multiengine.push_serialized(namespace, targets)
597 597
598 598 @two_phase
599 599 def pull_serialized(self, keys, targets='all'):
600 600 return self.multiengine.pull_serialized(keys, targets)
601 601
602 602 @two_phase
603 603 def clear_queue(self, targets='all'):
604 604 return self.multiengine.clear_queue(targets)
605 605
606 606 @two_phase
607 607 def queue_status(self, targets='all'):
608 608 return self.multiengine.queue_status(targets)
609 609
610 610 @two_phase
611 611 def set_properties(self, properties, targets='all'):
612 612 return self.multiengine.set_properties(properties, targets)
613 613
614 614 @two_phase
615 615 def get_properties(self, keys=None, targets='all'):
616 616 return self.multiengine.get_properties(keys, targets)
617 617
618 618 @two_phase
619 619 def has_properties(self, keys, targets='all'):
620 620 return self.multiengine.has_properties(keys, targets)
621 621
622 622 @two_phase
623 623 def del_properties(self, keys, targets='all'):
624 624 return self.multiengine.del_properties(keys, targets)
625 625
626 626 @two_phase
627 627 def clear_properties(self, targets='all'):
628 628 return self.multiengine.clear_properties(targets)
629 629
630 630 #---------------------------------------------------------------------------
631 631 # IMultiEngine methods
632 632 #---------------------------------------------------------------------------
633 633
634 634 def get_ids(self):
635 635 """Return a list of registered engine ids.
636 636
637 637 Never use the two phase block/non-block stuff for this.
638 638 """
639 639 return self.multiengine.get_ids()
640 640
641 641
642 642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643 643
644 644
645 645 #-------------------------------------------------------------------------------
646 646 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 647 #-------------------------------------------------------------------------------
648 648
649 649 #-------------------------------------------------------------------------------
650 650 # IMultiEngineCoordinator
651 651 #-------------------------------------------------------------------------------
652 652
653 653 class IMultiEngineCoordinator(Interface):
654 654 """Methods that work on multiple engines explicitly."""
655 655
656 656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 657 """Partition and distribute a sequence to targets."""
658 658
659 659 def gather(key, dist='b', targets='all'):
660 660 """Gather object key from targets."""
661 661
662 662 def raw_map(func, seqs, dist='b', targets='all'):
663 663 """
664 664 A parallelized version of Python's builtin `map` function.
665 665
666 666 This has a slightly different syntax than the builtin `map`.
667 667 This is needed because we need to have keyword arguments and thus
668 668 can't use *args to capture all the sequences. Instead, they must
669 669 be passed in a list or tuple.
670 670
671 671 The equivalence is:
672 672
673 673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674 674
675 675 Most users will want to use parallel functions or the `mapper`
676 676 and `map` methods for an API that follows that of the builtin
677 677 `map`.
678 678 """
679 679
680 680
681 681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
682 682 """Methods that work on multiple engines explicitly."""
683 683
684 684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 685 """Partition and distribute a sequence to targets."""
686 686
687 687 def gather(key, dist='b', targets='all', block=True):
688 688 """Gather object key from targets"""
689 689
690 690 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 691 """
692 692 A parallelized version of Python's builtin map.
693 693
694 694 This has a slightly different syntax than the builtin `map`.
695 695 This is needed because we need to have keyword arguments and thus
696 696 can't use *args to capture all the sequences. Instead, they must
697 697 be passed in a list or tuple.
698 698
699 699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700 700
701 701 Most users will want to use parallel functions or the `mapper`
702 702 and `map` methods for an API that follows that of the builtin
703 703 `map`.
704 704 """
705 705
706 706
707 707 #-------------------------------------------------------------------------------
708 708 # IMultiEngineExtras
709 709 #-------------------------------------------------------------------------------
710 710
711 711 class IMultiEngineExtras(Interface):
712 712
713 713 def zip_pull(targets, keys):
714 714 """
715 715 Pull, but return results in a different format from `pull`.
716 716
717 717 This method basically returns zip(pull(targets, *keys)), with a few
718 718 edge cases handled differently. Users of chainsaw will find this format
719 719 familiar.
720 720 """
721 721
722 722 def run(targets, fname):
723 723 """Run a .py file on targets."""
724 724
725 725
726 726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
727 727 def zip_pull(targets, keys, block=True):
728 728 """
729 729 Pull, but return results in a different format from `pull`.
730 730
731 731 This method basically returns zip(pull(targets, *keys)), with a few
732 732 edge cases handled differently. Users of chainsaw will find this format
733 733 familiar.
734 734 """
735 735
736 736 def run(targets, fname, block=True):
737 737 """Run a .py file on targets."""
738 738
739 739 #-------------------------------------------------------------------------------
740 740 # The full MultiEngine interface
741 741 #-------------------------------------------------------------------------------
742 742
743 743 class IFullMultiEngine(IMultiEngine,
744 744 IMultiEngineCoordinator,
745 745 IMultiEngineExtras):
746 746 pass
747 747
748 748
749 749 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
750 750 ISynchronousMultiEngineCoordinator,
751 751 ISynchronousMultiEngineExtras):
752 752 pass
753 753
@@ -1,519 +1,521 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 pjoin = os.path.join
22 22
23 23 from twisted.internet import reactor, defer
24 24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.python import failure, log
26 25 from twisted.internet.error import ProcessDone, ProcessTerminated
27 26 from twisted.internet.utils import getProcessOutput
27 from twisted.python import failure, log
28 28
29 29 from IPython.external import argparse
30 30 from IPython.external import Itpl
31 from IPython.kernel.twistedutil import gatherBoth
32 from IPython.kernel.util import printer
33 31 from IPython.genutils import get_ipython_dir, num_cpus
34 32 from IPython.kernel.fcutil import have_crypto
35 33 from IPython.kernel.error import SecurityError
34 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.twistedutil import gatherBoth
36 from IPython.kernel.util import printer
37
36 38
37 39 #-----------------------------------------------------------------------------
38 40 # General process handling code
39 41 #-----------------------------------------------------------------------------
40 42
def find_exe(cmd):
    """Return the full path of *cmd* on Windows, trying `cmd`.exe then `cmd`.bat.

    Searches the directories in the PATH environment variable via
    win32api.SearchPath.

    :Parameters:
        cmd : str
            The command name, without extension.

    :Exceptions:
        - `ImportError`: pywin32 is not installed.
        - `win32api.error`: neither `cmd`.exe nor `cmd`.bat was found on PATH.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    else:
        try:
            # SearchPath returns (full_path, index_of_filename_in_path).
            # The second element is unused here (original code had it as a
            # misspelled, unused variable).
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
        except win32api.error:
            # Narrowed from a bare except: only a failed search should fall
            # through to the .bat lookup; anything else should propagate.
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
    return path
52 54
class ProcessStateError(Exception):
    """Raised when a start/stop operation is attempted in the wrong process state."""
    pass
55 57
class UnknownStatus(Exception):
    """Raised when a child process exits with a status Twisted does not report as Done/Terminated."""
    pass
58 60
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Translates Twisted process events into callbacks on the owning
    ProcessLauncher and forwards child output to the log.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The transport exposes the child's pid once the connection is up;
        # hand it to the launcher so its start deferred can fire.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit is reported as exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: pass along everything Twisted tells us.
            info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status
            }
            self.process_launcher.fire_stop_deferred(info)
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Child stdout goes to the normal log.
        log.msg(data)

    def errReceived(self, data):
        # Child stderr goes to the error log.
        log.err(data)
88 90
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes.  This is an awkward design and should be moved to using
    a formal NotificationCenter.

    The process moves through three states: 'before' -> 'running' -> 'after'.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the full list (argv-style,
        # including the executable) is passed to spawnProcess.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # (Re)initialize all per-run state.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once the child
        # has a pid; transitions to 'running' and fires the start deferred.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred that fires with the pid.

        Calling start more than once returns a failed deferred with
        ProcessStateError.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires with the exit status when the process ends.

        May be called multiple times; each caller gets its own deferred.
        Fails with ProcessStateError if the process has already finished.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by the protocol on process exit; fires every deferred
        # handed out by get_stop_deferred with the exit status.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Silently does nothing unless the process is running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        # Graceful-then-forceful shutdown: INT now, KILL after `delay`
        # seconds (the KILL is a no-op if the process already exited).
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
168 170
169 171
170 172 #-----------------------------------------------------------------------------
171 173 # Code for launching controller and engines
172 174 #-----------------------------------------------------------------------------
173 175
174 176
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher that starts an ipcontroller process."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed in the
            # same way or in the same location, so resolve it from the
            # package itself.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # Run python with -u (unbuffered output), which is required on
            # Win32 to prevent weird conflicts and problems with Twisted.
            cmd_and_args = [find_exe('python'), '-u', script_location]
        else:
            cmd_and_args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd_and_args.extend(extra_args)
        ProcessLauncher.__init__(self, cmd_and_args)
193 195
194 196
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher that starts an ipengine process."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed in the same
            # way or in the same location, so resolve it from the package
            # itself.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # Run python with -u (unbuffered output), which is required on
            # Win32 to prevent weird conflicts and problems with Twisted.
            cmd_and_args = [find_exe('python'), '-u', script_location]
        else:
            cmd_and_args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd_and_args.extend(extra_args)
        ProcessLauncher.__init__(self, cmd_and_args)
213 215
214 216
class LocalEngineSet(object):
    """Start and manage a set of engines as local child processes."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start *n* engines; returns a deferred firing with their pids."""
        deferreds = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            deferreds.append(launcher.start())
            self.launchers.append(launcher)
        dfinal = gatherBoth(deferreds, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send *sig* to every engine; returns a deferred for their exits."""
        deferreds = []
        for launcher in self.launchers:
            # Grab the stop deferred before signalling so we can't miss
            # the exit notification.
            deferreds.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        dfinal = gatherBoth(deferreds, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """INT every engine now, KILL after *delay*; returns a deferred for their exits."""
        deferreds = []
        for launcher in self.launchers:
            deferreds.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        dfinal = gatherBoth(deferreds, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
259 261
260 262
class BatchEngineSet(object):
    """Start engines through a batch queueing system.

    Subclasses must fill in `submit_command`, `delete_command` and
    `job_id_regexp`.  See PBSEngineSet for an example.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        # kwargs become substitution context for the batch script template.
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract and store the job id from the submit command's output.

        Raises Exception when `job_id_regexp` does not match at the start
        of *output*.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the template for *n* engines and write the batch script."""
        self.context['n'] = n
        # Use try/finally so the file handles are closed deterministically;
        # the original left both files open until garbage collection.
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        # Print the traceback for the log, then re-raise so callers see it.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Write the batch script and submit it; fires with the job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job from the queue; requires a prior start()."""
        d = getProcessOutput(self.delete_command,
            [self.job_id],env=os.environ)
        return d
310 312
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet configured for the PBS queueing system (qsub/qdel)."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string so '\d' is a regex escape, not a (deprecated) Python
    # string escape.  qsub prints the numeric job id first.
    job_id_regexp = r'\d+'

    # No __init__ override needed: the original one only forwarded to
    # BatchEngineSet.__init__, which inheritance already provides.
319 321
320 322
321 323 #-----------------------------------------------------------------------------
322 324 # Main functions for the different types of clusters
323 325 #-----------------------------------------------------------------------------
324 326
325 327 # TODO:
326 328 # The logic in these codes should be moved into classes like LocalCluster
327 329 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplications.
328 330 # The main functions should then just parse the command line arguments, create
329 331 # the appropriate class and call a 'start' method.
330 332
def check_security(args, cont_args):
    """Validate the -x/-y security flags against SSL availability.

    When secure mode is requested (either flag unset) but OpenSSL/pyOpenSSL
    is missing, log an error, stop the reactor and return False.  Otherwise
    append the requested flags to *cont_args* and return True.
    """
    if not (args.x and args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for flag, enabled in (('-x', args.x), ('-y', args.y)):
        if enabled:
            cont_args.append(flag)
    return True
343 345
def main_local(args):
    """Start a controller and engines as local child processes.

    The controller is started first; engines are launched one second
    later so the controller has time to write its furl files.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Engine log filenames embed the controller pid to keep runs apart.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        # Ctrl-C tears down engines, then the controller, then the reactor.
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
376 378
def main_mpirun(args):
    """Start a controller locally and engines through mpirun.

    Engines are launched one second after the controller so its furl
    files exist before they try to connect.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Build the mpirun command line: mpirun -n N ipengine -l <logfile>
        raw_args = ['mpirun']
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            # Only pass --mpi when the user asked for MPI_Init support.
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        # Ctrl-C tears down the mpirun job, the controller, then the reactor.
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
413 415
def main_pbs(args):
    """Start a controller locally and engines through a PBS batch job."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            # Delete the PBS job first, then stop the controller and reactor.
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())
436 438
437 439
def get_args():
    """Build the argument parser and parse sys.argv.

    A `base_parser` holds the options shared by every cluster type
    (-x/-y security flags, --logdir, -n); each subcommand (local,
    mpirun, pbs) inherits it via `parents` and sets `func` to its
    main_* handler.
    """
    # Options common to all cluster types; add_help=False because this
    # parser is only used through `parents=`.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
        THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    # ipcluster local ...
    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    # ipcluster mpirun ...
    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpirun)

    # ipcluster pbs ...
    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)
    args = parser.parse_args()
    return args
511 513
def main():
    """Parse the command line and run the selected cluster type."""
    parsed = get_args()
    # Defer the chosen main_* handler until the reactor is running.
    reactor.callWhenRunning(parsed.func, parsed)
    log.startLogging(sys.stdout)
    reactor.run()
517 519
# Standard script entry point.
if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now