##// END OF EJS Templates
Messing with newlines.
Brian Granger -
Show More
@@ -1,903 +1,904 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
3 3
4 4 """A Twisted Service Representation of the IPython core.
5 5
6 6 The IPython Core exposed to the network is called the Engine. Its
7 7 representation in Twisted in the EngineService. Interfaces and adapters
8 8 are used to abstract out the details of the actual network protocol used.
9 9 The EngineService is an Engine that knows nothing about the actual protocol
10 10 used.
11 11
12 12 The EngineService is exposed with various network protocols in modules like:
13 13
14 14 enginepb.py
15 15 enginevanilla.py
16 16
17 17 As of 12/12/06 the classes in this module have been simplified greatly. It was
18 18 felt that we had over-engineered things. To improve the maintainability of the
19 19 code we have taken out the ICompleteEngine interface and the completeEngine
20 20 method that automatically added methods to engines.
21 21
22 22 """
23 23
24 24 __docformat__ = "restructuredtext en"
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Copyright (C) 2008 The IPython Development Team
28 28 #
29 29 # Distributed under the terms of the BSD License. The full license is in
30 30 # the file COPYING, distributed as part of this software.
31 31 #-------------------------------------------------------------------------------
32 32
33 33 #-------------------------------------------------------------------------------
34 34 # Imports
35 35 #-------------------------------------------------------------------------------
36 36
37 37 import os, sys, copy
38 38 import cPickle as pickle
39 39 from new import instancemethod
40 40
41 41 from twisted.application import service
42 42 from twisted.internet import defer, reactor
43 43 from twisted.python import log, failure, components
44 44 import zope.interface as zi
45 45
46 46 from IPython.kernel.core.interpreter import Interpreter
47 47 from IPython.kernel import newserialized, error, util
48 48 from IPython.kernel.util import printer
49 49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
50 50 from IPython.kernel import codeutil
51 51
52 52
53 53 #-------------------------------------------------------------------------------
54 54 # Interface specification for the Engine
55 55 #-------------------------------------------------------------------------------
56 56
57 57 class IEngineCore(zi.Interface):
58 58 """The minimal required interface for the IPython Engine.
59 59
60 60 This interface provides a formal specification of the IPython core.
61 61 All these methods should return deferreds regardless of what side of a
62 62 network connection they are on.
63 63
64 64 In general, this class simply wraps a shell class and wraps its return
65 65 values as Deferred objects. If the underlying shell class method raises
66 66 an exception, this class should convert it to a twisted.failure.Failure
67 67 that will be propagated along the Deferred's errback chain.
68 68
69 69 In addition, Failures are aggressive. By this, we mean that if a method
70 70 is performing multiple actions (like pulling multiple object) if any
71 71 single one fails, the entire method will fail with that Failure. It is
72 72 all or nothing.
73 73 """
74 74
75 75 id = zi.interface.Attribute("the id of the Engine object")
76 76 properties = zi.interface.Attribute("A dict of properties of the Engine")
77 77
78 78 def execute(lines):
79 79 """Execute lines of Python code.
80 80
81 81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
82 82 upon success.
83 83
84 84 Returns a failure object if the execution of lines raises an exception.
85 85 """
86 86
87 87 def push(namespace):
88 88 """Push dict namespace into the user's namespace.
89 89
90 90 Returns a deferred to None or a failure.
91 91 """
92 92
93 93 def pull(keys):
94 94 """Pulls values out of the user's namespace by keys.
95 95
96 96 Returns a deferred to a tuple objects or a single object.
97 97
98 98 Raises NameError if any one of objects doess not exist.
99 99 """
100 100
101 101 def push_function(namespace):
102 102 """Push a dict of key, function pairs into the user's namespace.
103 103
104 104 Returns a deferred to None or a failure."""
105 105
106 106 def pull_function(keys):
107 107 """Pulls functions out of the user's namespace by keys.
108 108
109 109 Returns a deferred to a tuple of functions or a single function.
110 110
111 111 Raises NameError if any one of the functions does not exist.
112 112 """
113 113
114 114 def get_result(i=None):
115 115 """Get the stdin/stdout/stderr of command i.
116 116
117 117 Returns a deferred to a dict with keys
118 118 (id, number, stdin, stdout, stderr).
119 119
120 120 Raises IndexError if command i does not exist.
121 121 Raises TypeError if i in not an int.
122 122 """
123 123
124 124 def reset():
125 125 """Reset the shell.
126 126
127 127 This clears the users namespace. Won't cause modules to be
128 128 reloaded. Should also re-initialize certain variables like id.
129 129 """
130 130
131 131 def kill():
132 132 """Kill the engine by stopping the reactor."""
133 133
134 134 def keys():
135 135 """Return the top level variables in the users namspace.
136 136
137 137 Returns a deferred to a dict."""
138 138
139 139
140 140 class IEngineSerialized(zi.Interface):
141 141 """Push/Pull methods that take Serialized objects.
142 142
143 143 All methods should return deferreds.
144 144 """
145 145
146 146 def push_serialized(namespace):
147 147 """Push a dict of keys and Serialized objects into the user's namespace."""
148 148
149 149 def pull_serialized(keys):
150 150 """Pull objects by key from the user's namespace as Serialized.
151 151
152 152 Returns a list of or one Serialized.
153 153
154 154 Raises NameError is any one of the objects does not exist.
155 155 """
156 156
157 157
158 158 class IEngineProperties(zi.Interface):
159 159 """Methods for access to the properties object of an Engine"""
160 160
161 161 properties = zi.Attribute("A StrictDict object, containing the properties")
162 162
163 163 def set_properties(properties):
164 164 """set properties by key and value"""
165 165
166 166 def get_properties(keys=None):
167 167 """get a list of properties by `keys`, if no keys specified, get all"""
168 168
169 169 def del_properties(keys):
170 170 """delete properties by `keys`"""
171 171
172 172 def has_properties(keys):
173 173 """get a list of bool values for whether `properties` has `keys`"""
174 174
175 175 def clear_properties():
176 176 """clear the properties dict"""
177 177
178 178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
179 179 """The basic engine interface that EngineService will implement.
180 180
181 181 This exists so it is easy to specify adapters that adapt to and from the
182 182 API that the basic EngineService implements.
183 183 """
184 184 pass
185 185
186 186 class IEngineQueued(IEngineBase):
187 187 """Interface for adding a queue to an IEngineBase.
188 188
189 189 This interface extends the IEngineBase interface to add methods for managing
190 190 the engine's queue. The implicit details of this interface are that the
191 191 execution of all methods declared in IEngineBase should appropriately be
192 192 put through a queue before execution.
193 193
194 194 All methods should return deferreds.
195 195 """
196 196
197 197 def clear_queue():
198 198 """Clear the queue."""
199 199
200 200 def queue_status():
201 201 """Get the queued and pending commands in the queue."""
202 202
203 203 def register_failure_observer(obs):
204 204 """Register an observer of pending Failures.
205 205
206 206 The observer must implement IFailureObserver.
207 207 """
208 208
209 209 def unregister_failure_observer(obs):
210 210 """Unregister an observer of pending Failures."""
211 211
212 212
213 213 class IEngineThreaded(zi.Interface):
214 214 """A place holder for threaded commands.
215 215
216 216 All methods should return deferreds.
217 217 """
218 218 pass
219 219
220 220
221 221 #-------------------------------------------------------------------------------
222 222 # Functions and classes to implement the EngineService
223 223 #-------------------------------------------------------------------------------
224 224
225 225
226 226 class StrictDict(dict):
227 227 """This is a strict copying dictionary for use as the interface to the
228 228 properties of an Engine.
229 229
230 230 :IMPORTANT:
231 231 This object copies the values you set to it, and returns copies to you
232 232 when you request them. The only way to change properties os explicitly
233 233 through the setitem and getitem of the dictionary interface.
234 234
235 235 Example:
236 236 >>> e = get_engine(id)
237 237 >>> L = [1,2,3]
238 238 >>> e.properties['L'] = L
239 239 >>> L == e.properties['L']
240 240 True
241 241 >>> L.append(99)
242 242 >>> L == e.properties['L']
243 243 False
244 244
245 245 Note that getitem copies, so calls to methods of objects do not affect
246 246 the properties, as seen here:
247 247
248 248 >>> e.properties[1] = range(2)
249 249 >>> print e.properties[1]
250 250 [0, 1]
251 251 >>> e.properties[1].append(2)
252 252 >>> print e.properties[1]
253 253 [0, 1]
254 254 """
255 255 def __init__(self, *args, **kwargs):
256 256 dict.__init__(self, *args, **kwargs)
257 257 self.modified = True
258 258
259 259 def __getitem__(self, key):
260 260 return copy.deepcopy(dict.__getitem__(self, key))
261 261
262 262 def __setitem__(self, key, value):
263 263 # check if this entry is valid for transport around the network
264 264 # and copying
265 265 try:
266 266 pickle.dumps(key, 2)
267 267 pickle.dumps(value, 2)
268 268 newvalue = copy.deepcopy(value)
269 269 except:
270 270 raise error.InvalidProperty(value)
271 271 dict.__setitem__(self, key, newvalue)
272 272 self.modified = True
273 273
274 274 def __delitem__(self, key):
275 275 dict.__delitem__(self, key)
276 276 self.modified = True
277 277
278 278 def update(self, dikt):
279 279 for k,v in dikt.iteritems():
280 280 self[k] = v
281 281
282 282 def pop(self, key):
283 283 self.modified = True
284 284 return dict.pop(self, key)
285 285
286 286 def popitem(self):
287 287 self.modified = True
288 288 return dict.popitem(self)
289 289
290 290 def clear(self):
291 291 self.modified = True
292 292 dict.clear(self)
293 293
294 294 def subDict(self, *keys):
295 295 d = {}
296 296 for key in keys:
297 297 d[key] = self[key]
298 298 return d
299 299
300 300
301 301
302 302 class EngineAPI(object):
303 303 """This is the object through which the user can edit the `properties`
304 304 attribute of an Engine.
305 305 The Engine Properties object copies all object in and out of itself.
306 306 See the EngineProperties object for details.
307 307 """
308 308 _fix=False
309 309 def __init__(self, id):
310 310 self.id = id
311 311 self.properties = StrictDict()
312 312 self._fix=True
313 313
314 314 def __setattr__(self, k,v):
315 315 if self._fix:
316 316 raise error.KernelError("I am protected!")
317 317 else:
318 318 object.__setattr__(self, k, v)
319 319
320 320 def __delattr__(self, key):
321 321 raise error.KernelError("I am protected!")
322 322
323 323
324 324 _apiDict = {}
325 325
326 326 def get_engine(id):
327 327 """Get the Engine API object, whcih currently just provides the properties
328 328 object, by ID"""
329 329 global _apiDict
330 330 if not _apiDict.get(id):
331 331 _apiDict[id] = EngineAPI(id)
332 332 return _apiDict[id]
333 333
334 334 def drop_engine(id):
335 335 """remove an engine"""
336 336 global _apiDict
337 337 if _apiDict.has_key(id):
338 338 del _apiDict[id]
339 339
340 340 class EngineService(object, service.Service):
341 341 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
342 342
343 343 zi.implements(IEngineBase)
344 344 name = 'EngineService'
345 345
346 346 def __init__(self, shellClass=Interpreter, mpi=None):
347 347 """Create an EngineService.
348 348
349 349 shellClass: something that implements IInterpreter or core1
350 350 mpi: an mpi module that has rank and size attributes
351 351 """
352 352 self.shellClass = shellClass
353 353 self.shell = self.shellClass()
354 354 self.mpi = mpi
355 355 self.id = None
356 356 self.properties = get_engine(self.id).properties
357 357 if self.mpi is not None:
358 358 log.msg("MPI started with rank = %i and size = %i" %
359 359 (self.mpi.rank, self.mpi.size))
360 360 self.id = self.mpi.rank
361 361 self._seedNamespace()
362 362
363 363 # Make id a property so that the shell can get the updated id
364 364
365 365 def _setID(self, id):
366 366 self._id = id
367 367 self.properties = get_engine(id).properties
368 368 self.shell.push({'id': id})
369 369
370 370 def _getID(self):
371 371 return self._id
372 372
373 373 id = property(_getID, _setID)
374 374
375 375 def _seedNamespace(self):
376 376 self.shell.push({'mpi': self.mpi, 'id' : self.id})
377 377
378 378 def executeAndRaise(self, msg, callable, *args, **kwargs):
379 379 """Call a method of self.shell and wrap any exception."""
380 380 d = defer.Deferred()
381 381 try:
382 382 result = callable(*args, **kwargs)
383 383 except:
384 384 # This gives the following:
385 385 # et=exception class
386 386 # ev=exception class instance
387 387 # tb=traceback object
388 388 et,ev,tb = sys.exc_info()
389 389 # This call adds attributes to the exception value
390 390 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
391 391 # Add another attribute
392 392 ev._ipython_engine_info = msg
393 393 f = failure.Failure(ev,et,None)
394 394 d.errback(f)
395 395 else:
396 396 d.callback(result)
397 397
398 398 return d
399 399
400 400
401 401 # The IEngine methods. See the interface for documentation.
402 402
403 @profile
403 404 def execute(self, lines):
404 405 msg = {'engineid':self.id,
405 406 'method':'execute',
406 407 'args':[lines]}
407 408 d = self.executeAndRaise(msg, self.shell.execute, lines)
408 409 d.addCallback(self.addIDToResult)
409 410 return d
410 411
411 412 def addIDToResult(self, result):
412 413 result['id'] = self.id
413 414 return result
414 415
415 416 def push(self, namespace):
416 417 msg = {'engineid':self.id,
417 418 'method':'push',
418 419 'args':[repr(namespace.keys())]}
419 420 d = self.executeAndRaise(msg, self.shell.push, namespace)
420 421 return d
421 422
422 423 def pull(self, keys):
423 424 msg = {'engineid':self.id,
424 425 'method':'pull',
425 426 'args':[repr(keys)]}
426 427 d = self.executeAndRaise(msg, self.shell.pull, keys)
427 428 return d
428 429
429 430 def push_function(self, namespace):
430 431 msg = {'engineid':self.id,
431 432 'method':'push_function',
432 433 'args':[repr(namespace.keys())]}
433 434 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
434 435 return d
435 436
436 437 def pull_function(self, keys):
437 438 msg = {'engineid':self.id,
438 439 'method':'pull_function',
439 440 'args':[repr(keys)]}
440 441 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
441 442 return d
442 443
443 444 def get_result(self, i=None):
444 445 msg = {'engineid':self.id,
445 446 'method':'get_result',
446 447 'args':[repr(i)]}
447 448 d = self.executeAndRaise(msg, self.shell.getCommand, i)
448 449 d.addCallback(self.addIDToResult)
449 450 return d
450 451
451 452 def reset(self):
452 453 msg = {'engineid':self.id,
453 454 'method':'reset',
454 455 'args':[]}
455 456 del self.shell
456 457 self.shell = self.shellClass()
457 458 self.properties.clear()
458 459 d = self.executeAndRaise(msg, self._seedNamespace)
459 460 return d
460 461
461 462 def kill(self):
462 463 drop_engine(self.id)
463 464 try:
464 465 reactor.stop()
465 466 except RuntimeError:
466 467 log.msg('The reactor was not running apparently.')
467 468 return defer.fail()
468 469 else:
469 470 return defer.succeed(None)
470 471
471 472 def keys(self):
472 473 """Return a list of variables names in the users top level namespace.
473 474
474 475 This used to return a dict of all the keys/repr(values) in the
475 476 user's namespace. This was too much info for the ControllerService
476 477 to handle so it is now just a list of keys.
477 478 """
478 479
479 480 remotes = []
480 481 for k in self.shell.user_ns.iterkeys():
481 482 if k not in ['__name__', '_ih', '_oh', '__builtins__',
482 483 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
483 484 remotes.append(k)
484 485 return defer.succeed(remotes)
485 486
486 487 def set_properties(self, properties):
487 488 msg = {'engineid':self.id,
488 489 'method':'set_properties',
489 490 'args':[repr(properties.keys())]}
490 491 return self.executeAndRaise(msg, self.properties.update, properties)
491 492
492 493 def get_properties(self, keys=None):
493 494 msg = {'engineid':self.id,
494 495 'method':'get_properties',
495 496 'args':[repr(keys)]}
496 497 if keys is None:
497 498 keys = self.properties.keys()
498 499 return self.executeAndRaise(msg, self.properties.subDict, *keys)
499 500
500 501 def _doDel(self, keys):
501 502 for key in keys:
502 503 del self.properties[key]
503 504
504 505 def del_properties(self, keys):
505 506 msg = {'engineid':self.id,
506 507 'method':'del_properties',
507 508 'args':[repr(keys)]}
508 509 return self.executeAndRaise(msg, self._doDel, keys)
509 510
510 511 def _doHas(self, keys):
511 512 return [self.properties.has_key(key) for key in keys]
512 513
513 514 def has_properties(self, keys):
514 515 msg = {'engineid':self.id,
515 516 'method':'has_properties',
516 517 'args':[repr(keys)]}
517 518 return self.executeAndRaise(msg, self._doHas, keys)
518 519
519 520 def clear_properties(self):
520 521 msg = {'engineid':self.id,
521 522 'method':'clear_properties',
522 523 'args':[]}
523 524 return self.executeAndRaise(msg, self.properties.clear)
524 525
525 526 def push_serialized(self, sNamespace):
526 527 msg = {'engineid':self.id,
527 528 'method':'push_serialized',
528 529 'args':[repr(sNamespace.keys())]}
529 530 ns = {}
530 531 for k,v in sNamespace.iteritems():
531 532 try:
532 533 unserialized = newserialized.IUnSerialized(v)
533 534 ns[k] = unserialized.getObject()
534 535 except:
535 536 return defer.fail()
536 537 return self.executeAndRaise(msg, self.shell.push, ns)
537 538
538 539 def pull_serialized(self, keys):
539 540 msg = {'engineid':self.id,
540 541 'method':'pull_serialized',
541 542 'args':[repr(keys)]}
542 543 if isinstance(keys, str):
543 544 keys = [keys]
544 545 if len(keys)==1:
545 546 d = self.executeAndRaise(msg, self.shell.pull, keys)
546 547 d.addCallback(newserialized.serialize)
547 548 return d
548 549 elif len(keys)>1:
549 550 d = self.executeAndRaise(msg, self.shell.pull, keys)
550 551 @d.addCallback
551 552 def packThemUp(values):
552 553 serials = []
553 554 for v in values:
554 555 try:
555 556 serials.append(newserialized.serialize(v))
556 557 except:
557 558 return defer.fail(failure.Failure())
558 559 return serials
559 560 return packThemUp
560 561
561 562
562 563 def queue(methodToQueue):
563 564 def queuedMethod(this, *args, **kwargs):
564 565 name = methodToQueue.__name__
565 566 return this.submitCommand(Command(name, *args, **kwargs))
566 567 return queuedMethod
567 568
568 569 class QueuedEngine(object):
569 570 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
570 571
571 572 The resulting object will implement IEngineQueued which extends
572 573 IEngineCore which extends (IEngineBase, IEngineSerialized).
573 574
574 575 This seems like the best way of handling it, but I am not sure. The
575 576 other option is to have the various base interfaces be used like
576 577 mix-in intefaces. The problem I have with this is adpatation is
577 578 more difficult and complicated because there can be can multiple
578 579 original and final Interfaces.
579 580 """
580 581
581 582 zi.implements(IEngineQueued)
582 583
583 584 def __init__(self, engine):
584 585 """Create a QueuedEngine object from an engine
585 586
586 587 engine: An implementor of IEngineCore and IEngineSerialized
587 588 keepUpToDate: whether to update the remote status when the
588 589 queue is empty. Defaults to False.
589 590 """
590 591
591 592 # This is the right way to do these tests rather than
592 593 # IEngineCore in list(zi.providedBy(engine)) which will only
593 594 # picks of the interfaces that are directly declared by engine.
594 595 assert IEngineBase.providedBy(engine), \
595 596 "engine passed to QueuedEngine doesn't provide IEngineBase"
596 597
597 598 self.engine = engine
598 599 self.id = engine.id
599 600 self.queued = []
600 601 self.history = {}
601 602 self.engineStatus = {}
602 603 self.currentCommand = None
603 604 self.failureObservers = []
604 605
605 606 def _get_properties(self):
606 607 return self.engine.properties
607 608
608 609 properties = property(_get_properties, lambda self, _: None)
609 610 # Queue management methods. You should not call these directly
610 611
611 612 def submitCommand(self, cmd):
612 613 """Submit command to queue."""
613 614
614 615 d = defer.Deferred()
615 616 cmd.setDeferred(d)
616 617 if self.currentCommand is not None:
617 618 if self.currentCommand.finished:
618 619 # log.msg("Running command immediately: %r" % cmd)
619 620 self.currentCommand = cmd
620 621 self.runCurrentCommand()
621 622 else: # command is still running
622 623 # log.msg("Command is running: %r" % self.currentCommand)
623 624 # log.msg("Queueing: %r" % cmd)
624 625 self.queued.append(cmd)
625 626 else:
626 627 # log.msg("No current commands, running: %r" % cmd)
627 628 self.currentCommand = cmd
628 629 self.runCurrentCommand()
629 630 return d
630 631
631 632 def runCurrentCommand(self):
632 633 """Run current command."""
633 634
634 635 cmd = self.currentCommand
635 636 f = getattr(self.engine, cmd.remoteMethod, None)
636 637 if f:
637 638 d = f(*cmd.args, **cmd.kwargs)
638 639 if cmd.remoteMethod is 'execute':
639 640 d.addCallback(self.saveResult)
640 641 d.addCallback(self.finishCommand)
641 642 d.addErrback(self.abortCommand)
642 643 else:
643 644 return defer.fail(AttributeError(cmd.remoteMethod))
644 645
645 646 def _flushQueue(self):
646 647 """Pop next command in queue and run it."""
647 648
648 649 if len(self.queued) > 0:
649 650 self.currentCommand = self.queued.pop(0)
650 651 self.runCurrentCommand()
651 652
652 653 def saveResult(self, result):
653 654 """Put the result in the history."""
654 655 self.history[result['number']] = result
655 656 return result
656 657
657 658 def finishCommand(self, result):
658 659 """Finish currrent command."""
659 660
660 661 # The order of these commands is absolutely critical.
661 662 self.currentCommand.handleResult(result)
662 663 self.currentCommand.finished = True
663 664 self._flushQueue()
664 665 return result
665 666
666 667 def abortCommand(self, reason):
667 668 """Abort current command.
668 669
669 670 This eats the Failure but first passes it onto the Deferred that the
670 671 user has.
671 672
672 673 It also clear out the queue so subsequence commands don't run.
673 674 """
674 675
675 676 # The order of these 3 commands is absolutely critical. The currentCommand
676 677 # must first be marked as finished BEFORE the queue is cleared and before
677 678 # the current command is sent the failure.
678 679 # Also, the queue must be cleared BEFORE the current command is sent the Failure
679 680 # otherwise the errback chain could trigger new commands to be added to the
680 681 # queue before we clear it. We should clear ONLY the commands that were in
681 682 # the queue when the error occured.
682 683 self.currentCommand.finished = True
683 684 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
684 685 self.clear_queue(msg=s)
685 686 self.currentCommand.handleError(reason)
686 687
687 688 return None
688 689
689 690 #---------------------------------------------------------------------------
690 691 # IEngineCore methods
691 692 #---------------------------------------------------------------------------
692 693
693 694 @queue
694 695 def execute(self, lines):
695 696 pass
696
697
697 698 @queue
698 699 def push(self, namespace):
699 700 pass
700 701
701 702 @queue
702 703 def pull(self, keys):
703 704 pass
704 705
705 706 @queue
706 707 def push_function(self, namespace):
707 708 pass
708 709
709 710 @queue
710 711 def pull_function(self, keys):
711 712 pass
712 713
713 714 def get_result(self, i=None):
714 715 if i is None:
715 716 i = max(self.history.keys()+[None])
716 717
717 718 cmd = self.history.get(i, None)
718 719 # Uncomment this line to disable chaching of results
719 720 #cmd = None
720 721 if cmd is None:
721 722 return self.submitCommand(Command('get_result', i))
722 723 else:
723 724 return defer.succeed(cmd)
724 725
725 726 def reset(self):
726 727 self.clear_queue()
727 728 self.history = {} # reset the cache - I am not sure we should do this
728 729 return self.submitCommand(Command('reset'))
729 730
730 731 def kill(self):
731 732 self.clear_queue()
732 733 return self.submitCommand(Command('kill'))
733 734
734 735 @queue
735 736 def keys(self):
736 737 pass
737 738
738 739 #---------------------------------------------------------------------------
739 740 # IEngineSerialized methods
740 741 #---------------------------------------------------------------------------
741 742
742 743 @queue
743 744 def push_serialized(self, namespace):
744 745 pass
745 746
746 747 @queue
747 748 def pull_serialized(self, keys):
748 749 pass
749 750
750 751 #---------------------------------------------------------------------------
751 752 # IEngineProperties methods
752 753 #---------------------------------------------------------------------------
753 754
754 755 @queue
755 756 def set_properties(self, namespace):
756 757 pass
757 758
758 759 @queue
759 760 def get_properties(self, keys=None):
760 761 pass
761 762
762 763 @queue
763 764 def del_properties(self, keys):
764 765 pass
765 766
766 767 @queue
767 768 def has_properties(self, keys):
768 769 pass
769 770
770 771 @queue
771 772 def clear_properties(self):
772 773 pass
773 774
774 775 #---------------------------------------------------------------------------
775 776 # IQueuedEngine methods
776 777 #---------------------------------------------------------------------------
777 778
778 779 def clear_queue(self, msg=''):
779 780 """Clear the queue, but doesn't cancel the currently running commmand."""
780 781
781 782 for cmd in self.queued:
782 783 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
783 784 self.queued = []
784 785 return defer.succeed(None)
785 786
786 787 def queue_status(self):
787 788 if self.currentCommand is not None:
788 789 if self.currentCommand.finished:
789 790 pending = repr(None)
790 791 else:
791 792 pending = repr(self.currentCommand)
792 793 else:
793 794 pending = repr(None)
794 795 dikt = {'queue':map(repr,self.queued), 'pending':pending}
795 796 return defer.succeed(dikt)
796 797
797 798 def register_failure_observer(self, obs):
798 799 self.failureObservers.append(obs)
799 800
800 801 def unregister_failure_observer(self, obs):
801 802 self.failureObservers.remove(obs)
802 803
803 804
804 805 # Now register QueuedEngine as an adpater class that makes an IEngineBase into a
805 806 # IEngineQueued.
806 807 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
807 808
808 809
809 810 class Command(object):
810 811 """A command object that encapslates queued commands.
811 812
812 813 This class basically keeps track of a command that has been queued
813 814 in a QueuedEngine. It manages the deferreds and hold the method to be called
814 815 and the arguments to that method.
815 816 """
816 817
817 818
818 819 def __init__(self, remoteMethod, *args, **kwargs):
819 820 """Build a new Command object."""
820 821
821 822 self.remoteMethod = remoteMethod
822 823 self.args = args
823 824 self.kwargs = kwargs
824 825 self.finished = False
825 826
826 827 def setDeferred(self, d):
827 828 """Sets the deferred attribute of the Command."""
828 829
829 830 self.deferred = d
830 831
831 832 def __repr__(self):
832 833 if not self.args:
833 834 args = ''
834 835 else:
835 836 args = str(self.args)[1:-2] #cut off (...,)
836 837 for k,v in self.kwargs.iteritems():
837 838 if args:
838 839 args += ', '
839 840 args += '%s=%r' %(k,v)
840 841 return "%s(%s)" %(self.remoteMethod, args)
841 842
842 843 def handleResult(self, result):
843 844 """When the result is ready, relay it to self.deferred."""
844 845
845 846 self.deferred.callback(result)
846 847
847 848 def handleError(self, reason):
848 849 """When an error has occured, relay it to self.deferred."""
849 850
850 851 self.deferred.errback(reason)
851 852
852 853 class ThreadedEngineService(EngineService):
853 854 """An EngineService subclass that defers execute commands to a separate
854 855 thread.
855 856
856 857 ThreadedEngineService uses twisted.internet.threads.deferToThread to
857 858 defer execute requests to a separate thread. GUI frontends may want to
858 859 use ThreadedEngineService as the engine in an
859 860 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
860 861 block execution from blocking the GUI thread.
861 862 """
862 863
863 864 zi.implements(IEngineBase)
864 865
865 866 def __init__(self, shellClass=Interpreter, mpi=None):
866 867 EngineService.__init__(self, shellClass, mpi)
867 868
868 869 def wrapped_execute(self, msg, lines):
869 870 """Wrap self.shell.execute to add extra information to tracebacks"""
870 871
871 872 try:
872 873 result = self.shell.execute(lines)
873 874 except Exception,e:
874 875 # This gives the following:
875 876 # et=exception class
876 877 # ev=exception class instance
877 878 # tb=traceback object
878 879 et,ev,tb = sys.exc_info()
879 880 # This call adds attributes to the exception value
880 881 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
881 882 # Add another attribute
882 883
883 884 # Create a new exception with the new attributes
884 885 e = et(ev._ipython_traceback_text)
885 886 e._ipython_engine_info = msg
886 887
887 888 # Re-raise
888 889 raise e
889 890
890 891 return result
891 892
892 893
893 894 def execute(self, lines):
894 895 # Only import this if we are going to use this class
895 896 from twisted.internet import threads
896 897
897 898 msg = {'engineid':self.id,
898 899 'method':'execute',
899 900 'args':[lines]}
900 901
901 902 d = threads.deferToThread(self.wrapped_execute, msg, lines)
902 903 d.addCallback(self.addIDToResult)
903 904 return d
@@ -1,753 +1,754 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3 3
4 4 """Adapt the IPython ControllerServer to IMultiEngine.
5 5
6 6 This module provides classes that adapt a ControllerService to the
7 7 IMultiEngine interface. This interface is a basic interactive interface
8 8 for working with a set of engines where it is desired to have explicit
9 9 access to each registered engine.
10 10
11 11 The classes here are exposed to the network in files like:
12 12
13 13 * multienginevanilla.py
14 14 * multienginepb.py
15 15 """
16 16
17 17 __docformat__ = "restructuredtext en"
18 18
19 19 #-------------------------------------------------------------------------------
20 20 # Copyright (C) 2008 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-------------------------------------------------------------------------------
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Imports
28 28 #-------------------------------------------------------------------------------
29 29
30 30 from new import instancemethod
31 31 from types import FunctionType
32 32
33 33 from twisted.application import service
34 34 from twisted.internet import defer, reactor
35 35 from twisted.python import log, components, failure
36 36 from zope.interface import Interface, implements, Attribute
37 37
38 38 from IPython.tools import growl
39 39 from IPython.kernel.util import printer
40 40 from IPython.kernel.twistedutil import gatherBoth
41 41 from IPython.kernel import map as Map
42 42 from IPython.kernel import error
43 43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 44 from IPython.kernel.controllerservice import \
45 45 ControllerAdapterBase, \
46 46 ControllerService, \
47 47 IControllerBase
48 48
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Interfaces for the MultiEngine representation of a controller
52 52 #-------------------------------------------------------------------------------
53 53
54 54 class IEngineMultiplexer(Interface):
55 55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56 56
57 57 This class simply acts as a multiplexer of methods that are in the
58 58 various IEngines* interfaces. Thus the methods here are jut like those
59 59 in the IEngine* interfaces, but with an extra first argument, targets.
60 60 The targets argument can have the following forms:
61 61
62 62 * targets = 10 # Engines are indexed by ints
63 63 * targets = [0,1,2,3] # A list of ints
64 64 * targets = 'all' # A string to indicate all targets
65 65
66 66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 67 includes engines not being registered.
68 68
69 69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 70 with length equal to the number of targets. The elements of the list will
71 71 correspond to the return of the corresponding IEngine method.
72 72
73 73 Failures are aggressive, meaning that if an action fails for any target,
74 74 the overall action will fail immediately with that Failure.
75 75
76 76 :Parameters:
77 77 targets : int, list of ints, or 'all'
78 78 Engine ids the action will apply to.
79 79
80 80 :Returns: Deferred to a list of results for each engine.
81 81
82 82 :Exception:
83 83 InvalidEngineID
84 84 If the targets argument is bad or engines aren't registered.
85 85 NoEnginesRegistered
86 86 If there are no engines registered and targets='all'
87 87 """
88 88
89 89 #---------------------------------------------------------------------------
90 90 # Mutiplexed methods
91 91 #---------------------------------------------------------------------------
92 92
93 93 def execute(lines, targets='all'):
94 94 """Execute lines of Python code on targets.
95 95
96 96 See the class docstring for information about targets and possible
97 97 exceptions this method can raise.
98 98
99 99 :Parameters:
100 100 lines : str
101 101 String of python code to be executed on targets.
102 102 """
103 103
104 104 def push(namespace, targets='all'):
105 105 """Push dict namespace into the user's namespace on targets.
106 106
107 107 See the class docstring for information about targets and possible
108 108 exceptions this method can raise.
109 109
110 110 :Parameters:
111 111 namspace : dict
112 112 Dict of key value pairs to be put into the users namspace.
113 113 """
114 114
115 115 def pull(keys, targets='all'):
116 116 """Pull values out of the user's namespace on targets by keys.
117 117
118 118 See the class docstring for information about targets and possible
119 119 exceptions this method can raise.
120 120
121 121 :Parameters:
122 122 keys : tuple of strings
123 123 Sequence of keys to be pulled from user's namespace.
124 124 """
125 125
126 126 def push_function(namespace, targets='all'):
127 127 """"""
128 128
129 129 def pull_function(keys, targets='all'):
130 130 """"""
131 131
132 132 def get_result(i=None, targets='all'):
133 133 """Get the result for command i from targets.
134 134
135 135 See the class docstring for information about targets and possible
136 136 exceptions this method can raise.
137 137
138 138 :Parameters:
139 139 i : int or None
140 140 Command index or None to indicate most recent command.
141 141 """
142 142
143 143 def reset(targets='all'):
144 144 """Reset targets.
145 145
146 146 This clears the users namespace of the Engines, but won't cause
147 147 modules to be reloaded.
148 148 """
149 149
150 150 def keys(targets='all'):
151 151 """Get variable names defined in user's namespace on targets."""
152 152
153 153 def kill(controller=False, targets='all'):
154 154 """Kill the targets Engines and possibly the controller.
155 155
156 156 :Parameters:
157 157 controller : boolean
158 158 Should the controller be killed as well. If so all the
159 159 engines will be killed first no matter what targets is.
160 160 """
161 161
162 162 def push_serialized(namespace, targets='all'):
163 163 """Push a namespace of Serialized objects to targets.
164 164
165 165 :Parameters:
166 166 namespace : dict
167 167 A dict whose keys are the variable names and whose values
168 168 are serialized version of the objects.
169 169 """
170 170
171 171 def pull_serialized(keys, targets='all'):
172 172 """Pull Serialized objects by keys from targets.
173 173
174 174 :Parameters:
175 175 keys : tuple of strings
176 176 Sequence of variable names to pull as serialized objects.
177 177 """
178 178
179 179 def clear_queue(targets='all'):
180 180 """Clear the queue of pending command for targets."""
181 181
182 182 def queue_status(targets='all'):
183 183 """Get the status of the queue on the targets."""
184 184
185 185 def set_properties(properties, targets='all'):
186 186 """set properties by key and value"""
187 187
188 188 def get_properties(keys=None, targets='all'):
189 189 """get a list of properties by `keys`, if no keys specified, get all"""
190 190
191 191 def del_properties(keys, targets='all'):
192 192 """delete properties by `keys`"""
193 193
194 194 def has_properties(keys, targets='all'):
195 195 """get a list of bool values for whether `properties` has `keys`"""
196 196
197 197 def clear_properties(targets='all'):
198 198 """clear the properties dict"""
199 199
200 200
201 201 class IMultiEngine(IEngineMultiplexer):
202 202 """A controller that exposes an explicit interface to all of its engines.
203 203
204 204 This is the primary inteface for interactive usage.
205 205 """
206 206
207 207 def get_ids():
208 208 """Return list of currently registered ids.
209 209
210 210 :Returns: A Deferred to a list of registered engine ids.
211 211 """
212 212
213 213
214 214
215 215 #-------------------------------------------------------------------------------
216 216 # Implementation of the core MultiEngine classes
217 217 #-------------------------------------------------------------------------------
218 218
219 219 class MultiEngine(ControllerAdapterBase):
220 220 """The representation of a ControllerService as a IMultiEngine.
221 221
222 222 Although it is not implemented currently, this class would be where a
223 223 client/notification API is implemented. It could inherit from something
224 224 like results.NotifierParent and then use the notify method to send
225 225 notifications.
226 226 """
227 227
228 228 implements(IMultiEngine)
229 229
230 230 def __init(self, controller):
231 231 ControllerAdapterBase.__init__(self, controller)
232 232
233 233 #---------------------------------------------------------------------------
234 234 # Helper methods
235 235 #---------------------------------------------------------------------------
236 236
237 237 def engineList(self, targets):
238 238 """Parse the targets argument into a list of valid engine objects.
239 239
240 240 :Parameters:
241 241 targets : int, list of ints or 'all'
242 242 The targets argument to be parsed.
243 243
244 244 :Returns: List of engine objects.
245 245
246 246 :Exception:
247 247 InvalidEngineID
248 248 If targets is not valid or if an engine is not registered.
249 249 """
250 250 if isinstance(targets, int):
251 251 if targets not in self.engines.keys():
252 252 log.msg("Engine with id %i is not registered" % targets)
253 253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 254 else:
255 255 return [self.engines[targets]]
256 256 elif isinstance(targets, (list, tuple)):
257 257 for id in targets:
258 258 if id not in self.engines.keys():
259 259 log.msg("Engine with id %r is not registered" % id)
260 260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 261 return map(self.engines.get, targets)
262 262 elif targets == 'all':
263 263 eList = self.engines.values()
264 264 if len(eList) == 0:
265 265 msg = """There are no engines registered.
266 266 Check the logs in ~/.ipython/log if you think there should have been."""
267 267 raise error.NoEnginesRegistered(msg)
268 268 else:
269 269 return eList
270 270 else:
271 271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
272 272
273 273 def _performOnEngines(self, methodName, *args, **kwargs):
274 274 """Calls a method on engines and returns deferred to list of results.
275 275
276 276 :Parameters:
277 277 methodName : str
278 278 Name of the method to be called.
279 279 targets : int, list of ints, 'all'
280 280 The targets argument to be parsed into a list of engine objects.
281 281 args
282 282 The positional keyword arguments to be passed to the engines.
283 283 kwargs
284 284 The keyword arguments passed to the method
285 285
286 286 :Returns: List of deferreds to the results on each engine
287 287
288 288 :Exception:
289 289 InvalidEngineID
290 290 If the targets argument is bad in any way.
291 291 AttributeError
292 292 If the method doesn't exist on one of the engines.
293 293 """
294 294 targets = kwargs.pop('targets')
295 295 log.msg("Performing %s on %r" % (methodName, targets))
296 296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 297 # This will and should raise if targets is not valid!
298 298 engines = self.engineList(targets)
299 299 dList = []
300 300 for e in engines:
301 301 meth = getattr(e, methodName, None)
302 302 if meth is not None:
303 303 dList.append(meth(*args, **kwargs))
304 304 else:
305 305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 306 return dList
307 307
308 308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 309 """Called _performOnEngines and wraps result/exception into deferred."""
310 310 try:
311 311 dList = self._performOnEngines(methodName, *args, **kwargs)
312 312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 313 return defer.fail(failure.Failure())
314 314 else:
315 315 # Having fireOnOneErrback is causing problems with the determinacy
316 316 # of the system. Basically, once a single engine has errbacked, this
317 317 # method returns. In some cases, this will cause client to submit
318 318 # another command. Because the previous command is still running
319 319 # on some engines, this command will be queued. When those commands
320 320 # then errback, the second command will raise QueueCleared. Ahhh!
321 321 d = gatherBoth(dList,
322 322 fireOnOneErrback=0,
323 323 consumeErrors=1,
324 324 logErrors=0)
325 325 d.addCallback(error.collect_exceptions, methodName)
326 326 return d
327 327
328 328 #---------------------------------------------------------------------------
329 329 # General IMultiEngine methods
330 330 #---------------------------------------------------------------------------
331 331
332 332 def get_ids(self):
333 333 return defer.succeed(self.engines.keys())
334 334
335 335 #---------------------------------------------------------------------------
336 336 # IEngineMultiplexer methods
337 337 #---------------------------------------------------------------------------
338 338
339 339 def execute(self, lines, targets='all'):
340 340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341 341
342 342 def push(self, ns, targets='all'):
343 343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344 344
345 345 def pull(self, keys, targets='all'):
346 346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347 347
348 348 def push_function(self, ns, targets='all'):
349 349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350 350
351 351 def pull_function(self, keys, targets='all'):
352 352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353 353
354 354 def get_result(self, i=None, targets='all'):
355 355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356 356
357 357 def reset(self, targets='all'):
358 358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359 359
360 360 def keys(self, targets='all'):
361 361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362 362
363 363 def kill(self, controller=False, targets='all'):
364 364 if controller:
365 365 targets = 'all'
366 366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 367 if controller:
368 368 log.msg("Killing controller")
369 369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 370 # Consume any weird stuff coming back
371 371 d.addBoth(lambda _: None)
372 372 return d
373 373
374 374 def push_serialized(self, namespace, targets='all'):
375 375 for k, v in namespace.iteritems():
376 376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 378 return d
379 379
380 380 def pull_serialized(self, keys, targets='all'):
381 381 try:
382 382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 384 return defer.fail(failure.Failure())
385 385 else:
386 386 for d in dList:
387 387 d.addCallback(self._logSizes)
388 388 d = gatherBoth(dList,
389 389 fireOnOneErrback=0,
390 390 consumeErrors=1,
391 391 logErrors=0)
392 392 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 393 return d
394 394
395 395 def _logSizes(self, listOfSerialized):
396 396 if isinstance(listOfSerialized, (list, tuple)):
397 397 for s in listOfSerialized:
398 398 log.msg("Pulled object is %f MB" % s.getDataSize())
399 399 else:
400 400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 401 return listOfSerialized
402 402
403 403 def clear_queue(self, targets='all'):
404 404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405 405
406 406 def queue_status(self, targets='all'):
407 407 log.msg("Getting queue status on %r" % targets)
408 408 try:
409 409 engines = self.engineList(targets)
410 410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 411 return defer.fail(failure.Failure())
412 412 else:
413 413 dList = []
414 414 for e in engines:
415 415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
416 416 d = gatherBoth(dList,
417 417 fireOnOneErrback=0,
418 418 consumeErrors=1,
419 419 logErrors=0)
420 420 d.addCallback(error.collect_exceptions, 'queue_status')
421 421 return d
422 422
423 423 def get_properties(self, keys=None, targets='all'):
424 424 log.msg("Getting properties on %r" % targets)
425 425 try:
426 426 engines = self.engineList(targets)
427 427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 428 return defer.fail(failure.Failure())
429 429 else:
430 430 dList = [e.get_properties(keys) for e in engines]
431 431 d = gatherBoth(dList,
432 432 fireOnOneErrback=0,
433 433 consumeErrors=1,
434 434 logErrors=0)
435 435 d.addCallback(error.collect_exceptions, 'get_properties')
436 436 return d
437 437
438 438 def set_properties(self, properties, targets='all'):
439 439 log.msg("Setting properties on %r" % targets)
440 440 try:
441 441 engines = self.engineList(targets)
442 442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 443 return defer.fail(failure.Failure())
444 444 else:
445 445 dList = [e.set_properties(properties) for e in engines]
446 446 d = gatherBoth(dList,
447 447 fireOnOneErrback=0,
448 448 consumeErrors=1,
449 449 logErrors=0)
450 450 d.addCallback(error.collect_exceptions, 'set_properties')
451 451 return d
452 452
453 453 def has_properties(self, keys, targets='all'):
454 454 log.msg("Checking properties on %r" % targets)
455 455 try:
456 456 engines = self.engineList(targets)
457 457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 458 return defer.fail(failure.Failure())
459 459 else:
460 460 dList = [e.has_properties(keys) for e in engines]
461 461 d = gatherBoth(dList,
462 462 fireOnOneErrback=0,
463 463 consumeErrors=1,
464 464 logErrors=0)
465 465 d.addCallback(error.collect_exceptions, 'has_properties')
466 466 return d
467 467
468 468 def del_properties(self, keys, targets='all'):
469 469 log.msg("Deleting properties on %r" % targets)
470 470 try:
471 471 engines = self.engineList(targets)
472 472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 473 return defer.fail(failure.Failure())
474 474 else:
475 475 dList = [e.del_properties(keys) for e in engines]
476 476 d = gatherBoth(dList,
477 477 fireOnOneErrback=0,
478 478 consumeErrors=1,
479 479 logErrors=0)
480 480 d.addCallback(error.collect_exceptions, 'del_properties')
481 481 return d
482 482
483 483 def clear_properties(self, targets='all'):
484 484 log.msg("Clearing properties on %r" % targets)
485 485 try:
486 486 engines = self.engineList(targets)
487 487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 488 return defer.fail(failure.Failure())
489 489 else:
490 490 dList = [e.clear_properties() for e in engines]
491 491 d = gatherBoth(dList,
492 492 fireOnOneErrback=0,
493 493 consumeErrors=1,
494 494 logErrors=0)
495 495 d.addCallback(error.collect_exceptions, 'clear_properties')
496 496 return d
497 497
498 498
499 499 components.registerAdapter(MultiEngine,
500 500 IControllerBase,
501 501 IMultiEngine)
502 502
503 503
504 504 #-------------------------------------------------------------------------------
505 505 # Interfaces for the Synchronous MultiEngine
506 506 #-------------------------------------------------------------------------------
507 507
508 508 class ISynchronousEngineMultiplexer(Interface):
509 509 pass
510 510
511 511
512 512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 513 """Synchronous, two-phase version of IMultiEngine.
514 514
515 515 Methods in this interface are identical to those of IMultiEngine, but they
516 516 take one additional argument:
517 517
518 518 execute(lines, targets='all') -> execute(lines, targets='all, block=True)
519 519
520 520 :Parameters:
521 521 block : boolean
522 522 Should the method return a deferred to a deferredID or the
523 523 actual result. If block=False a deferred to a deferredID is
524 524 returned and the user must call `get_pending_deferred` at a later
525 525 point. If block=True, a deferred to the actual result comes back.
526 526 """
527 527 def get_pending_deferred(deferredID, block=True):
528 528 """"""
529 529
530 530 def clear_pending_deferreds():
531 531 """"""
532 532
533 533
534 534 #-------------------------------------------------------------------------------
535 535 # Implementation of the Synchronous MultiEngine
536 536 #-------------------------------------------------------------------------------
537 537
538 538 class SynchronousMultiEngine(PendingDeferredManager):
539 539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540 540
541 541 Warning, this class uses a decorator that currently uses **kwargs.
542 542 Because of this block must be passed as a kwarg, not positionally.
543 543 """
544 544
545 545 implements(ISynchronousMultiEngine)
546 546
547 547 def __init__(self, multiengine):
548 548 self.multiengine = multiengine
549 549 PendingDeferredManager.__init__(self)
550 550
551 551 #---------------------------------------------------------------------------
552 552 # Decorated pending deferred methods
553 553 #---------------------------------------------------------------------------
554 554
555 @profile
555 556 @two_phase
556 557 def execute(self, lines, targets='all'):
557 558 d = self.multiengine.execute(lines, targets)
558 559 return d
559 560
560 561 @two_phase
561 562 def push(self, namespace, targets='all'):
562 563 return self.multiengine.push(namespace, targets)
563 564
564 565 @two_phase
565 566 def pull(self, keys, targets='all'):
566 567 d = self.multiengine.pull(keys, targets)
567 568 return d
568 569
569 570 @two_phase
570 571 def push_function(self, namespace, targets='all'):
571 572 return self.multiengine.push_function(namespace, targets)
572 573
573 574 @two_phase
574 575 def pull_function(self, keys, targets='all'):
575 576 d = self.multiengine.pull_function(keys, targets)
576 577 return d
577 578
578 579 @two_phase
579 580 def get_result(self, i=None, targets='all'):
580 581 return self.multiengine.get_result(i, targets='all')
581 582
582 583 @two_phase
583 584 def reset(self, targets='all'):
584 585 return self.multiengine.reset(targets)
585 586
586 587 @two_phase
587 588 def keys(self, targets='all'):
588 589 return self.multiengine.keys(targets)
589 590
590 591 @two_phase
591 592 def kill(self, controller=False, targets='all'):
592 593 return self.multiengine.kill(controller, targets)
593 594
594 595 @two_phase
595 596 def push_serialized(self, namespace, targets='all'):
596 597 return self.multiengine.push_serialized(namespace, targets)
597 598
598 599 @two_phase
599 600 def pull_serialized(self, keys, targets='all'):
600 601 return self.multiengine.pull_serialized(keys, targets)
601 602
602 603 @two_phase
603 604 def clear_queue(self, targets='all'):
604 605 return self.multiengine.clear_queue(targets)
605 606
606 607 @two_phase
607 608 def queue_status(self, targets='all'):
608 609 return self.multiengine.queue_status(targets)
609 610
610 611 @two_phase
611 612 def set_properties(self, properties, targets='all'):
612 613 return self.multiengine.set_properties(properties, targets)
613 614
614 615 @two_phase
615 616 def get_properties(self, keys=None, targets='all'):
616 617 return self.multiengine.get_properties(keys, targets)
617 618
618 619 @two_phase
619 620 def has_properties(self, keys, targets='all'):
620 621 return self.multiengine.has_properties(keys, targets)
621 622
622 623 @two_phase
623 624 def del_properties(self, keys, targets='all'):
624 625 return self.multiengine.del_properties(keys, targets)
625 626
626 627 @two_phase
627 628 def clear_properties(self, targets='all'):
628 629 return self.multiengine.clear_properties(targets)
629 630
630 631 #---------------------------------------------------------------------------
631 632 # IMultiEngine methods
632 633 #---------------------------------------------------------------------------
633 634
634 635 def get_ids(self):
635 636 """Return a list of registered engine ids.
636 637
637 638 Never use the two phase block/non-block stuff for this.
638 639 """
639 640 return self.multiengine.get_ids()
640 641
641 642
642 643 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643 644
644 645
645 646 #-------------------------------------------------------------------------------
646 647 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 648 #-------------------------------------------------------------------------------
648 649
649 650 #-------------------------------------------------------------------------------
650 651 # IMultiEngineCoordinator
651 652 #-------------------------------------------------------------------------------
652 653
653 654 class IMultiEngineCoordinator(Interface):
654 655 """Methods that work on multiple engines explicitly."""
655 656
656 657 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 658 """Partition and distribute a sequence to targets."""
658 659
659 660 def gather(key, dist='b', targets='all'):
660 661 """Gather object key from targets."""
661 662
662 663 def raw_map(func, seqs, dist='b', targets='all'):
663 664 """
664 665 A parallelized version of Python's builtin `map` function.
665 666
666 667 This has a slightly different syntax than the builtin `map`.
667 668 This is needed because we need to have keyword arguments and thus
668 669 can't use *args to capture all the sequences. Instead, they must
669 670 be passed in a list or tuple.
670 671
671 672 The equivalence is:
672 673
673 674 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674 675
675 676 Most users will want to use parallel functions or the `mapper`
676 677 and `map` methods for an API that follows that of the builtin
677 678 `map`.
678 679 """
679 680
680 681
681 682 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
682 683 """Methods that work on multiple engines explicitly."""
683 684
684 685 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 686 """Partition and distribute a sequence to targets."""
686 687
687 688 def gather(key, dist='b', targets='all', block=True):
688 689 """Gather object key from targets"""
689 690
690 691 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 692 """
692 693 A parallelized version of Python's builtin map.
693 694
694 695 This has a slightly different syntax than the builtin `map`.
695 696 This is needed because we need to have keyword arguments and thus
696 697 can't use *args to capture all the sequences. Instead, they must
697 698 be passed in a list or tuple.
698 699
699 700 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700 701
701 702 Most users will want to use parallel functions or the `mapper`
702 703 and `map` methods for an API that follows that of the builtin
703 704 `map`.
704 705 """
705 706
706 707
707 708 #-------------------------------------------------------------------------------
708 709 # IMultiEngineExtras
709 710 #-------------------------------------------------------------------------------
710 711
711 712 class IMultiEngineExtras(Interface):
712 713
713 714 def zip_pull(targets, keys):
714 715 """
715 716 Pull, but return results in a different format from `pull`.
716 717
717 718 This method basically returns zip(pull(targets, *keys)), with a few
718 719 edge cases handled differently. Users of chainsaw will find this format
719 720 familiar.
720 721 """
721 722
722 723 def run(targets, fname):
723 724 """Run a .py file on targets."""
724 725
725 726
726 727 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
727 728 def zip_pull(targets, keys, block=True):
728 729 """
729 730 Pull, but return results in a different format from `pull`.
730 731
731 732 This method basically returns zip(pull(targets, *keys)), with a few
732 733 edge cases handled differently. Users of chainsaw will find this format
733 734 familiar.
734 735 """
735 736
736 737 def run(targets, fname, block=True):
737 738 """Run a .py file on targets."""
738 739
739 740 #-------------------------------------------------------------------------------
740 741 # The full MultiEngine interface
741 742 #-------------------------------------------------------------------------------
742 743
743 744 class IFullMultiEngine(IMultiEngine,
744 745 IMultiEngineCoordinator,
745 746 IMultiEngineExtras):
746 747 pass
747 748
748 749
749 750 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
750 751 ISynchronousMultiEngineCoordinator,
751 752 ISynchronousMultiEngineExtras):
752 753 pass
753 754
@@ -1,757 +1,757 b''
1 1 # encoding: utf-8
2 2
3 3 """
4 4 Expose the multiengine controller over the Foolscap network protocol.
5 5 """
6 6
7 7 __docformat__ = "restructuredtext en"
8 8
9 9 #-------------------------------------------------------------------------------
10 10 # Copyright (C) 2008 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-------------------------------------------------------------------------------
15 15
16 16 #-------------------------------------------------------------------------------
17 17 # Imports
18 18 #-------------------------------------------------------------------------------
19 19
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22
23 23 from zope.interface import Interface, implements
24 24 from twisted.internet import defer
25 25 from twisted.python import components, failure, log
26 26
27 27 from foolscap import Referenceable
28 28
29 29 from IPython.kernel import error
30 30 from IPython.kernel.util import printer
31 31 from IPython.kernel import map as Map
32 32 from IPython.kernel.parallelfunction import ParallelFunction
33 33 from IPython.kernel.mapper import (
34 34 MultiEngineMapper,
35 35 IMultiEngineMapperFactory,
36 36 IMapper
37 37 )
38 38 from IPython.kernel.twistedutil import gatherBoth
39 39 from IPython.kernel.multiengine import (MultiEngine,
40 40 IMultiEngine,
41 41 IFullSynchronousMultiEngine,
42 42 ISynchronousMultiEngine)
43 43 from IPython.kernel.multiengineclient import wrapResultList
44 44 from IPython.kernel.pendingdeferred import PendingDeferredManager
45 45 from IPython.kernel.pickleutil import (can, canDict,
46 46 canSequence, uncan, uncanDict, uncanSequence)
47 47
48 48 from IPython.kernel.clientinterfaces import (
49 49 IFCClientInterfaceProvider,
50 50 IBlockingClientAdaptor
51 51 )
52 52
53 53 # Needed to access the true globals from __main__.__dict__
54 54 import __main__
55 55
56 56 #-------------------------------------------------------------------------------
57 57 # The Controller side of things
58 58 #-------------------------------------------------------------------------------
59 59
60 60 def packageResult(wrappedMethod):
61 61
62 62 def wrappedPackageResult(self, *args, **kwargs):
63 63 d = wrappedMethod(self, *args, **kwargs)
64 64 d.addCallback(self.packageSuccess)
65 65 d.addErrback(self.packageFailure)
66 66 return d
67 67 return wrappedPackageResult
68 68
69 69
70 70 class IFCSynchronousMultiEngine(Interface):
71 71 """Foolscap interface to `ISynchronousMultiEngine`.
72 72
73 73 The methods in this interface are similar to those of
74 74 `ISynchronousMultiEngine`, but their arguments and return values are pickled
75 75 if they are not already simple Python types that can be send over XML-RPC.
76 76
77 77 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
78 78 documentation about the methods.
79 79
80 80 Most methods in this interface act like the `ISynchronousMultiEngine`
81 81 versions and can be called in blocking or non-blocking mode.
82 82 """
83 83 pass
84 84
85 85
86 86 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
87 87 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
88 88 """
89 89
90 90 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
91 91
92 92 addSlash = True
93 93
94 94 def __init__(self, multiengine):
95 95 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
96 96 # it. This allow this class to do two adaptation steps.
97 97 self.smultiengine = ISynchronousMultiEngine(multiengine)
98 98 self._deferredIDCallbacks = {}
99 99
100 100 #---------------------------------------------------------------------------
101 101 # Non interface methods
102 102 #---------------------------------------------------------------------------
103 103
104 104 def packageFailure(self, f):
105 105 f.cleanFailure()
106 106 return self.packageSuccess(f)
107 107
108 108 def packageSuccess(self, obj):
109 109 serial = pickle.dumps(obj, 2)
110 110 return serial
111 111
112 112 #---------------------------------------------------------------------------
113 113 # Things related to PendingDeferredManager
114 114 #---------------------------------------------------------------------------
115 115
116 116 @packageResult
117 117 def remote_get_pending_deferred(self, deferredID, block):
118 118 d = self.smultiengine.get_pending_deferred(deferredID, block)
119 119 try:
120 120 callback = self._deferredIDCallbacks.pop(deferredID)
121 121 except KeyError:
122 122 callback = None
123 123 if callback is not None:
124 124 d.addCallback(callback[0], *callback[1], **callback[2])
125 125 return d
126 126
127 127 @packageResult
128 128 def remote_clear_pending_deferreds(self):
129 129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
130 130
131 131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
132 132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
133 133 return did
134
134
135 135 #---------------------------------------------------------------------------
136 136 # IEngineMultiplexer related methods
137 137 #---------------------------------------------------------------------------
138 138
139 139 @packageResult
140 140 def remote_execute(self, lines, targets, block):
141 141 return self.smultiengine.execute(lines, targets=targets, block=block)
142 142
143 143 @packageResult
144 144 def remote_push(self, binaryNS, targets, block):
145 145 try:
146 146 namespace = pickle.loads(binaryNS)
147 147 except:
148 148 d = defer.fail(failure.Failure())
149 149 else:
150 150 d = self.smultiengine.push(namespace, targets=targets, block=block)
151 151 return d
152 152
153 153 @packageResult
154 154 def remote_pull(self, keys, targets, block):
155 155 d = self.smultiengine.pull(keys, targets=targets, block=block)
156 156 return d
157 157
158 158 @packageResult
159 159 def remote_push_function(self, binaryNS, targets, block):
160 160 try:
161 161 namespace = pickle.loads(binaryNS)
162 162 except:
163 163 d = defer.fail(failure.Failure())
164 164 else:
165 165 namespace = uncanDict(namespace)
166 166 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
167 167 return d
168 168
169 169 def _canMultipleKeys(self, result):
170 170 return [canSequence(r) for r in result]
171 171
172 172 @packageResult
173 173 def remote_pull_function(self, keys, targets, block):
174 174 def can_functions(r, keys):
175 175 if len(keys)==1 or isinstance(keys, str):
176 176 result = canSequence(r)
177 177 elif len(keys)>1:
178 178 result = [canSequence(s) for s in r]
179 179 return result
180 180 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
181 181 if block:
182 182 d.addCallback(can_functions, keys)
183 183 else:
184 184 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
185 185 return d
186 186
187 187 @packageResult
188 188 def remote_push_serialized(self, binaryNS, targets, block):
189 189 try:
190 190 namespace = pickle.loads(binaryNS)
191 191 except:
192 192 d = defer.fail(failure.Failure())
193 193 else:
194 194 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
195 195 return d
196 196
197 197 @packageResult
198 198 def remote_pull_serialized(self, keys, targets, block):
199 199 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
200 200 return d
201 201
202 202 @packageResult
203 203 def remote_get_result(self, i, targets, block):
204 204 if i == 'None':
205 205 i = None
206 206 return self.smultiengine.get_result(i, targets=targets, block=block)
207 207
208 208 @packageResult
209 209 def remote_reset(self, targets, block):
210 210 return self.smultiengine.reset(targets=targets, block=block)
211 211
212 212 @packageResult
213 213 def remote_keys(self, targets, block):
214 214 return self.smultiengine.keys(targets=targets, block=block)
215 215
216 216 @packageResult
217 217 def remote_kill(self, controller, targets, block):
218 218 return self.smultiengine.kill(controller, targets=targets, block=block)
219 219
220 220 @packageResult
221 221 def remote_clear_queue(self, targets, block):
222 222 return self.smultiengine.clear_queue(targets=targets, block=block)
223 223
224 224 @packageResult
225 225 def remote_queue_status(self, targets, block):
226 226 return self.smultiengine.queue_status(targets=targets, block=block)
227 227
228 228 @packageResult
229 229 def remote_set_properties(self, binaryNS, targets, block):
230 230 try:
231 231 ns = pickle.loads(binaryNS)
232 232 except:
233 233 d = defer.fail(failure.Failure())
234 234 else:
235 235 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
236 236 return d
237 237
238 238 @packageResult
239 239 def remote_get_properties(self, keys, targets, block):
240 240 if keys=='None':
241 241 keys=None
242 242 return self.smultiengine.get_properties(keys, targets=targets, block=block)
243 243
244 244 @packageResult
245 245 def remote_has_properties(self, keys, targets, block):
246 246 return self.smultiengine.has_properties(keys, targets=targets, block=block)
247 247
248 248 @packageResult
249 249 def remote_del_properties(self, keys, targets, block):
250 250 return self.smultiengine.del_properties(keys, targets=targets, block=block)
251 251
252 252 @packageResult
253 253 def remote_clear_properties(self, targets, block):
254 254 return self.smultiengine.clear_properties(targets=targets, block=block)
255 255
256 256 #---------------------------------------------------------------------------
257 257 # IMultiEngine related methods
258 258 #---------------------------------------------------------------------------
259 259
260 260 def remote_get_ids(self):
261 261 """Get the ids of the registered engines.
262 262
263 263 This method always blocks.
264 264 """
265 265 return self.smultiengine.get_ids()
266 266
267 267 #---------------------------------------------------------------------------
268 268 # IFCClientInterfaceProvider related methods
269 269 #---------------------------------------------------------------------------
270 270
271 271 def remote_get_client_name(self):
272 272 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
273 273
274 274
275 275 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
276 276 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
277 277 # two phase adaptation.
278 278 components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
279 279 IMultiEngine, IFCSynchronousMultiEngine)
280 280
281 281
282 282 #-------------------------------------------------------------------------------
283 283 # The Client side of things
284 284 #-------------------------------------------------------------------------------
285 285
286 286
287 287 class FCFullSynchronousMultiEngineClient(object):
288 288
289 289 implements(
290 290 IFullSynchronousMultiEngine,
291 291 IBlockingClientAdaptor,
292 292 IMultiEngineMapperFactory,
293 293 IMapper
294 294 )
295 295
296 296 def __init__(self, remote_reference):
297 297 self.remote_reference = remote_reference
298 298 self._deferredIDCallbacks = {}
299 299 # This class manages some pending deferreds through this instance. This
300 300 # is required for methods like gather/scatter as it enables us to
301 301 # create our own pending deferreds for composite operations.
302 302 self.pdm = PendingDeferredManager()
303 303
304 304 #---------------------------------------------------------------------------
305 305 # Non interface methods
306 306 #---------------------------------------------------------------------------
307 307
308 308 def unpackage(self, r):
309 309 return pickle.loads(r)
310 310
311 311 #---------------------------------------------------------------------------
312 312 # Things related to PendingDeferredManager
313 313 #---------------------------------------------------------------------------
314 314
315 315 def get_pending_deferred(self, deferredID, block=True):
316 316
317 317 # Because we are managing some pending deferreds locally (through
318 318 # self.pdm) and some remotely (on the controller), we first try the
319 319 # local one and then the remote one.
320 320 if self.pdm.quick_has_id(deferredID):
321 321 d = self.pdm.get_pending_deferred(deferredID, block)
322 322 return d
323 323 else:
324 324 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
325 325 d.addCallback(self.unpackage)
326 326 try:
327 327 callback = self._deferredIDCallbacks.pop(deferredID)
328 328 except KeyError:
329 329 callback = None
330 330 if callback is not None:
331 331 d.addCallback(callback[0], *callback[1], **callback[2])
332 332 return d
333 333
334 334 def clear_pending_deferreds(self):
335 335
336 336 # This clear both the local (self.pdm) and remote pending deferreds
337 337 self.pdm.clear_pending_deferreds()
338 338 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
339 339 d2.addCallback(self.unpackage)
340 340 return d2
341 341
342 342 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
343 343 self._deferredIDCallbacks[did] = (callback, args, kwargs)
344 344 return did
345 345
346 346 #---------------------------------------------------------------------------
347 347 # IEngineMultiplexer related methods
348 348 #---------------------------------------------------------------------------
349
349
350 350 def execute(self, lines, targets='all', block=True):
351 351 d = self.remote_reference.callRemote('execute', lines, targets, block)
352 352 d.addCallback(self.unpackage)
353 353 return d
354 354
355 355 def push(self, namespace, targets='all', block=True):
356 356 serial = pickle.dumps(namespace, 2)
357 357 d = self.remote_reference.callRemote('push', serial, targets, block)
358 358 d.addCallback(self.unpackage)
359 359 return d
360 360
361 361 def pull(self, keys, targets='all', block=True):
362 362 d = self.remote_reference.callRemote('pull', keys, targets, block)
363 363 d.addCallback(self.unpackage)
364 364 return d
365 365
366 366 def push_function(self, namespace, targets='all', block=True):
367 367 cannedNamespace = canDict(namespace)
368 368 serial = pickle.dumps(cannedNamespace, 2)
369 369 d = self.remote_reference.callRemote('push_function', serial, targets, block)
370 370 d.addCallback(self.unpackage)
371 371 return d
372 372
373 373 def pull_function(self, keys, targets='all', block=True):
374 374 def uncan_functions(r, keys):
375 375 if len(keys)==1 or isinstance(keys, str):
376 376 return uncanSequence(r)
377 377 elif len(keys)>1:
378 378 return [uncanSequence(s) for s in r]
379 379 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
380 380 if block:
381 381 d.addCallback(self.unpackage)
382 382 d.addCallback(uncan_functions, keys)
383 383 else:
384 384 d.addCallback(self.unpackage)
385 385 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
386 386 return d
387 387
388 388 def push_serialized(self, namespace, targets='all', block=True):
389 389 cannedNamespace = canDict(namespace)
390 390 serial = pickle.dumps(cannedNamespace, 2)
391 391 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
392 392 d.addCallback(self.unpackage)
393 393 return d
394 394
395 395 def pull_serialized(self, keys, targets='all', block=True):
396 396 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
397 397 d.addCallback(self.unpackage)
398 398 return d
399 399
400 400 def get_result(self, i=None, targets='all', block=True):
401 401 if i is None: # This is because None cannot be marshalled by xml-rpc
402 402 i = 'None'
403 403 d = self.remote_reference.callRemote('get_result', i, targets, block)
404 404 d.addCallback(self.unpackage)
405 405 return d
406 406
407 407 def reset(self, targets='all', block=True):
408 408 d = self.remote_reference.callRemote('reset', targets, block)
409 409 d.addCallback(self.unpackage)
410 410 return d
411 411
412 412 def keys(self, targets='all', block=True):
413 413 d = self.remote_reference.callRemote('keys', targets, block)
414 414 d.addCallback(self.unpackage)
415 415 return d
416 416
417 417 def kill(self, controller=False, targets='all', block=True):
418 418 d = self.remote_reference.callRemote('kill', controller, targets, block)
419 419 d.addCallback(self.unpackage)
420 420 return d
421 421
422 422 def clear_queue(self, targets='all', block=True):
423 423 d = self.remote_reference.callRemote('clear_queue', targets, block)
424 424 d.addCallback(self.unpackage)
425 425 return d
426 426
427 427 def queue_status(self, targets='all', block=True):
428 428 d = self.remote_reference.callRemote('queue_status', targets, block)
429 429 d.addCallback(self.unpackage)
430 430 return d
431 431
432 432 def set_properties(self, properties, targets='all', block=True):
433 433 serial = pickle.dumps(properties, 2)
434 434 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
435 435 d.addCallback(self.unpackage)
436 436 return d
437 437
438 438 def get_properties(self, keys=None, targets='all', block=True):
439 439 if keys==None:
440 440 keys='None'
441 441 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
442 442 d.addCallback(self.unpackage)
443 443 return d
444 444
445 445 def has_properties(self, keys, targets='all', block=True):
446 446 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
447 447 d.addCallback(self.unpackage)
448 448 return d
449 449
450 450 def del_properties(self, keys, targets='all', block=True):
451 451 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
452 452 d.addCallback(self.unpackage)
453 453 return d
454 454
455 455 def clear_properties(self, targets='all', block=True):
456 456 d = self.remote_reference.callRemote('clear_properties', targets, block)
457 457 d.addCallback(self.unpackage)
458 458 return d
459 459
460 460 #---------------------------------------------------------------------------
461 461 # IMultiEngine related methods
462 462 #---------------------------------------------------------------------------
463 463
464 464 def get_ids(self):
465 465 d = self.remote_reference.callRemote('get_ids')
466 466 return d
467 467
468 468 #---------------------------------------------------------------------------
469 469 # ISynchronousMultiEngineCoordinator related methods
470 470 #---------------------------------------------------------------------------
471 471
472 472 def _process_targets(self, targets):
473 473 def create_targets(ids):
474 474 if isinstance(targets, int):
475 475 engines = [targets]
476 476 elif targets=='all':
477 477 engines = ids
478 478 elif isinstance(targets, (list, tuple)):
479 479 engines = targets
480 480 for t in engines:
481 481 if not t in ids:
482 482 raise error.InvalidEngineID("engine with id %r does not exist"%t)
483 483 return engines
484 484
485 485 d = self.get_ids()
486 486 d.addCallback(create_targets)
487 487 return d
488 488
489 489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
490 490
491 491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
492 492 # This enables us to collect a bunch fo deferred ids and make a secondary
493 493 # deferred id that corresponds to the entire group. This logic is extremely
494 494 # difficult to get right though.
495 495 def do_scatter(engines):
496 496 nEngines = len(engines)
497 497 mapClass = Map.dists[dist]
498 498 mapObject = mapClass()
499 499 d_list = []
500 500 # Loop through and push to each engine in non-blocking mode.
501 501 # This returns a set of deferreds to deferred_ids
502 502 for index, engineid in enumerate(engines):
503 503 partition = mapObject.getPartition(seq, index, nEngines)
504 504 if flatten and len(partition) == 1:
505 505 d = self.push({key: partition[0]}, targets=engineid, block=False)
506 506 else:
507 507 d = self.push({key: partition}, targets=engineid, block=False)
508 508 d_list.append(d)
509 509 # Collect the deferred to deferred_ids
510 510 d = gatherBoth(d_list,
511 511 fireOnOneErrback=0,
512 512 consumeErrors=1,
513 513 logErrors=0)
514 514 # Now d has a list of deferred_ids or Failures coming
515 515 d.addCallback(error.collect_exceptions, 'scatter')
516 516 def process_did_list(did_list):
517 517 """Turn a list of deferred_ids into a final result or failure."""
518 518 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
519 519 final_d = gatherBoth(new_d_list,
520 520 fireOnOneErrback=0,
521 521 consumeErrors=1,
522 522 logErrors=0)
523 523 final_d.addCallback(error.collect_exceptions, 'scatter')
524 524 final_d.addCallback(lambda lop: [i[0] for i in lop])
525 525 return final_d
526 526 # Now, depending on block, we need to handle the list deferred_ids
527 527 # coming down the pipe diferently.
528 528 if block:
529 529 # If we are blocking register a callback that will transform the
530 530 # list of deferred_ids into the final result.
531 531 d.addCallback(process_did_list)
532 532 return d
533 533 else:
534 534 # Here we are going to use a _local_ PendingDeferredManager.
535 535 deferred_id = self.pdm.get_deferred_id()
536 536 # This is the deferred we will return to the user that will fire
537 537 # with the local deferred_id AFTER we have received the list of
538 538 # primary deferred_ids
539 539 d_to_return = defer.Deferred()
540 540 def do_it(did_list):
541 541 """Produce a deferred to the final result, but first fire the
542 542 deferred we will return to the user that has the local
543 543 deferred id."""
544 544 d_to_return.callback(deferred_id)
545 545 return process_did_list(did_list)
546 546 d.addCallback(do_it)
547 547 # Now save the deferred to the final result
548 548 self.pdm.save_pending_deferred(d, deferred_id)
549 549 return d_to_return
550 550
551 551 d = self._process_targets(targets)
552 552 d.addCallback(do_scatter)
553 553 return d
554 554
555 555 def gather(self, key, dist='b', targets='all', block=True):
556 556
557 557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
558 558 # This enables us to collect a bunch fo deferred ids and make a secondary
559 559 # deferred id that corresponds to the entire group. This logic is extremely
560 560 # difficult to get right though.
561 561 def do_gather(engines):
562 562 nEngines = len(engines)
563 563 mapClass = Map.dists[dist]
564 564 mapObject = mapClass()
565 565 d_list = []
566 566 # Loop through and push to each engine in non-blocking mode.
567 567 # This returns a set of deferreds to deferred_ids
568 568 for index, engineid in enumerate(engines):
569 569 d = self.pull(key, targets=engineid, block=False)
570 570 d_list.append(d)
571 571 # Collect the deferred to deferred_ids
572 572 d = gatherBoth(d_list,
573 573 fireOnOneErrback=0,
574 574 consumeErrors=1,
575 575 logErrors=0)
576 576 # Now d has a list of deferred_ids or Failures coming
577 577 d.addCallback(error.collect_exceptions, 'scatter')
578 578 def process_did_list(did_list):
579 579 """Turn a list of deferred_ids into a final result or failure."""
580 580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
581 581 final_d = gatherBoth(new_d_list,
582 582 fireOnOneErrback=0,
583 583 consumeErrors=1,
584 584 logErrors=0)
585 585 final_d.addCallback(error.collect_exceptions, 'gather')
586 586 final_d.addCallback(lambda lop: [i[0] for i in lop])
587 587 final_d.addCallback(mapObject.joinPartitions)
588 588 return final_d
589 589 # Now, depending on block, we need to handle the list deferred_ids
590 590 # coming down the pipe diferently.
591 591 if block:
592 592 # If we are blocking register a callback that will transform the
593 593 # list of deferred_ids into the final result.
594 594 d.addCallback(process_did_list)
595 595 return d
596 596 else:
597 597 # Here we are going to use a _local_ PendingDeferredManager.
598 598 deferred_id = self.pdm.get_deferred_id()
599 599 # This is the deferred we will return to the user that will fire
600 600 # with the local deferred_id AFTER we have received the list of
601 601 # primary deferred_ids
602 602 d_to_return = defer.Deferred()
603 603 def do_it(did_list):
604 604 """Produce a deferred to the final result, but first fire the
605 605 deferred we will return to the user that has the local
606 606 deferred id."""
607 607 d_to_return.callback(deferred_id)
608 608 return process_did_list(did_list)
609 609 d.addCallback(do_it)
610 610 # Now save the deferred to the final result
611 611 self.pdm.save_pending_deferred(d, deferred_id)
612 612 return d_to_return
613 613
614 614 d = self._process_targets(targets)
615 615 d.addCallback(do_gather)
616 616 return d
617 617
618 618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
619 619 """
620 620 A parallelized version of Python's builtin map.
621 621
622 622 This has a slightly different syntax than the builtin `map`.
623 623 This is needed because we need to have keyword arguments and thus
624 624 can't use *args to capture all the sequences. Instead, they must
625 625 be passed in a list or tuple.
626 626
627 627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628 628
629 629 Most users will want to use parallel functions or the `mapper`
630 630 and `map` methods for an API that follows that of the builtin
631 631 `map`.
632 632 """
633 633 if not isinstance(sequences, (list, tuple)):
634 634 raise TypeError('sequences must be a list or tuple')
635 635 max_len = max(len(s) for s in sequences)
636 636 for s in sequences:
637 637 if len(s)!=max_len:
638 638 raise ValueError('all sequences must have equal length')
639 639 if isinstance(func, FunctionType):
640 640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
641 641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
642 642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
643 643 elif isinstance(func, str):
644 644 d = defer.succeed(None)
645 645 sourceToRun = \
646 646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
647 647 else:
648 648 raise TypeError("func must be a function or str")
649 649
650 650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
651 651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
652 652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
653 653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
654 654 return d
655 655
656 656 def map(self, func, *sequences):
657 657 """
658 658 A parallel version of Python's builtin `map` function.
659 659
660 660 This method applies a function to sequences of arguments. It
661 661 follows the same syntax as the builtin `map`.
662 662
663 663 This method creates a mapper objects by calling `self.mapper` with
664 664 no arguments and then uses that mapper to do the mapping. See
665 665 the documentation of `mapper` for more details.
666 666 """
667 667 return self.mapper().map(func, *sequences)
668 668
669 669 def mapper(self, dist='b', targets='all', block=True):
670 670 """
671 671 Create a mapper object that has a `map` method.
672 672
673 673 This method returns an object that implements the `IMapper`
674 674 interface. This method is a factory that is used to control how
675 675 the map happens.
676 676
677 677 :Parameters:
678 678 dist : str
679 679 What decomposition to use, 'b' is the only one supported
680 680 currently
681 681 targets : str, int, sequence of ints
682 682 Which engines to use for the map
683 683 block : boolean
684 684 Should calls to `map` block or not
685 685 """
686 686 return MultiEngineMapper(self, dist, targets, block)
687 687
688 688 def parallel(self, dist='b', targets='all', block=True):
689 689 """
690 690 A decorator that turns a function into a parallel function.
691 691
692 692 This can be used as:
693 693
694 694 @parallel()
695 695 def f(x, y)
696 696 ...
697 697
698 698 f(range(10), range(10))
699 699
700 700 This causes f(0,0), f(1,1), ... to be called in parallel.
701 701
702 702 :Parameters:
703 703 dist : str
704 704 What decomposition to use, 'b' is the only one supported
705 705 currently
706 706 targets : str, int, sequence of ints
707 707 Which engines to use for the map
708 708 block : boolean
709 709 Should calls to `map` block or not
710 710 """
711 711 mapper = self.mapper(dist, targets, block)
712 712 pf = ParallelFunction(mapper)
713 713 return pf
714 714
715 715 #---------------------------------------------------------------------------
716 716 # ISynchronousMultiEngineExtras related methods
717 717 #---------------------------------------------------------------------------
718 718
719 719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
720 720 if not multitargets:
721 721 result = pushResult[0]
722 722 elif lenKeys > 1:
723 723 result = zip(*pushResult)
724 724 elif lenKeys is 1:
725 725 result = list(pushResult)
726 726 return result
727 727
728 728 def zip_pull(self, keys, targets='all', block=True):
729 729 multitargets = not isinstance(targets, int) and len(targets) > 1
730 730 lenKeys = len(keys)
731 731 d = self.pull(keys, targets=targets, block=block)
732 732 if block:
733 733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
734 734 else:
735 735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
736 736 return d
737 737
738 738 def run(self, fname, targets='all', block=True):
739 739 fileobj = open(fname,'r')
740 740 source = fileobj.read()
741 741 fileobj.close()
742 742 # if the compilation blows, we get a local error right away
743 743 try:
744 744 code = compile(source,fname,'exec')
745 745 except:
746 746 return defer.fail(failure.Failure())
747 747 # Now run the code
748 748 d = self.execute(source, targets=targets, block=block)
749 749 return d
750 750
751 751 #---------------------------------------------------------------------------
752 752 # IBlockingClientAdaptor related methods
753 753 #---------------------------------------------------------------------------
754 754
755 755 def adapt_to_blocking_client(self):
756 756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
757 757 return IFullBlockingMultiEngineClient(self)
General Comments 0
You need to be logged in to leave comments. Login now