##// END OF EJS Templates
Fixing mysterious bug in IEnginePropertiesTestCase.strictDict....
Brian Granger -
Show More
@@ -1,906 +1,906 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
3 3
4 4 """A Twisted Service Representation of the IPython core.
5 5
6 6 The IPython Core exposed to the network is called the Engine. Its
7 7 representation in Twisted in the EngineService. Interfaces and adapters
8 8 are used to abstract out the details of the actual network protocol used.
9 9 The EngineService is an Engine that knows nothing about the actual protocol
10 10 used.
11 11
12 12 The EngineService is exposed with various network protocols in modules like:
13 13
14 14 enginepb.py
15 15 enginevanilla.py
16 16
17 17 As of 12/12/06 the classes in this module have been simplified greatly. It was
18 18 felt that we had over-engineered things. To improve the maintainability of the
19 19 code we have taken out the ICompleteEngine interface and the completeEngine
20 20 method that automatically added methods to engines.
21 21
22 22 """
23 23
24 24 __docformat__ = "restructuredtext en"
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Copyright (C) 2008 The IPython Development Team
28 28 #
29 29 # Distributed under the terms of the BSD License. The full license is in
30 30 # the file COPYING, distributed as part of this software.
31 31 #-------------------------------------------------------------------------------
32 32
33 33 #-------------------------------------------------------------------------------
34 34 # Imports
35 35 #-------------------------------------------------------------------------------
36 36
37 37 # Tell nose to skip the testing of this module
38 38 __test__ = {}
39 39
40 40 import os, sys, copy
41 41 import cPickle as pickle
42 42 from new import instancemethod
43 43
44 44 from twisted.application import service
45 45 from twisted.internet import defer, reactor
46 46 from twisted.python import log, failure, components
47 47 import zope.interface as zi
48 48
49 49 from IPython.kernel.core.interpreter import Interpreter
50 50 from IPython.kernel import newserialized, error, util
51 51 from IPython.kernel.util import printer
52 52 from IPython.kernel.twistedutil import gatherBoth, DeferredList
53 53 from IPython.kernel import codeutil
54 54
55 55
56 56 #-------------------------------------------------------------------------------
57 57 # Interface specification for the Engine
58 58 #-------------------------------------------------------------------------------
59 59
60 60 class IEngineCore(zi.Interface):
61 61 """The minimal required interface for the IPython Engine.
62 62
63 63 This interface provides a formal specification of the IPython core.
64 64 All these methods should return deferreds regardless of what side of a
65 65 network connection they are on.
66 66
67 67 In general, this class simply wraps a shell class and wraps its return
68 68 values as Deferred objects. If the underlying shell class method raises
69 69 an exception, this class should convert it to a twisted.failure.Failure
70 70 that will be propagated along the Deferred's errback chain.
71 71
72 72 In addition, Failures are aggressive. By this, we mean that if a method
73 73 is performing multiple actions (like pulling multiple object) if any
74 74 single one fails, the entire method will fail with that Failure. It is
75 75 all or nothing.
76 76 """
77 77
78 78 id = zi.interface.Attribute("the id of the Engine object")
79 79 properties = zi.interface.Attribute("A dict of properties of the Engine")
80 80
81 81 def execute(lines):
82 82 """Execute lines of Python code.
83 83
84 84 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
85 85 upon success.
86 86
87 87 Returns a failure object if the execution of lines raises an exception.
88 88 """
89 89
90 90 def push(namespace):
91 91 """Push dict namespace into the user's namespace.
92 92
93 93 Returns a deferred to None or a failure.
94 94 """
95 95
96 96 def pull(keys):
97 97 """Pulls values out of the user's namespace by keys.
98 98
99 99 Returns a deferred to a tuple objects or a single object.
100 100
101 101 Raises NameError if any one of objects doess not exist.
102 102 """
103 103
104 104 def push_function(namespace):
105 105 """Push a dict of key, function pairs into the user's namespace.
106 106
107 107 Returns a deferred to None or a failure."""
108 108
109 109 def pull_function(keys):
110 110 """Pulls functions out of the user's namespace by keys.
111 111
112 112 Returns a deferred to a tuple of functions or a single function.
113 113
114 114 Raises NameError if any one of the functions does not exist.
115 115 """
116 116
117 117 def get_result(i=None):
118 118 """Get the stdin/stdout/stderr of command i.
119 119
120 120 Returns a deferred to a dict with keys
121 121 (id, number, stdin, stdout, stderr).
122 122
123 123 Raises IndexError if command i does not exist.
124 124 Raises TypeError if i in not an int.
125 125 """
126 126
127 127 def reset():
128 128 """Reset the shell.
129 129
130 130 This clears the users namespace. Won't cause modules to be
131 131 reloaded. Should also re-initialize certain variables like id.
132 132 """
133 133
134 134 def kill():
135 135 """Kill the engine by stopping the reactor."""
136 136
137 137 def keys():
138 138 """Return the top level variables in the users namspace.
139 139
140 140 Returns a deferred to a dict."""
141 141
142 142
143 143 class IEngineSerialized(zi.Interface):
144 144 """Push/Pull methods that take Serialized objects.
145 145
146 146 All methods should return deferreds.
147 147 """
148 148
149 149 def push_serialized(namespace):
150 150 """Push a dict of keys and Serialized objects into the user's namespace."""
151 151
152 152 def pull_serialized(keys):
153 153 """Pull objects by key from the user's namespace as Serialized.
154 154
155 155 Returns a list of or one Serialized.
156 156
157 157 Raises NameError is any one of the objects does not exist.
158 158 """
159 159
160 160
161 161 class IEngineProperties(zi.Interface):
162 162 """Methods for access to the properties object of an Engine"""
163 163
164 164 properties = zi.Attribute("A StrictDict object, containing the properties")
165 165
166 166 def set_properties(properties):
167 167 """set properties by key and value"""
168 168
169 169 def get_properties(keys=None):
170 170 """get a list of properties by `keys`, if no keys specified, get all"""
171 171
172 172 def del_properties(keys):
173 173 """delete properties by `keys`"""
174 174
175 175 def has_properties(keys):
176 176 """get a list of bool values for whether `properties` has `keys`"""
177 177
178 178 def clear_properties():
179 179 """clear the properties dict"""
180 180
181 181 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
182 182 """The basic engine interface that EngineService will implement.
183 183
184 184 This exists so it is easy to specify adapters that adapt to and from the
185 185 API that the basic EngineService implements.
186 186 """
187 187 pass
188 188
189 189 class IEngineQueued(IEngineBase):
190 190 """Interface for adding a queue to an IEngineBase.
191 191
192 192 This interface extends the IEngineBase interface to add methods for managing
193 193 the engine's queue. The implicit details of this interface are that the
194 194 execution of all methods declared in IEngineBase should appropriately be
195 195 put through a queue before execution.
196 196
197 197 All methods should return deferreds.
198 198 """
199 199
200 200 def clear_queue():
201 201 """Clear the queue."""
202 202
203 203 def queue_status():
204 204 """Get the queued and pending commands in the queue."""
205 205
206 206 def register_failure_observer(obs):
207 207 """Register an observer of pending Failures.
208 208
209 209 The observer must implement IFailureObserver.
210 210 """
211 211
212 212 def unregister_failure_observer(obs):
213 213 """Unregister an observer of pending Failures."""
214 214
215 215
216 216 class IEngineThreaded(zi.Interface):
217 217 """A place holder for threaded commands.
218 218
219 219 All methods should return deferreds.
220 220 """
221 221 pass
222 222
223 223
224 224 #-------------------------------------------------------------------------------
225 225 # Functions and classes to implement the EngineService
226 226 #-------------------------------------------------------------------------------
227 227
228 228
229 229 class StrictDict(dict):
230 230 """This is a strict copying dictionary for use as the interface to the
231 231 properties of an Engine.
232 232
233 233 :IMPORTANT:
234 234 This object copies the values you set to it, and returns copies to you
235 235 when you request them. The only way to change properties os explicitly
236 236 through the setitem and getitem of the dictionary interface.
237 237
238 238 Example:
239 239 >>> e = get_engine(id)
240 240 >>> L = [1,2,3]
241 241 >>> e.properties['L'] = L
242 242 >>> L == e.properties['L']
243 243 True
244 244 >>> L.append(99)
245 245 >>> L == e.properties['L']
246 246 False
247 247
248 248 Note that getitem copies, so calls to methods of objects do not affect
249 249 the properties, as seen here:
250 250
251 251 >>> e.properties[1] = range(2)
252 252 >>> print e.properties[1]
253 253 [0, 1]
254 254 >>> e.properties[1].append(2)
255 255 >>> print e.properties[1]
256 256 [0, 1]
257 257 """
258 258 def __init__(self, *args, **kwargs):
259 259 dict.__init__(self, *args, **kwargs)
260 260 self.modified = True
261 261
262 262 def __getitem__(self, key):
263 263 return copy.deepcopy(dict.__getitem__(self, key))
264 264
265 265 def __setitem__(self, key, value):
266 266 # check if this entry is valid for transport around the network
267 267 # and copying
268 268 try:
269 269 pickle.dumps(key, 2)
270 270 pickle.dumps(value, 2)
271 271 newvalue = copy.deepcopy(value)
272 except:
273 raise error.InvalidProperty(value)
272 except Exception, e:
273 raise error.InvalidProperty("can't be a value: %r" % value)
274 274 dict.__setitem__(self, key, newvalue)
275 275 self.modified = True
276 276
277 277 def __delitem__(self, key):
278 278 dict.__delitem__(self, key)
279 279 self.modified = True
280 280
281 281 def update(self, dikt):
282 282 for k,v in dikt.iteritems():
283 283 self[k] = v
284 284
285 285 def pop(self, key):
286 286 self.modified = True
287 287 return dict.pop(self, key)
288 288
289 289 def popitem(self):
290 290 self.modified = True
291 291 return dict.popitem(self)
292 292
293 293 def clear(self):
294 294 self.modified = True
295 295 dict.clear(self)
296 296
297 297 def subDict(self, *keys):
298 298 d = {}
299 299 for key in keys:
300 300 d[key] = self[key]
301 301 return d
302 302
303 303
304 304
305 305 class EngineAPI(object):
306 306 """This is the object through which the user can edit the `properties`
307 307 attribute of an Engine.
308 308 The Engine Properties object copies all object in and out of itself.
309 309 See the EngineProperties object for details.
310 310 """
311 311 _fix=False
312 312 def __init__(self, id):
313 313 self.id = id
314 314 self.properties = StrictDict()
315 315 self._fix=True
316 316
317 317 def __setattr__(self, k,v):
318 318 if self._fix:
319 319 raise error.KernelError("I am protected!")
320 320 else:
321 321 object.__setattr__(self, k, v)
322 322
323 323 def __delattr__(self, key):
324 324 raise error.KernelError("I am protected!")
325 325
326 326
327 327 _apiDict = {}
328 328
329 329 def get_engine(id):
330 330 """Get the Engine API object, whcih currently just provides the properties
331 331 object, by ID"""
332 332 global _apiDict
333 333 if not _apiDict.get(id):
334 334 _apiDict[id] = EngineAPI(id)
335 335 return _apiDict[id]
336 336
337 337 def drop_engine(id):
338 338 """remove an engine"""
339 339 global _apiDict
340 340 if _apiDict.has_key(id):
341 341 del _apiDict[id]
342 342
343 343 class EngineService(object, service.Service):
344 344 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
345 345
346 346 zi.implements(IEngineBase)
347 347 name = 'EngineService'
348 348
349 349 def __init__(self, shellClass=Interpreter, mpi=None):
350 350 """Create an EngineService.
351 351
352 352 shellClass: something that implements IInterpreter or core1
353 353 mpi: an mpi module that has rank and size attributes
354 354 """
355 355 self.shellClass = shellClass
356 356 self.shell = self.shellClass()
357 357 self.mpi = mpi
358 358 self.id = None
359 359 self.properties = get_engine(self.id).properties
360 360 if self.mpi is not None:
361 361 log.msg("MPI started with rank = %i and size = %i" %
362 362 (self.mpi.rank, self.mpi.size))
363 363 self.id = self.mpi.rank
364 364 self._seedNamespace()
365 365
366 366 # Make id a property so that the shell can get the updated id
367 367
368 368 def _setID(self, id):
369 369 self._id = id
370 370 self.properties = get_engine(id).properties
371 371 self.shell.push({'id': id})
372 372
373 373 def _getID(self):
374 374 return self._id
375 375
376 376 id = property(_getID, _setID)
377 377
378 378 def _seedNamespace(self):
379 379 self.shell.push({'mpi': self.mpi, 'id' : self.id})
380 380
381 381 def executeAndRaise(self, msg, callable, *args, **kwargs):
382 382 """Call a method of self.shell and wrap any exception."""
383 383 d = defer.Deferred()
384 384 try:
385 385 result = callable(*args, **kwargs)
386 386 except:
387 387 # This gives the following:
388 388 # et=exception class
389 389 # ev=exception class instance
390 390 # tb=traceback object
391 391 et,ev,tb = sys.exc_info()
392 392 # This call adds attributes to the exception value
393 393 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
394 394 # Add another attribute
395 395 ev._ipython_engine_info = msg
396 396 f = failure.Failure(ev,et,None)
397 397 d.errback(f)
398 398 else:
399 399 d.callback(result)
400 400
401 401 return d
402 402
403 403
404 404 # The IEngine methods. See the interface for documentation.
405 405
406 406 def execute(self, lines):
407 407 msg = {'engineid':self.id,
408 408 'method':'execute',
409 409 'args':[lines]}
410 410 d = self.executeAndRaise(msg, self.shell.execute, lines)
411 411 d.addCallback(self.addIDToResult)
412 412 return d
413 413
414 414 def addIDToResult(self, result):
415 415 result['id'] = self.id
416 416 return result
417 417
418 418 def push(self, namespace):
419 419 msg = {'engineid':self.id,
420 420 'method':'push',
421 421 'args':[repr(namespace.keys())]}
422 422 d = self.executeAndRaise(msg, self.shell.push, namespace)
423 423 return d
424 424
425 425 def pull(self, keys):
426 426 msg = {'engineid':self.id,
427 427 'method':'pull',
428 428 'args':[repr(keys)]}
429 429 d = self.executeAndRaise(msg, self.shell.pull, keys)
430 430 return d
431 431
432 432 def push_function(self, namespace):
433 433 msg = {'engineid':self.id,
434 434 'method':'push_function',
435 435 'args':[repr(namespace.keys())]}
436 436 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
437 437 return d
438 438
439 439 def pull_function(self, keys):
440 440 msg = {'engineid':self.id,
441 441 'method':'pull_function',
442 442 'args':[repr(keys)]}
443 443 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
444 444 return d
445 445
446 446 def get_result(self, i=None):
447 447 msg = {'engineid':self.id,
448 448 'method':'get_result',
449 449 'args':[repr(i)]}
450 450 d = self.executeAndRaise(msg, self.shell.getCommand, i)
451 451 d.addCallback(self.addIDToResult)
452 452 return d
453 453
454 454 def reset(self):
455 455 msg = {'engineid':self.id,
456 456 'method':'reset',
457 457 'args':[]}
458 458 del self.shell
459 459 self.shell = self.shellClass()
460 460 self.properties.clear()
461 461 d = self.executeAndRaise(msg, self._seedNamespace)
462 462 return d
463 463
464 464 def kill(self):
465 465 drop_engine(self.id)
466 466 try:
467 467 reactor.stop()
468 468 except RuntimeError:
469 469 log.msg('The reactor was not running apparently.')
470 470 return defer.fail()
471 471 else:
472 472 return defer.succeed(None)
473 473
474 474 def keys(self):
475 475 """Return a list of variables names in the users top level namespace.
476 476
477 477 This used to return a dict of all the keys/repr(values) in the
478 478 user's namespace. This was too much info for the ControllerService
479 479 to handle so it is now just a list of keys.
480 480 """
481 481
482 482 remotes = []
483 483 for k in self.shell.user_ns.iterkeys():
484 484 if k not in ['__name__', '_ih', '_oh', '__builtins__',
485 485 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
486 486 remotes.append(k)
487 487 return defer.succeed(remotes)
488 488
489 489 def set_properties(self, properties):
490 490 msg = {'engineid':self.id,
491 491 'method':'set_properties',
492 492 'args':[repr(properties.keys())]}
493 493 return self.executeAndRaise(msg, self.properties.update, properties)
494 494
495 495 def get_properties(self, keys=None):
496 496 msg = {'engineid':self.id,
497 497 'method':'get_properties',
498 498 'args':[repr(keys)]}
499 499 if keys is None:
500 500 keys = self.properties.keys()
501 501 return self.executeAndRaise(msg, self.properties.subDict, *keys)
502 502
503 503 def _doDel(self, keys):
504 504 for key in keys:
505 505 del self.properties[key]
506 506
507 507 def del_properties(self, keys):
508 508 msg = {'engineid':self.id,
509 509 'method':'del_properties',
510 510 'args':[repr(keys)]}
511 511 return self.executeAndRaise(msg, self._doDel, keys)
512 512
513 513 def _doHas(self, keys):
514 514 return [self.properties.has_key(key) for key in keys]
515 515
516 516 def has_properties(self, keys):
517 517 msg = {'engineid':self.id,
518 518 'method':'has_properties',
519 519 'args':[repr(keys)]}
520 520 return self.executeAndRaise(msg, self._doHas, keys)
521 521
522 522 def clear_properties(self):
523 523 msg = {'engineid':self.id,
524 524 'method':'clear_properties',
525 525 'args':[]}
526 526 return self.executeAndRaise(msg, self.properties.clear)
527 527
528 528 def push_serialized(self, sNamespace):
529 529 msg = {'engineid':self.id,
530 530 'method':'push_serialized',
531 531 'args':[repr(sNamespace.keys())]}
532 532 ns = {}
533 533 for k,v in sNamespace.iteritems():
534 534 try:
535 535 unserialized = newserialized.IUnSerialized(v)
536 536 ns[k] = unserialized.getObject()
537 537 except:
538 538 return defer.fail()
539 539 return self.executeAndRaise(msg, self.shell.push, ns)
540 540
541 541 def pull_serialized(self, keys):
542 542 msg = {'engineid':self.id,
543 543 'method':'pull_serialized',
544 544 'args':[repr(keys)]}
545 545 if isinstance(keys, str):
546 546 keys = [keys]
547 547 if len(keys)==1:
548 548 d = self.executeAndRaise(msg, self.shell.pull, keys)
549 549 d.addCallback(newserialized.serialize)
550 550 return d
551 551 elif len(keys)>1:
552 552 d = self.executeAndRaise(msg, self.shell.pull, keys)
553 553 @d.addCallback
554 554 def packThemUp(values):
555 555 serials = []
556 556 for v in values:
557 557 try:
558 558 serials.append(newserialized.serialize(v))
559 559 except:
560 560 return defer.fail(failure.Failure())
561 561 return serials
562 562 return packThemUp
563 563
564 564
565 565 def queue(methodToQueue):
566 566 def queuedMethod(this, *args, **kwargs):
567 567 name = methodToQueue.__name__
568 568 return this.submitCommand(Command(name, *args, **kwargs))
569 569 return queuedMethod
570 570
571 571 class QueuedEngine(object):
572 572 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
573 573
574 574 The resulting object will implement IEngineQueued which extends
575 575 IEngineCore which extends (IEngineBase, IEngineSerialized).
576 576
577 577 This seems like the best way of handling it, but I am not sure. The
578 578 other option is to have the various base interfaces be used like
579 579 mix-in intefaces. The problem I have with this is adpatation is
580 580 more difficult and complicated because there can be can multiple
581 581 original and final Interfaces.
582 582 """
583 583
584 584 zi.implements(IEngineQueued)
585 585
586 586 def __init__(self, engine):
587 587 """Create a QueuedEngine object from an engine
588 588
589 589 engine: An implementor of IEngineCore and IEngineSerialized
590 590 keepUpToDate: whether to update the remote status when the
591 591 queue is empty. Defaults to False.
592 592 """
593 593
594 594 # This is the right way to do these tests rather than
595 595 # IEngineCore in list(zi.providedBy(engine)) which will only
596 596 # picks of the interfaces that are directly declared by engine.
597 597 assert IEngineBase.providedBy(engine), \
598 598 "engine passed to QueuedEngine doesn't provide IEngineBase"
599 599
600 600 self.engine = engine
601 601 self.id = engine.id
602 602 self.queued = []
603 603 self.history = {}
604 604 self.engineStatus = {}
605 605 self.currentCommand = None
606 606 self.failureObservers = []
607 607
608 608 def _get_properties(self):
609 609 return self.engine.properties
610 610
611 611 properties = property(_get_properties, lambda self, _: None)
612 612 # Queue management methods. You should not call these directly
613 613
614 614 def submitCommand(self, cmd):
615 615 """Submit command to queue."""
616 616
617 617 d = defer.Deferred()
618 618 cmd.setDeferred(d)
619 619 if self.currentCommand is not None:
620 620 if self.currentCommand.finished:
621 621 # log.msg("Running command immediately: %r" % cmd)
622 622 self.currentCommand = cmd
623 623 self.runCurrentCommand()
624 624 else: # command is still running
625 625 # log.msg("Command is running: %r" % self.currentCommand)
626 626 # log.msg("Queueing: %r" % cmd)
627 627 self.queued.append(cmd)
628 628 else:
629 629 # log.msg("No current commands, running: %r" % cmd)
630 630 self.currentCommand = cmd
631 631 self.runCurrentCommand()
632 632 return d
633 633
634 634 def runCurrentCommand(self):
635 635 """Run current command."""
636 636
637 637 cmd = self.currentCommand
638 638 f = getattr(self.engine, cmd.remoteMethod, None)
639 639 if f:
640 640 d = f(*cmd.args, **cmd.kwargs)
641 641 if cmd.remoteMethod is 'execute':
642 642 d.addCallback(self.saveResult)
643 643 d.addCallback(self.finishCommand)
644 644 d.addErrback(self.abortCommand)
645 645 else:
646 646 return defer.fail(AttributeError(cmd.remoteMethod))
647 647
648 648 def _flushQueue(self):
649 649 """Pop next command in queue and run it."""
650 650
651 651 if len(self.queued) > 0:
652 652 self.currentCommand = self.queued.pop(0)
653 653 self.runCurrentCommand()
654 654
655 655 def saveResult(self, result):
656 656 """Put the result in the history."""
657 657 self.history[result['number']] = result
658 658 return result
659 659
660 660 def finishCommand(self, result):
661 661 """Finish currrent command."""
662 662
663 663 # The order of these commands is absolutely critical.
664 664 self.currentCommand.handleResult(result)
665 665 self.currentCommand.finished = True
666 666 self._flushQueue()
667 667 return result
668 668
669 669 def abortCommand(self, reason):
670 670 """Abort current command.
671 671
672 672 This eats the Failure but first passes it onto the Deferred that the
673 673 user has.
674 674
675 675 It also clear out the queue so subsequence commands don't run.
676 676 """
677 677
678 678 # The order of these 3 commands is absolutely critical. The currentCommand
679 679 # must first be marked as finished BEFORE the queue is cleared and before
680 680 # the current command is sent the failure.
681 681 # Also, the queue must be cleared BEFORE the current command is sent the Failure
682 682 # otherwise the errback chain could trigger new commands to be added to the
683 683 # queue before we clear it. We should clear ONLY the commands that were in
684 684 # the queue when the error occured.
685 685 self.currentCommand.finished = True
686 686 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
687 687 self.clear_queue(msg=s)
688 688 self.currentCommand.handleError(reason)
689 689
690 690 return None
691 691
692 692 #---------------------------------------------------------------------------
693 693 # IEngineCore methods
694 694 #---------------------------------------------------------------------------
695 695
696 696 @queue
697 697 def execute(self, lines):
698 698 pass
699 699
700 700 @queue
701 701 def push(self, namespace):
702 702 pass
703 703
704 704 @queue
705 705 def pull(self, keys):
706 706 pass
707 707
708 708 @queue
709 709 def push_function(self, namespace):
710 710 pass
711 711
712 712 @queue
713 713 def pull_function(self, keys):
714 714 pass
715 715
716 716 def get_result(self, i=None):
717 717 if i is None:
718 718 i = max(self.history.keys()+[None])
719 719
720 720 cmd = self.history.get(i, None)
721 721 # Uncomment this line to disable chaching of results
722 722 #cmd = None
723 723 if cmd is None:
724 724 return self.submitCommand(Command('get_result', i))
725 725 else:
726 726 return defer.succeed(cmd)
727 727
728 728 def reset(self):
729 729 self.clear_queue()
730 730 self.history = {} # reset the cache - I am not sure we should do this
731 731 return self.submitCommand(Command('reset'))
732 732
733 733 def kill(self):
734 734 self.clear_queue()
735 735 return self.submitCommand(Command('kill'))
736 736
737 737 @queue
738 738 def keys(self):
739 739 pass
740 740
741 741 #---------------------------------------------------------------------------
742 742 # IEngineSerialized methods
743 743 #---------------------------------------------------------------------------
744 744
745 745 @queue
746 746 def push_serialized(self, namespace):
747 747 pass
748 748
749 749 @queue
750 750 def pull_serialized(self, keys):
751 751 pass
752 752
753 753 #---------------------------------------------------------------------------
754 754 # IEngineProperties methods
755 755 #---------------------------------------------------------------------------
756 756
757 757 @queue
758 758 def set_properties(self, namespace):
759 759 pass
760 760
761 761 @queue
762 762 def get_properties(self, keys=None):
763 763 pass
764 764
765 765 @queue
766 766 def del_properties(self, keys):
767 767 pass
768 768
769 769 @queue
770 770 def has_properties(self, keys):
771 771 pass
772 772
773 773 @queue
774 774 def clear_properties(self):
775 775 pass
776 776
777 777 #---------------------------------------------------------------------------
778 778 # IQueuedEngine methods
779 779 #---------------------------------------------------------------------------
780 780
781 781 def clear_queue(self, msg=''):
782 782 """Clear the queue, but doesn't cancel the currently running commmand."""
783 783
784 784 for cmd in self.queued:
785 785 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
786 786 self.queued = []
787 787 return defer.succeed(None)
788 788
789 789 def queue_status(self):
790 790 if self.currentCommand is not None:
791 791 if self.currentCommand.finished:
792 792 pending = repr(None)
793 793 else:
794 794 pending = repr(self.currentCommand)
795 795 else:
796 796 pending = repr(None)
797 797 dikt = {'queue':map(repr,self.queued), 'pending':pending}
798 798 return defer.succeed(dikt)
799 799
800 800 def register_failure_observer(self, obs):
801 801 self.failureObservers.append(obs)
802 802
803 803 def unregister_failure_observer(self, obs):
804 804 self.failureObservers.remove(obs)
805 805
806 806
807 807 # Now register QueuedEngine as an adpater class that makes an IEngineBase into a
808 808 # IEngineQueued.
809 809 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
810 810
811 811
812 812 class Command(object):
813 813 """A command object that encapslates queued commands.
814 814
815 815 This class basically keeps track of a command that has been queued
816 816 in a QueuedEngine. It manages the deferreds and hold the method to be called
817 817 and the arguments to that method.
818 818 """
819 819
820 820
821 821 def __init__(self, remoteMethod, *args, **kwargs):
822 822 """Build a new Command object."""
823 823
824 824 self.remoteMethod = remoteMethod
825 825 self.args = args
826 826 self.kwargs = kwargs
827 827 self.finished = False
828 828
829 829 def setDeferred(self, d):
830 830 """Sets the deferred attribute of the Command."""
831 831
832 832 self.deferred = d
833 833
834 834 def __repr__(self):
835 835 if not self.args:
836 836 args = ''
837 837 else:
838 838 args = str(self.args)[1:-2] #cut off (...,)
839 839 for k,v in self.kwargs.iteritems():
840 840 if args:
841 841 args += ', '
842 842 args += '%s=%r' %(k,v)
843 843 return "%s(%s)" %(self.remoteMethod, args)
844 844
845 845 def handleResult(self, result):
846 846 """When the result is ready, relay it to self.deferred."""
847 847
848 848 self.deferred.callback(result)
849 849
850 850 def handleError(self, reason):
851 851 """When an error has occured, relay it to self.deferred."""
852 852
853 853 self.deferred.errback(reason)
854 854
855 855 class ThreadedEngineService(EngineService):
856 856 """An EngineService subclass that defers execute commands to a separate
857 857 thread.
858 858
859 859 ThreadedEngineService uses twisted.internet.threads.deferToThread to
860 860 defer execute requests to a separate thread. GUI frontends may want to
861 861 use ThreadedEngineService as the engine in an
862 862 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
863 863 block execution from blocking the GUI thread.
864 864 """
865 865
866 866 zi.implements(IEngineBase)
867 867
868 868 def __init__(self, shellClass=Interpreter, mpi=None):
869 869 EngineService.__init__(self, shellClass, mpi)
870 870
871 871 def wrapped_execute(self, msg, lines):
872 872 """Wrap self.shell.execute to add extra information to tracebacks"""
873 873
874 874 try:
875 875 result = self.shell.execute(lines)
876 876 except Exception,e:
877 877 # This gives the following:
878 878 # et=exception class
879 879 # ev=exception class instance
880 880 # tb=traceback object
881 881 et,ev,tb = sys.exc_info()
882 882 # This call adds attributes to the exception value
883 883 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
884 884 # Add another attribute
885 885
886 886 # Create a new exception with the new attributes
887 887 e = et(ev._ipython_traceback_text)
888 888 e._ipython_engine_info = msg
889 889
890 890 # Re-raise
891 891 raise e
892 892
893 893 return result
894 894
895 895
896 896 def execute(self, lines):
897 897 # Only import this if we are going to use this class
898 898 from twisted.internet import threads
899 899
900 900 msg = {'engineid':self.id,
901 901 'method':'execute',
902 902 'args':[lines]}
903 903
904 904 d = threads.deferToThread(self.wrapped_execute, msg, lines)
905 905 d.addCallback(self.addIDToResult)
906 906 return d
@@ -1,373 +1,374 b''
1 1 # encoding: utf-8
2 2
3 3 """Test template for complete engine object"""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import cPickle as pickle
19 19
20 20 from twisted.internet import defer, reactor
21 21 from twisted.python import failure
22 22 from twisted.application import service
23 23 import zope.interface as zi
24 24
25 25 from IPython.kernel import newserialized
26 26 from IPython.kernel import error
27 27 from IPython.kernel.pickleutil import can, uncan
28 28 import IPython.kernel.engineservice as es
29 29 from IPython.kernel.core.interpreter import Interpreter
30 30 from IPython.testing.parametric import Parametric, parametric
31 31
32 32 #-------------------------------------------------------------------------------
33 33 # Tests
34 34 #-------------------------------------------------------------------------------
35 35
36 36
37 37 # A sequence of valid commands run through execute
38 38 validCommands = ['a=5',
39 39 'b=10',
40 40 'a=5; b=10; c=a+b',
41 41 'import math; 2.0*math.pi',
42 42 """def f():
43 43 result = 0.0
44 44 for i in range(10):
45 45 result += i
46 46 """,
47 47 'if 1<2: a=5',
48 48 """import time
49 49 time.sleep(0.1)""",
50 50 """from math import cos;
51 51 x = 1.0*cos(0.5)""", # Semicolons lead to Discard ast nodes that should be discarded
52 52 """s = 1
53 53 s = set()
54 54 """, # Trailing whitespace should be allowed.
55 55 """import math
56 56 math.cos(1.0)""", # Test a method call with a discarded return value
57 57 """x=1.0234
58 58 a=5; b=10""", # Test an embedded semicolon
59 59 """x=1.0234
60 60 a=5; b=10;""" # Test both an embedded and trailing semicolon
61 61 ]
62 62
63 63 # A sequence of commands that raise various exceptions
64 64 invalidCommands = [('a=1/0',ZeroDivisionError),
65 65 ('print v',NameError),
66 66 ('l=[];l[0]',IndexError),
67 67 ("d={};d['a']",KeyError),
68 68 ("assert 1==0",AssertionError),
69 69 ("import abababsdbfsbaljasdlja",ImportError),
70 70 ("raise Exception()",Exception)]
71 71
72 72 def testf(x):
73 73 return 2.0*x
74 74
75 75 globala = 99
76 76
77 77 def testg(x):
78 78 return globala*x
79 79
80 80 class IEngineCoreTestCase(object):
81 81 """Test an IEngineCore implementer."""
82 82
83 83 def createShell(self):
84 84 return Interpreter()
85 85
86 86 def catchQueueCleared(self, f):
87 87 try:
88 88 f.raiseException()
89 89 except error.QueueCleared:
90 90 pass
91 91
92 92 def testIEngineCoreInterface(self):
93 93 """Does self.engine claim to implement IEngineCore?"""
94 94 self.assert_(es.IEngineCore.providedBy(self.engine))
95 95
96 96 def testIEngineCoreInterfaceMethods(self):
97 97 """Does self.engine have the methods and attributes in IEngineCore."""
98 98 for m in list(es.IEngineCore):
99 99 self.assert_(hasattr(self.engine, m))
100 100
101 101 def testIEngineCoreDeferreds(self):
102 102 d = self.engine.execute('a=5')
103 103 d.addCallback(lambda _: self.engine.pull('a'))
104 104 d.addCallback(lambda _: self.engine.get_result())
105 105 d.addCallback(lambda _: self.engine.keys())
106 106 d.addCallback(lambda _: self.engine.push(dict(a=10)))
107 107 return d
108 108
109 109 def runTestExecute(self, cmd):
110 110 self.shell = Interpreter()
111 111 actual = self.shell.execute(cmd)
112 112 def compare(computed):
113 113 actual['id'] = computed['id']
114 114 self.assertEquals(actual, computed)
115 115 d = self.engine.execute(cmd)
116 116 d.addCallback(compare)
117 117 return d
118 118
119 119 @parametric
120 120 def testExecute(cls):
121 121 return [(cls.runTestExecute, cmd) for cmd in validCommands]
122 122
123 123 def runTestExecuteFailures(self, cmd, exc):
124 124 def compare(f):
125 125 self.assertRaises(exc, f.raiseException)
126 126 d = self.engine.execute(cmd)
127 127 d.addErrback(compare)
128 128 return d
129 129
130 130 @parametric
131 131 def testExecuteFailuresEngineService(cls):
132 132 return [(cls.runTestExecuteFailures, cmd, exc)
133 133 for cmd, exc in invalidCommands]
134 134
135 135 def runTestPushPull(self, o):
136 136 d = self.engine.push(dict(a=o))
137 137 d.addCallback(lambda r: self.engine.pull('a'))
138 138 d.addCallback(lambda r: self.assertEquals(o,r))
139 139 return d
140 140
141 141 @parametric
142 142 def testPushPull(cls):
143 143 objs = [10,"hi there",1.2342354,{"p":(1,2)},None]
144 144 return [(cls.runTestPushPull, o) for o in objs]
145 145
146 146 def testPullNameError(self):
147 147 d = self.engine.push(dict(a=5))
148 148 d.addCallback(lambda _:self.engine.reset())
149 149 d.addCallback(lambda _: self.engine.pull("a"))
150 150 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
151 151 return d
152 152
153 153 def testPushPullFailures(self):
154 154 d = self.engine.pull('a')
155 155 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
156 156 d.addCallback(lambda _: self.engine.execute('l = lambda x: x'))
157 157 d.addCallback(lambda _: self.engine.pull('l'))
158 158 d.addErrback(lambda f: self.assertRaises(pickle.PicklingError, f.raiseException))
159 159 d.addCallback(lambda _: self.engine.push(dict(l=lambda x: x)))
160 160 d.addErrback(lambda f: self.assertRaises(pickle.PicklingError, f.raiseException))
161 161 return d
162 162
163 163 def testPushPullArray(self):
164 164 try:
165 165 import numpy
166 166 except:
167 167 return
168 168 a = numpy.random.random(1000)
169 169 d = self.engine.push(dict(a=a))
170 170 d.addCallback(lambda _: self.engine.pull('a'))
171 171 d.addCallback(lambda b: b==a)
172 172 d.addCallback(lambda c: c.all())
173 173 return self.assertDeferredEquals(d, True)
174 174
175 175 def testPushFunction(self):
176 176
177 177 d = self.engine.push_function(dict(f=testf))
178 178 d.addCallback(lambda _: self.engine.execute('result = f(10)'))
179 179 d.addCallback(lambda _: self.engine.pull('result'))
180 180 d.addCallback(lambda r: self.assertEquals(r, testf(10)))
181 181 return d
182 182
183 183 def testPullFunction(self):
184 184 d = self.engine.push_function(dict(f=testf, g=testg))
185 185 d.addCallback(lambda _: self.engine.pull_function(('f','g')))
186 186 d.addCallback(lambda r: self.assertEquals(r[0](10), testf(10)))
187 187 return d
188 188
189 189 def testPushFunctionGlobal(self):
190 190 """Make sure that pushed functions pick up the user's namespace for globals."""
191 191 d = self.engine.push(dict(globala=globala))
192 192 d.addCallback(lambda _: self.engine.push_function(dict(g=testg)))
193 193 d.addCallback(lambda _: self.engine.execute('result = g(10)'))
194 194 d.addCallback(lambda _: self.engine.pull('result'))
195 195 d.addCallback(lambda r: self.assertEquals(r, testg(10)))
196 196 return d
197 197
198 198 def testGetResultFailure(self):
199 199 d = self.engine.get_result(None)
200 200 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
201 201 d.addCallback(lambda _: self.engine.get_result(10))
202 202 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
203 203 return d
204 204
205 205 def runTestGetResult(self, cmd):
206 206 self.shell = Interpreter()
207 207 actual = self.shell.execute(cmd)
208 208 def compare(computed):
209 209 actual['id'] = computed['id']
210 210 self.assertEquals(actual, computed)
211 211 d = self.engine.execute(cmd)
212 212 d.addCallback(lambda r: self.engine.get_result(r['number']))
213 213 d.addCallback(compare)
214 214 return d
215 215
216 216 @parametric
217 217 def testGetResult(cls):
218 218 return [(cls.runTestGetResult, cmd) for cmd in validCommands]
219 219
220 220 def testGetResultDefault(self):
221 221 cmd = 'a=5'
222 222 shell = self.createShell()
223 223 shellResult = shell.execute(cmd)
224 224 def popit(dikt, key):
225 225 dikt.pop(key)
226 226 return dikt
227 227 d = self.engine.execute(cmd)
228 228 d.addCallback(lambda _: self.engine.get_result())
229 229 d.addCallback(lambda r: self.assertEquals(shellResult, popit(r,'id')))
230 230 return d
231 231
232 232 def testKeys(self):
233 233 d = self.engine.keys()
234 234 d.addCallback(lambda s: isinstance(s, list))
235 235 d.addCallback(lambda r: self.assertEquals(r, True))
236 236 return d
237 237
238 238 Parametric(IEngineCoreTestCase)
239 239
240 240 class IEngineSerializedTestCase(object):
241 241 """Test an IEngineCore implementer."""
242 242
243 243 def testIEngineSerializedInterface(self):
244 244 """Does self.engine claim to implement IEngineCore?"""
245 245 self.assert_(es.IEngineSerialized.providedBy(self.engine))
246 246
247 247 def testIEngineSerializedInterfaceMethods(self):
248 248 """Does self.engine have the methods and attributes in IEngineCore."""
249 249 for m in list(es.IEngineSerialized):
250 250 self.assert_(hasattr(self.engine, m))
251 251
252 252 def testIEngineSerializedDeferreds(self):
253 253 dList = []
254 254 d = self.engine.push_serialized(dict(key=newserialized.serialize(12345)))
255 255 self.assert_(isinstance(d, defer.Deferred))
256 256 dList.append(d)
257 257 d = self.engine.pull_serialized('key')
258 258 self.assert_(isinstance(d, defer.Deferred))
259 259 dList.append(d)
260 260 D = defer.DeferredList(dList)
261 261 return D
262 262
263 263 def testPushPullSerialized(self):
264 264 objs = [10,"hi there",1.2342354,{"p":(1,2)}]
265 265 d = defer.succeed(None)
266 266 for o in objs:
267 267 self.engine.push_serialized(dict(key=newserialized.serialize(o)))
268 268 value = self.engine.pull_serialized('key')
269 269 value.addCallback(lambda serial: newserialized.IUnSerialized(serial).getObject())
270 270 d = self.assertDeferredEquals(value,o,d)
271 271 return d
272 272
273 273 def testPullSerializedFailures(self):
274 274 d = self.engine.pull_serialized('a')
275 275 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
276 276 d.addCallback(lambda _: self.engine.execute('l = lambda x: x'))
277 277 d.addCallback(lambda _: self.engine.pull_serialized('l'))
278 278 d.addErrback(lambda f: self.assertRaises(pickle.PicklingError, f.raiseException))
279 279 return d
280 280
281 281 Parametric(IEngineSerializedTestCase)
282 282
283 283 class IEngineQueuedTestCase(object):
284 284 """Test an IEngineQueued implementer."""
285 285
286 286 def testIEngineQueuedInterface(self):
287 287 """Does self.engine claim to implement IEngineQueued?"""
288 288 self.assert_(es.IEngineQueued.providedBy(self.engine))
289 289
290 290 def testIEngineQueuedInterfaceMethods(self):
291 291 """Does self.engine have the methods and attributes in IEngineQueued."""
292 292 for m in list(es.IEngineQueued):
293 293 self.assert_(hasattr(self.engine, m))
294 294
295 295 def testIEngineQueuedDeferreds(self):
296 296 dList = []
297 297 d = self.engine.clear_queue()
298 298 self.assert_(isinstance(d, defer.Deferred))
299 299 dList.append(d)
300 300 d = self.engine.queue_status()
301 301 self.assert_(isinstance(d, defer.Deferred))
302 302 dList.append(d)
303 303 D = defer.DeferredList(dList)
304 304 return D
305 305
306 306 def testClearQueue(self):
307 307 result = self.engine.clear_queue()
308 308 d1 = self.assertDeferredEquals(result, None)
309 309 d1.addCallback(lambda _: self.engine.queue_status())
310 310 d2 = self.assertDeferredEquals(d1, {'queue':[], 'pending':'None'})
311 311 return d2
312 312
313 313 def testQueueStatus(self):
314 314 result = self.engine.queue_status()
315 315 result.addCallback(lambda r: 'queue' in r and 'pending' in r)
316 316 d = self.assertDeferredEquals(result, True)
317 317 return d
318 318
319 319 Parametric(IEngineQueuedTestCase)
320 320
321 321 class IEnginePropertiesTestCase(object):
322 322 """Test an IEngineProperties implementor."""
323 323
324 324 def testIEnginePropertiesInterface(self):
325 325 """Does self.engine claim to implement IEngineProperties?"""
326 326 self.assert_(es.IEngineProperties.providedBy(self.engine))
327 327
328 328 def testIEnginePropertiesInterfaceMethods(self):
329 329 """Does self.engine have the methods and attributes in IEngineProperties."""
330 330 for m in list(es.IEngineProperties):
331 331 self.assert_(hasattr(self.engine, m))
332 332
333 333 def testGetSetProperties(self):
334 334 dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
335 335 d = self.engine.set_properties(dikt)
336 336 d.addCallback(lambda r: self.engine.get_properties())
337 337 d = self.assertDeferredEquals(d, dikt)
338 338 d.addCallback(lambda r: self.engine.get_properties(('c',)))
339 339 d = self.assertDeferredEquals(d, {'c': dikt['c']})
340 340 d.addCallback(lambda r: self.engine.set_properties(dict(c=False)))
341 341 d.addCallback(lambda r: self.engine.get_properties(('c', 'd')))
342 342 d = self.assertDeferredEquals(d, dict(c=False, d=None))
343 343 return d
344 344
345 345 def testClearProperties(self):
346 346 dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
347 347 d = self.engine.set_properties(dikt)
348 348 d.addCallback(lambda r: self.engine.clear_properties())
349 349 d.addCallback(lambda r: self.engine.get_properties())
350 350 d = self.assertDeferredEquals(d, {})
351 351 return d
352 352
353 353 def testDelHasProperties(self):
354 354 dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
355 355 d = self.engine.set_properties(dikt)
356 356 d.addCallback(lambda r: self.engine.del_properties(('b','e')))
357 357 d.addCallback(lambda r: self.engine.has_properties(('a','b','c','d','e')))
358 358 d = self.assertDeferredEquals(d, [True, False, True, True, False])
359 359 return d
360 360
361 361 def testStrictDict(self):
362 362 s = """from IPython.kernel.engineservice import get_engine
363 363 p = get_engine(%s).properties"""%self.engine.id
364 364 d = self.engine.execute(s)
365 365 d.addCallback(lambda r: self.engine.execute("p['a'] = lambda _:None"))
366 d = self.assertDeferredRaises(d, error.InvalidProperty)
366 d.addErrback(lambda f: self.assertRaises(error.InvalidProperty,
367 f.raiseException))
367 368 d.addCallback(lambda r: self.engine.execute("p['a'] = range(5)"))
368 369 d.addCallback(lambda r: self.engine.execute("p['a'].append(5)"))
369 370 d.addCallback(lambda r: self.engine.get_properties('a'))
370 371 d = self.assertDeferredEquals(d, dict(a=range(5)))
371 372 return d
372 373
373 374 Parametric(IEnginePropertiesTestCase)
General Comments 0
You need to be logged in to leave comments. Login now