Merging in vvatsa's ssh mode for ipcluster with some changes....
Brian Granger -
@@ -1,904 +1,903 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
3 3
4 4 """A Twisted Service Representation of the IPython core.
5 5
6 6 The IPython Core exposed to the network is called the Engine. Its
7 7 representation in Twisted in the EngineService. Interfaces and adapters
8 8 are used to abstract out the details of the actual network protocol used.
9 9 The EngineService is an Engine that knows nothing about the actual protocol
10 10 used.
11 11
12 12 The EngineService is exposed with various network protocols in modules like:
13 13
14 14 enginepb.py
15 15 enginevanilla.py
16 16
17 17 As of 12/12/06 the classes in this module have been simplified greatly. It was
18 18 felt that we had over-engineered things. To improve the maintainability of the
19 19 code we have taken out the ICompleteEngine interface and the completeEngine
20 20 method that automatically added methods to engines.
21 21
22 22 """
23 23
24 24 __docformat__ = "restructuredtext en"
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Copyright (C) 2008 The IPython Development Team
28 28 #
29 29 # Distributed under the terms of the BSD License. The full license is in
30 30 # the file COPYING, distributed as part of this software.
31 31 #-------------------------------------------------------------------------------
32 32
33 33 #-------------------------------------------------------------------------------
34 34 # Imports
35 35 #-------------------------------------------------------------------------------
36 36
37 37 import os, sys, copy
38 38 import cPickle as pickle
39 39 from new import instancemethod
40 40
41 41 from twisted.application import service
42 42 from twisted.internet import defer, reactor
43 43 from twisted.python import log, failure, components
44 44 import zope.interface as zi
45 45
46 46 from IPython.kernel.core.interpreter import Interpreter
47 47 from IPython.kernel import newserialized, error, util
48 48 from IPython.kernel.util import printer
49 49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
50 50 from IPython.kernel import codeutil
51 51
52 52
53 53 #-------------------------------------------------------------------------------
54 54 # Interface specification for the Engine
55 55 #-------------------------------------------------------------------------------
56 56
57 57 class IEngineCore(zi.Interface):
58 58 """The minimal required interface for the IPython Engine.
59 59
60 60 This interface provides a formal specification of the IPython core.
61 61 All these methods should return deferreds regardless of what side of a
62 62 network connection they are on.
63 63
64 64 In general, this class simply wraps a shell class and wraps its return
65 65 values as Deferred objects. If the underlying shell class method raises
66 66 an exception, this class should convert it to a twisted.failure.Failure
67 67 that will be propagated along the Deferred's errback chain.
68 68
69 69 In addition, Failures are aggressive. By this, we mean that if a method
70 70 is performing multiple actions (like pulling multiple objects) and any
71 71 single one fails, the entire method will fail with that Failure. It is
72 72 all or nothing.
73 73 """
74 74
75 75 id = zi.interface.Attribute("the id of the Engine object")
76 76 properties = zi.interface.Attribute("A dict of properties of the Engine")
77 77
78 78 def execute(lines):
79 79 """Execute lines of Python code.
80 80
81 81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
82 82 upon success.
83 83
84 84 Returns a failure object if the execution of lines raises an exception.
85 85 """
86 86
87 87 def push(namespace):
88 88 """Push dict namespace into the user's namespace.
89 89
90 90 Returns a deferred to None or a failure.
91 91 """
92 92
93 93 def pull(keys):
94 94 """Pulls values out of the user's namespace by keys.
95 95
96 96 Returns a deferred to a tuple of objects or a single object.
97 97
98 98 Raises NameError if any one of the objects does not exist.
99 99 """
100 100
101 101 def push_function(namespace):
102 102 """Push a dict of key, function pairs into the user's namespace.
103 103
104 104 Returns a deferred to None or a failure."""
105 105
106 106 def pull_function(keys):
107 107 """Pulls functions out of the user's namespace by keys.
108 108
109 109 Returns a deferred to a tuple of functions or a single function.
110 110
111 111 Raises NameError if any one of the functions does not exist.
112 112 """
113 113
114 114 def get_result(i=None):
115 115 """Get the stdin/stdout/stderr of command i.
116 116
117 117 Returns a deferred to a dict with keys
118 118 (id, number, stdin, stdout, stderr).
119 119
120 120 Raises IndexError if command i does not exist.
121 121 Raises TypeError if i is not an int.
122 122 """
123 123
124 124 def reset():
125 125 """Reset the shell.
126 126
127 127 This clears the user's namespace. Won't cause modules to be
128 128 reloaded. Should also re-initialize certain variables like id.
129 129 """
130 130
131 131 def kill():
132 132 """Kill the engine by stopping the reactor."""
133 133
134 134 def keys():
135 135 """Return the top level variables in the users namspace.
136 136
137 137 Returns a deferred to a dict."""
138 138
139 139
140 140 class IEngineSerialized(zi.Interface):
141 141 """Push/Pull methods that take Serialized objects.
142 142
143 143 All methods should return deferreds.
144 144 """
145 145
146 146 def push_serialized(namespace):
147 147 """Push a dict of keys and Serialized objects into the user's namespace."""
148 148
149 149 def pull_serialized(keys):
150 150 """Pull objects by key from the user's namespace as Serialized.
151 151
152 152 Returns a list of or one Serialized.
153 153
154 154 Raises NameError if any one of the objects does not exist.
155 155 """
156 156
157 157
158 158 class IEngineProperties(zi.Interface):
159 159 """Methods for access to the properties object of an Engine"""
160 160
161 161 properties = zi.Attribute("A StrictDict object, containing the properties")
162 162
163 163 def set_properties(properties):
164 164 """set properties by key and value"""
165 165
166 166 def get_properties(keys=None):
167 167 """get a list of properties by `keys`, if no keys specified, get all"""
168 168
169 169 def del_properties(keys):
170 170 """delete properties by `keys`"""
171 171
172 172 def has_properties(keys):
173 173 """get a list of bool values for whether `properties` has `keys`"""
174 174
175 175 def clear_properties():
176 176 """clear the properties dict"""
177 177
178 178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
179 179 """The basic engine interface that EngineService will implement.
180 180
181 181 This exists so it is easy to specify adapters that adapt to and from the
182 182 API that the basic EngineService implements.
183 183 """
184 184 pass
185 185
186 186 class IEngineQueued(IEngineBase):
187 187 """Interface for adding a queue to an IEngineBase.
188 188
189 189 This interface extends the IEngineBase interface to add methods for managing
190 190 the engine's queue. The implicit details of this interface are that the
191 191 execution of all methods declared in IEngineBase should appropriately be
192 192 put through a queue before execution.
193 193
194 194 All methods should return deferreds.
195 195 """
196 196
197 197 def clear_queue():
198 198 """Clear the queue."""
199 199
200 200 def queue_status():
201 201 """Get the queued and pending commands in the queue."""
202 202
203 203 def register_failure_observer(obs):
204 204 """Register an observer of pending Failures.
205 205
206 206 The observer must implement IFailureObserver.
207 207 """
208 208
209 209 def unregister_failure_observer(obs):
210 210 """Unregister an observer of pending Failures."""
211 211
212 212
213 213 class IEngineThreaded(zi.Interface):
214 214 """A place holder for threaded commands.
215 215
216 216 All methods should return deferreds.
217 217 """
218 218 pass
219 219
220 220
221 221 #-------------------------------------------------------------------------------
222 222 # Functions and classes to implement the EngineService
223 223 #-------------------------------------------------------------------------------
224 224
225 225
226 226 class StrictDict(dict):
227 227 """This is a strict copying dictionary for use as the interface to the
228 228 properties of an Engine.
229 229
230 230 :IMPORTANT:
231 231 This object copies the values you set to it, and returns copies to you
232 232 when you request them. The only way to change properties is explicitly
233 233 through the setitem and getitem of the dictionary interface.
234 234
235 235 Example:
236 236 >>> e = get_engine(id)
237 237 >>> L = [1,2,3]
238 238 >>> e.properties['L'] = L
239 239 >>> L == e.properties['L']
240 240 True
241 241 >>> L.append(99)
242 242 >>> L == e.properties['L']
243 243 False
244 244
245 245 Note that getitem copies, so calls to methods of objects do not affect
246 246 the properties, as seen here:
247 247
248 248 >>> e.properties[1] = range(2)
249 249 >>> print e.properties[1]
250 250 [0, 1]
251 251 >>> e.properties[1].append(2)
252 252 >>> print e.properties[1]
253 253 [0, 1]
254 254 """
255 255 def __init__(self, *args, **kwargs):
256 256 dict.__init__(self, *args, **kwargs)
257 257 self.modified = True
258 258
259 259 def __getitem__(self, key):
260 260 return copy.deepcopy(dict.__getitem__(self, key))
261 261
262 262 def __setitem__(self, key, value):
263 263 # check if this entry is valid for transport around the network
264 264 # and copying
265 265 try:
266 266 pickle.dumps(key, 2)
267 267 pickle.dumps(value, 2)
268 268 newvalue = copy.deepcopy(value)
269 269 except:
270 270 raise error.InvalidProperty(value)
271 271 dict.__setitem__(self, key, newvalue)
272 272 self.modified = True
273 273
274 274 def __delitem__(self, key):
275 275 dict.__delitem__(self, key)
276 276 self.modified = True
277 277
278 278 def update(self, dikt):
279 279 for k,v in dikt.iteritems():
280 280 self[k] = v
281 281
282 282 def pop(self, key):
283 283 self.modified = True
284 284 return dict.pop(self, key)
285 285
286 286 def popitem(self):
287 287 self.modified = True
288 288 return dict.popitem(self)
289 289
290 290 def clear(self):
291 291 self.modified = True
292 292 dict.clear(self)
293 293
294 294 def subDict(self, *keys):
295 295 d = {}
296 296 for key in keys:
297 297 d[key] = self[key]
298 298 return d
299 299
300 300
301 301
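The copy-on-set/copy-on-get behavior documented above can be sketched in modern Python 3. This is a minimal stand-in, not the real class: `CopyingDict` and its `ValueError` are hypothetical substitutes for `StrictDict` and `error.InvalidProperty`.

```python
import copy
import pickle

class CopyingDict(dict):
    """Minimal Python 3 sketch of StrictDict's copying semantics."""

    def __getitem__(self, key):
        # Return a deep copy so callers cannot mutate stored values in place.
        return copy.deepcopy(dict.__getitem__(self, key))

    def __setitem__(self, key, value):
        # Only picklable keys/values are accepted; a private deep copy is stored.
        try:
            pickle.dumps(key, 2)
            pickle.dumps(value, 2)
            newvalue = copy.deepcopy(value)
        except Exception:
            # The real class raises error.InvalidProperty here.
            raise ValueError("invalid property: %r" % (value,))
        dict.__setitem__(self, key, newvalue)

d = CopyingDict()
L = [1, 2, 3]
d['L'] = L
L.append(99)       # mutating the original list does not change the stored copy
d['L'].append(4)   # mutating the returned copy does not change storage either
print(d['L'])      # [1, 2, 3]
```

Both mutation attempts are invisible to the dict, matching the doctest in the docstring above.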
302 302 class EngineAPI(object):
303 303 """This is the object through which the user can edit the `properties`
304 304 attribute of an Engine.
305 305 The Engine Properties object copies all objects in and out of itself.
306 306 See the EngineProperties object for details.
307 307 """
308 308 _fix=False
309 309 def __init__(self, id):
310 310 self.id = id
311 311 self.properties = StrictDict()
312 312 self._fix=True
313 313
314 314 def __setattr__(self, k,v):
315 315 if self._fix:
316 316 raise error.KernelError("I am protected!")
317 317 else:
318 318 object.__setattr__(self, k, v)
319 319
320 320 def __delattr__(self, key):
321 321 raise error.KernelError("I am protected!")
322 322
323 323
324 324 _apiDict = {}
325 325
326 326 def get_engine(id):
327 327 """Get the Engine API object, whcih currently just provides the properties
328 328 object, by ID"""
329 329 global _apiDict
330 330 if not _apiDict.get(id):
331 331 _apiDict[id] = EngineAPI(id)
332 332 return _apiDict[id]
333 333
334 334 def drop_engine(id):
335 335 """remove an engine"""
336 336 global _apiDict
337 337 if _apiDict.has_key(id):
338 338 del _apiDict[id]
339 339
340 340 class EngineService(object, service.Service):
341 341 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
342 342
343 343 zi.implements(IEngineBase)
344 344 name = 'EngineService'
345 345
346 346 def __init__(self, shellClass=Interpreter, mpi=None):
347 347 """Create an EngineService.
348 348
349 349 shellClass: something that implements IInterpreter or core1
350 350 mpi: an mpi module that has rank and size attributes
351 351 """
352 352 self.shellClass = shellClass
353 353 self.shell = self.shellClass()
354 354 self.mpi = mpi
355 355 self.id = None
356 356 self.properties = get_engine(self.id).properties
357 357 if self.mpi is not None:
358 358 log.msg("MPI started with rank = %i and size = %i" %
359 359 (self.mpi.rank, self.mpi.size))
360 360 self.id = self.mpi.rank
361 361 self._seedNamespace()
362 362
363 363 # Make id a property so that the shell can get the updated id
364 364
365 365 def _setID(self, id):
366 366 self._id = id
367 367 self.properties = get_engine(id).properties
368 368 self.shell.push({'id': id})
369 369
370 370 def _getID(self):
371 371 return self._id
372 372
373 373 id = property(_getID, _setID)
374 374
375 375 def _seedNamespace(self):
376 376 self.shell.push({'mpi': self.mpi, 'id' : self.id})
377 377
378 378 def executeAndRaise(self, msg, callable, *args, **kwargs):
379 379 """Call a method of self.shell and wrap any exception."""
380 380 d = defer.Deferred()
381 381 try:
382 382 result = callable(*args, **kwargs)
383 383 except:
384 384 # This gives the following:
385 385 # et=exception class
386 386 # ev=exception class instance
387 387 # tb=traceback object
388 388 et,ev,tb = sys.exc_info()
389 389 # This call adds attributes to the exception value
390 390 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
391 391 # Add another attribute
392 392 ev._ipython_engine_info = msg
393 393 f = failure.Failure(ev,et,None)
394 394 d.errback(f)
395 395 else:
396 396 d.callback(result)
397 397
398 398 return d
399 399
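The control flow of `executeAndRaise` above can be sketched without Twisted. Here a `('callback', result)` / `('errback', exc)` pair stands in for firing a Deferred, and the traceback-formatting step is reduced to attaching the engine info attribute:

```python
import sys

def execute_and_raise(msg, func, *args, **kwargs):
    # Call func and deliver either the result or an annotated exception,
    # mirroring the Deferred callback/errback split in the service.
    try:
        result = func(*args, **kwargs)
    except Exception:
        et, ev, tb = sys.exc_info()
        # The real service also runs shell.formatTraceback(et, ev, tb, msg).
        ev._ipython_engine_info = msg
        return ('errback', ev)
    return ('callback', result)

ok = execute_and_raise({'method': 'execute'}, lambda: 42)
bad = execute_and_raise({'method': 'execute'}, lambda: 1 // 0)
print(ok)                             # ('callback', 42)
print(bad[0], type(bad[1]).__name__)  # errback ZeroDivisionError
```

The key point is that the exception never propagates to the caller; it travels down the error path carrying `_ipython_engine_info` for remote-side reporting.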
400 400
401 401 # The IEngine methods. See the interface for documentation.
402 402
403 @profile
404 403 def execute(self, lines):
405 404 msg = {'engineid':self.id,
406 405 'method':'execute',
407 406 'args':[lines]}
408 407 d = self.executeAndRaise(msg, self.shell.execute, lines)
409 408 d.addCallback(self.addIDToResult)
410 409 return d
411 410
412 411 def addIDToResult(self, result):
413 412 result['id'] = self.id
414 413 return result
415 414
416 415 def push(self, namespace):
417 416 msg = {'engineid':self.id,
418 417 'method':'push',
419 418 'args':[repr(namespace.keys())]}
420 419 d = self.executeAndRaise(msg, self.shell.push, namespace)
421 420 return d
422 421
423 422 def pull(self, keys):
424 423 msg = {'engineid':self.id,
425 424 'method':'pull',
426 425 'args':[repr(keys)]}
427 426 d = self.executeAndRaise(msg, self.shell.pull, keys)
428 427 return d
429 428
430 429 def push_function(self, namespace):
431 430 msg = {'engineid':self.id,
432 431 'method':'push_function',
433 432 'args':[repr(namespace.keys())]}
434 433 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
435 434 return d
436 435
437 436 def pull_function(self, keys):
438 437 msg = {'engineid':self.id,
439 438 'method':'pull_function',
440 439 'args':[repr(keys)]}
441 440 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
442 441 return d
443 442
444 443 def get_result(self, i=None):
445 444 msg = {'engineid':self.id,
446 445 'method':'get_result',
447 446 'args':[repr(i)]}
448 447 d = self.executeAndRaise(msg, self.shell.getCommand, i)
449 448 d.addCallback(self.addIDToResult)
450 449 return d
451 450
452 451 def reset(self):
453 452 msg = {'engineid':self.id,
454 453 'method':'reset',
455 454 'args':[]}
456 455 del self.shell
457 456 self.shell = self.shellClass()
458 457 self.properties.clear()
459 458 d = self.executeAndRaise(msg, self._seedNamespace)
460 459 return d
461 460
462 461 def kill(self):
463 462 drop_engine(self.id)
464 463 try:
465 464 reactor.stop()
466 465 except RuntimeError:
467 466 log.msg('The reactor was not running apparently.')
468 467 return defer.fail()
469 468 else:
470 469 return defer.succeed(None)
471 470
472 471 def keys(self):
473 472 """Return a list of variables names in the users top level namespace.
474 473
475 474 This used to return a dict of all the keys/repr(values) in the
476 475 user's namespace. This was too much info for the ControllerService
477 476 to handle so it is now just a list of keys.
478 477 """
479 478
480 479 remotes = []
481 480 for k in self.shell.user_ns.iterkeys():
482 481 if k not in ['__name__', '_ih', '_oh', '__builtins__',
483 482 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
484 483 remotes.append(k)
485 484 return defer.succeed(remotes)
486 485
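The filtering done by `keys()` above amounts to hiding IPython's internal namespace entries; a small Python 3 sketch:

```python
# Names that the engine hides from the reported namespace (from keys() above).
HIDDEN = {'__name__', '_ih', '_oh', '__builtins__',
          'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input'}

def visible_keys(user_ns):
    # Report only user-defined top-level variables.
    return sorted(k for k in user_ns if k not in HIDDEN)

ns = {'__name__': '__main__', 'x': 1, 'In': [], 'y': 2}
print(visible_keys(ns))  # ['x', 'y']
```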
487 486 def set_properties(self, properties):
488 487 msg = {'engineid':self.id,
489 488 'method':'set_properties',
490 489 'args':[repr(properties.keys())]}
491 490 return self.executeAndRaise(msg, self.properties.update, properties)
492 491
493 492 def get_properties(self, keys=None):
494 493 msg = {'engineid':self.id,
495 494 'method':'get_properties',
496 495 'args':[repr(keys)]}
497 496 if keys is None:
498 497 keys = self.properties.keys()
499 498 return self.executeAndRaise(msg, self.properties.subDict, *keys)
500 499
501 500 def _doDel(self, keys):
502 501 for key in keys:
503 502 del self.properties[key]
504 503
505 504 def del_properties(self, keys):
506 505 msg = {'engineid':self.id,
507 506 'method':'del_properties',
508 507 'args':[repr(keys)]}
509 508 return self.executeAndRaise(msg, self._doDel, keys)
510 509
511 510 def _doHas(self, keys):
512 511 return [self.properties.has_key(key) for key in keys]
513 512
514 513 def has_properties(self, keys):
515 514 msg = {'engineid':self.id,
516 515 'method':'has_properties',
517 516 'args':[repr(keys)]}
518 517 return self.executeAndRaise(msg, self._doHas, keys)
519 518
520 519 def clear_properties(self):
521 520 msg = {'engineid':self.id,
522 521 'method':'clear_properties',
523 522 'args':[]}
524 523 return self.executeAndRaise(msg, self.properties.clear)
525 524
526 525 def push_serialized(self, sNamespace):
527 526 msg = {'engineid':self.id,
528 527 'method':'push_serialized',
529 528 'args':[repr(sNamespace.keys())]}
530 529 ns = {}
531 530 for k,v in sNamespace.iteritems():
532 531 try:
533 532 unserialized = newserialized.IUnSerialized(v)
534 533 ns[k] = unserialized.getObject()
535 534 except:
536 535 return defer.fail()
537 536 return self.executeAndRaise(msg, self.shell.push, ns)
538 537
539 538 def pull_serialized(self, keys):
540 539 msg = {'engineid':self.id,
541 540 'method':'pull_serialized',
542 541 'args':[repr(keys)]}
543 542 if isinstance(keys, str):
544 543 keys = [keys]
545 544 if len(keys)==1:
546 545 d = self.executeAndRaise(msg, self.shell.pull, keys)
547 546 d.addCallback(newserialized.serialize)
548 547 return d
549 548 elif len(keys)>1:
550 549 d = self.executeAndRaise(msg, self.shell.pull, keys)
551 550 @d.addCallback
552 551 def packThemUp(values):
553 552 serials = []
554 553 for v in values:
555 554 try:
556 555 serials.append(newserialized.serialize(v))
557 556 except:
558 557 return defer.fail(failure.Failure())
559 558 return serials
560 559 return packThemUp
561 560
562 561
563 562 def queue(methodToQueue):
564 563 def queuedMethod(this, *args, **kwargs):
565 564 name = methodToQueue.__name__
566 565 return this.submitCommand(Command(name, *args, **kwargs))
567 566 return queuedMethod
568 567
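A Python 3 sketch of the `queue` decorator defined above: the decorated method body is never executed; the call is converted into a `Command` and handed to the wrapper's queue. `RecordingEngine` is a hypothetical stand-in for `QueuedEngine` that just records submissions.

```python
class Command:
    def __init__(self, remote_method, *args, **kwargs):
        self.remote_method = remote_method
        self.args = args
        self.kwargs = kwargs

def queue(method_to_queue):
    # Replace the method with one that queues a Command under the same name.
    def queued_method(this, *args, **kwargs):
        return this.submit_command(
            Command(method_to_queue.__name__, *args, **kwargs))
    return queued_method

class RecordingEngine:
    def __init__(self):
        self.submitted = []

    def submit_command(self, cmd):
        self.submitted.append(cmd)
        return cmd

    @queue
    def execute(self, lines):
        pass  # body ignored; the call becomes a queued Command

e = RecordingEngine()
cmd = e.execute("a = 1")
print(cmd.remote_method, cmd.args)  # execute ('a = 1',)
```

This is why the `@queue`-decorated methods of `QueuedEngine` below all have empty bodies: the decorator carries the whole behavior.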
569 568 class QueuedEngine(object):
570 569 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
571 570
572 571 The resulting object will implement IEngineQueued which extends
573 572 IEngineBase, which in turn extends (IEngineCore, IEngineSerialized, IEngineProperties).
574 573
575 574 This seems like the best way of handling it, but I am not sure. The
576 575 other option is to have the various base interfaces be used like
577 576 mix-in interfaces. The problem I have with this is that adaptation is
578 577 more difficult and complicated because there can be multiple
579 578 original and final Interfaces.
580 579 """
581 580
582 581 zi.implements(IEngineQueued)
583 582
584 583 def __init__(self, engine):
585 584 """Create a QueuedEngine object from an engine
586 585
587 586 engine: An implementor of IEngineCore and IEngineSerialized
588 587 keepUpToDate: whether to update the remote status when the
589 588 queue is empty. Defaults to False.
590 589 """
591 590
592 591 # This is the right way to do these tests rather than
593 592 # IEngineCore in list(zi.providedBy(engine)), which would only
594 593 # pick up the interfaces that are directly declared by engine.
595 594 assert IEngineBase.providedBy(engine), \
596 595 "engine passed to QueuedEngine doesn't provide IEngineBase"
597 596
598 597 self.engine = engine
599 598 self.id = engine.id
600 599 self.queued = []
601 600 self.history = {}
602 601 self.engineStatus = {}
603 602 self.currentCommand = None
604 603 self.failureObservers = []
605 604
606 605 def _get_properties(self):
607 606 return self.engine.properties
608 607
609 608 properties = property(_get_properties, lambda self, _: None)
610 609 # Queue management methods. You should not call these directly
611 610
612 611 def submitCommand(self, cmd):
613 612 """Submit command to queue."""
614 613
615 614 d = defer.Deferred()
616 615 cmd.setDeferred(d)
617 616 if self.currentCommand is not None:
618 617 if self.currentCommand.finished:
619 618 # log.msg("Running command immediately: %r" % cmd)
620 619 self.currentCommand = cmd
621 620 self.runCurrentCommand()
622 621 else: # command is still running
623 622 # log.msg("Command is running: %r" % self.currentCommand)
624 623 # log.msg("Queueing: %r" % cmd)
625 624 self.queued.append(cmd)
626 625 else:
627 626 # log.msg("No current commands, running: %r" % cmd)
628 627 self.currentCommand = cmd
629 628 self.runCurrentCommand()
630 629 return d
631 630
632 631 def runCurrentCommand(self):
633 632 """Run current command."""
634 633
635 634 cmd = self.currentCommand
636 635 f = getattr(self.engine, cmd.remoteMethod, None)
637 636 if f:
638 637 d = f(*cmd.args, **cmd.kwargs)
639 638 if cmd.remoteMethod == 'execute':
640 639 d.addCallback(self.saveResult)
641 640 d.addCallback(self.finishCommand)
642 641 d.addErrback(self.abortCommand)
643 642 else:
644 643 return defer.fail(AttributeError(cmd.remoteMethod))
645 644
646 645 def _flushQueue(self):
647 646 """Pop next command in queue and run it."""
648 647
649 648 if len(self.queued) > 0:
650 649 self.currentCommand = self.queued.pop(0)
651 650 self.runCurrentCommand()
652 651
653 652 def saveResult(self, result):
654 653 """Put the result in the history."""
655 654 self.history[result['number']] = result
656 655 return result
657 656
658 657 def finishCommand(self, result):
659 658 """Finish currrent command."""
660 659
661 660 # The order of these commands is absolutely critical.
662 661 self.currentCommand.handleResult(result)
663 662 self.currentCommand.finished = True
664 663 self._flushQueue()
665 664 return result
666 665
667 666 def abortCommand(self, reason):
668 667 """Abort current command.
669 668
670 669 This eats the Failure but first passes it onto the Deferred that the
671 670 user has.
672 671
673 672 It also clears out the queue so subsequent commands don't run.
674 673 """
675 674
676 675 # The order of these 3 commands is absolutely critical. The currentCommand
677 676 # must first be marked as finished BEFORE the queue is cleared and before
678 677 # the current command is sent the failure.
679 678 # Also, the queue must be cleared BEFORE the current command is sent the Failure
680 679 # otherwise the errback chain could trigger new commands to be added to the
681 680 # queue before we clear it. We should clear ONLY the commands that were in
682 681 # the queue when the error occurred.
683 682 self.currentCommand.finished = True
684 683 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
685 684 self.clear_queue(msg=s)
686 685 self.currentCommand.handleError(reason)
687 686
688 687 return None
689 688
690 689 #---------------------------------------------------------------------------
691 690 # IEngineCore methods
692 691 #---------------------------------------------------------------------------
693 692
694 693 @queue
695 694 def execute(self, lines):
696 695 pass
697 696
698 697 @queue
699 698 def push(self, namespace):
700 699 pass
701 700
702 701 @queue
703 702 def pull(self, keys):
704 703 pass
705 704
706 705 @queue
707 706 def push_function(self, namespace):
708 707 pass
709 708
710 709 @queue
711 710 def pull_function(self, keys):
712 711 pass
713 712
714 713 def get_result(self, i=None):
715 714 if i is None:
716 715 i = max(self.history.keys()+[None])
717 716
718 717 cmd = self.history.get(i, None)
719 718 # Uncomment this line to disable caching of results
720 719 #cmd = None
721 720 if cmd is None:
722 721 return self.submitCommand(Command('get_result', i))
723 722 else:
724 723 return defer.succeed(cmd)
725 724
726 725 def reset(self):
727 726 self.clear_queue()
728 727 self.history = {} # reset the cache - I am not sure we should do this
729 728 return self.submitCommand(Command('reset'))
730 729
731 730 def kill(self):
732 731 self.clear_queue()
733 732 return self.submitCommand(Command('kill'))
734 733
735 734 @queue
736 735 def keys(self):
737 736 pass
738 737
739 738 #---------------------------------------------------------------------------
740 739 # IEngineSerialized methods
741 740 #---------------------------------------------------------------------------
742 741
743 742 @queue
744 743 def push_serialized(self, namespace):
745 744 pass
746 745
747 746 @queue
748 747 def pull_serialized(self, keys):
749 748 pass
750 749
751 750 #---------------------------------------------------------------------------
752 751 # IEngineProperties methods
753 752 #---------------------------------------------------------------------------
754 753
755 754 @queue
756 755 def set_properties(self, namespace):
757 756 pass
758 757
759 758 @queue
760 759 def get_properties(self, keys=None):
761 760 pass
762 761
763 762 @queue
764 763 def del_properties(self, keys):
765 764 pass
766 765
767 766 @queue
768 767 def has_properties(self, keys):
769 768 pass
770 769
771 770 @queue
772 771 def clear_properties(self):
773 772 pass
774 773
775 774 #---------------------------------------------------------------------------
776 775 # IQueuedEngine methods
777 776 #---------------------------------------------------------------------------
778 777
779 778 def clear_queue(self, msg=''):
780 779 """Clear the queue, but doesn't cancel the currently running commmand."""
781 780
782 781 for cmd in self.queued:
783 782 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
784 783 self.queued = []
785 784 return defer.succeed(None)
786 785
787 786 def queue_status(self):
788 787 if self.currentCommand is not None:
789 788 if self.currentCommand.finished:
790 789 pending = repr(None)
791 790 else:
792 791 pending = repr(self.currentCommand)
793 792 else:
794 793 pending = repr(None)
795 794 dikt = {'queue':map(repr,self.queued), 'pending':pending}
796 795 return defer.succeed(dikt)
797 796
798 797 def register_failure_observer(self, obs):
799 798 self.failureObservers.append(obs)
800 799
801 800 def unregister_failure_observer(self, obs):
802 801 self.failureObservers.remove(obs)
803 802
804 803
805 804 # Now register QueuedEngine as an adapter class that makes an IEngineBase into
806 805 # an IEngineQueued.
807 806 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
808 807
809 808
810 809 class Command(object):
811 810 """A command object that encapslates queued commands.
812 811
813 812 This class basically keeps track of a command that has been queued
814 813 in a QueuedEngine. It manages the deferreds and holds the method to be called
815 814 and the arguments to that method.
816 815 """
817 816
818 817
819 818 def __init__(self, remoteMethod, *args, **kwargs):
820 819 """Build a new Command object."""
821 820
822 821 self.remoteMethod = remoteMethod
823 822 self.args = args
824 823 self.kwargs = kwargs
825 824 self.finished = False
826 825
827 826 def setDeferred(self, d):
828 827 """Sets the deferred attribute of the Command."""
829 828
830 829 self.deferred = d
831 830
832 831 def __repr__(self):
833 832 if not self.args:
834 833 args = ''
835 834 else:
836 835 args = str(self.args)[1:-1].rstrip(', ') # cut off parens and any trailing comma
837 836 for k,v in self.kwargs.iteritems():
838 837 if args:
839 838 args += ', '
840 839 args += '%s=%r' %(k,v)
841 840 return "%s(%s)" %(self.remoteMethod, args)
842 841
843 842 def handleResult(self, result):
844 843 """When the result is ready, relay it to self.deferred."""
845 844
846 845 self.deferred.callback(result)
847 846
848 847 def handleError(self, reason):
849 848 """When an error has occured, relay it to self.deferred."""
850 849
851 850 self.deferred.errback(reason)
852 851
853 852 class ThreadedEngineService(EngineService):
854 853 """An EngineService subclass that defers execute commands to a separate
855 854 thread.
856 855
857 856 ThreadedEngineService uses twisted.internet.threads.deferToThread to
858 857 defer execute requests to a separate thread. GUI frontends may want to
859 858 use ThreadedEngineService as the engine in an
860 859 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
861 860 blocking execution from blocking the GUI thread.
862 861 """
863 862
864 863 zi.implements(IEngineBase)
865 864
866 865 def __init__(self, shellClass=Interpreter, mpi=None):
867 866 EngineService.__init__(self, shellClass, mpi)
868 867
869 868 def wrapped_execute(self, msg, lines):
870 869 """Wrap self.shell.execute to add extra information to tracebacks"""
871 870
872 871 try:
873 872 result = self.shell.execute(lines)
874 873 except Exception,e:
875 874 # This gives the following:
876 875 # et=exception class
877 876 # ev=exception class instance
878 877 # tb=traceback object
879 878 et,ev,tb = sys.exc_info()
880 879 # This call adds attributes to the exception value
881 880 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
882 881 # Add another attribute
883 882
884 883 # Create a new exception with the new attributes
885 884 e = et(ev._ipython_traceback_text)
886 885 e._ipython_engine_info = msg
887 886
888 887 # Re-raise
889 888 raise e
890 889
891 890 return result
892 891
893 892
894 893 def execute(self, lines):
895 894 # Only import this if we are going to use this class
896 895 from twisted.internet import threads
897 896
898 897 msg = {'engineid':self.id,
899 898 'method':'execute',
900 899 'args':[lines]}
901 900
902 901 d = threads.deferToThread(self.wrapped_execute, msg, lines)
903 902 d.addCallback(self.addIDToResult)
904 903 return d
@@ -1,754 +1,753 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3 3
4 4 """Adapt the IPython ControllerServer to IMultiEngine.
5 5
6 6 This module provides classes that adapt a ControllerService to the
7 7 IMultiEngine interface. This interface is a basic interactive interface
8 8 for working with a set of engines where it is desired to have explicit
9 9 access to each registered engine.
10 10
11 11 The classes here are exposed to the network in files like:
12 12
13 13 * multienginevanilla.py
14 14 * multienginepb.py
15 15 """
16 16
17 17 __docformat__ = "restructuredtext en"
18 18
19 19 #-------------------------------------------------------------------------------
20 20 # Copyright (C) 2008 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-------------------------------------------------------------------------------
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Imports
28 28 #-------------------------------------------------------------------------------
29 29
30 30 from new import instancemethod
31 31 from types import FunctionType
32 32
33 33 from twisted.application import service
34 34 from twisted.internet import defer, reactor
35 35 from twisted.python import log, components, failure
36 36 from zope.interface import Interface, implements, Attribute
37 37
38 38 from IPython.tools import growl
39 39 from IPython.kernel.util import printer
40 40 from IPython.kernel.twistedutil import gatherBoth
41 41 from IPython.kernel import map as Map
42 42 from IPython.kernel import error
43 43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 44 from IPython.kernel.controllerservice import \
45 45 ControllerAdapterBase, \
46 46 ControllerService, \
47 47 IControllerBase
48 48
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Interfaces for the MultiEngine representation of a controller
52 52 #-------------------------------------------------------------------------------
53 53
54 54 class IEngineMultiplexer(Interface):
55 55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56 56
57 57 This class simply acts as a multiplexer of methods that are in the
58 58 various IEngines* interfaces. Thus the methods here are just like those
59 59 in the IEngine* interfaces, but with an extra first argument, targets.
60 60 The targets argument can have the following forms:
61 61
62 62 * targets = 10 # Engines are indexed by ints
63 63 * targets = [0,1,2,3] # A list of ints
64 64 * targets = 'all' # A string to indicate all targets
65 65
66 66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 67 includes engines not being registered.
68 68
69 69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 70 with length equal to the number of targets. The elements of the list will
71 71 correspond to the return of the corresponding IEngine method.
72 72
73 73 Failures are aggressive, meaning that if an action fails for any target,
74 74 the overall action will fail immediately with that Failure.
75 75
76 76 :Parameters:
77 77 targets : int, list of ints, or 'all'
78 78 Engine ids the action will apply to.
79 79
80 80 :Returns: Deferred to a list of results for each engine.
81 81
82 82 :Exception:
83 83 InvalidEngineID
84 84 If the targets argument is bad or engines aren't registered.
85 85 NoEnginesRegistered
86 86 If there are no engines registered and targets='all'
87 87 """
88 88
89 89 #---------------------------------------------------------------------------
90 90 # Multiplexed methods
91 91 #---------------------------------------------------------------------------
92 92
93 93 def execute(lines, targets='all'):
94 94 """Execute lines of Python code on targets.
95 95
96 96 See the class docstring for information about targets and possible
97 97 exceptions this method can raise.
98 98
99 99 :Parameters:
100 100 lines : str
101 101 String of python code to be executed on targets.
102 102 """
103 103
104 104 def push(namespace, targets='all'):
105 105 """Push dict namespace into the user's namespace on targets.
106 106
107 107 See the class docstring for information about targets and possible
108 108 exceptions this method can raise.
109 109
110 110 :Parameters:
111 111 namespace : dict
112 112 Dict of key/value pairs to be put into the user's namespace.
113 113 """
114 114
115 115 def pull(keys, targets='all'):
116 116 """Pull values out of the user's namespace on targets by keys.
117 117
118 118 See the class docstring for information about targets and possible
119 119 exceptions this method can raise.
120 120
121 121 :Parameters:
122 122 keys : tuple of strings
123 123 Sequence of keys to be pulled from user's namespace.
124 124 """
125 125
126 126 def push_function(namespace, targets='all'):
127 127 """"""
128 128
129 129 def pull_function(keys, targets='all'):
130 130 """"""
131 131
132 132 def get_result(i=None, targets='all'):
133 133 """Get the result for command i from targets.
134 134
135 135 See the class docstring for information about targets and possible
136 136 exceptions this method can raise.
137 137
138 138 :Parameters:
139 139 i : int or None
140 140 Command index or None to indicate most recent command.
141 141 """
142 142
143 143 def reset(targets='all'):
144 144 """Reset targets.
145 145
146 146 This clears the users namespace of the Engines, but won't cause
147 147 modules to be reloaded.
148 148 """
149 149
150 150 def keys(targets='all'):
151 151 """Get variable names defined in user's namespace on targets."""
152 152
153 153 def kill(controller=False, targets='all'):
154 154 """Kill the targets Engines and possibly the controller.
155 155
156 156 :Parameters:
157 157 controller : boolean
158 158 Should the controller be killed as well. If so all the
159 159 engines will be killed first no matter what targets is.
160 160 """
161 161
162 162 def push_serialized(namespace, targets='all'):
163 163 """Push a namespace of Serialized objects to targets.
164 164
165 165 :Parameters:
166 166 namespace : dict
167 167 A dict whose keys are the variable names and whose values
168 168 are serialized version of the objects.
169 169 """
170 170
171 171 def pull_serialized(keys, targets='all'):
172 172 """Pull Serialized objects by keys from targets.
173 173
174 174 :Parameters:
175 175 keys : tuple of strings
176 176 Sequence of variable names to pull as serialized objects.
177 177 """
178 178
179 179 def clear_queue(targets='all'):
180 180 """Clear the queue of pending command for targets."""
181 181
182 182 def queue_status(targets='all'):
183 183 """Get the status of the queue on the targets."""
184 184
185 185 def set_properties(properties, targets='all'):
186 186 """set properties by key and value"""
187 187
188 188 def get_properties(keys=None, targets='all'):
189 189 """get a list of properties by `keys`, if no keys specified, get all"""
190 190
191 191 def del_properties(keys, targets='all'):
192 192 """delete properties by `keys`"""
193 193
194 194 def has_properties(keys, targets='all'):
195 195 """get a list of bool values for whether `properties` has `keys`"""
196 196
197 197 def clear_properties(targets='all'):
198 198 """clear the properties dict"""
199 199
200 200
201 201 class IMultiEngine(IEngineMultiplexer):
202 202 """A controller that exposes an explicit interface to all of its engines.
203 203
204 204 This is the primary interface for interactive usage.
205 205 """
206 206
207 207 def get_ids():
208 208 """Return list of currently registered ids.
209 209
210 210 :Returns: A Deferred to a list of registered engine ids.
211 211 """
212 212
213 213
214 214
215 215 #-------------------------------------------------------------------------------
216 216 # Implementation of the core MultiEngine classes
217 217 #-------------------------------------------------------------------------------
218 218
219 219 class MultiEngine(ControllerAdapterBase):
220 220 """The representation of a ControllerService as a IMultiEngine.
221 221
222 222 Although it is not implemented currently, this class would be where a
223 223 client/notification API is implemented. It could inherit from something
224 224 like results.NotifierParent and then use the notify method to send
225 225 notifications.
226 226 """
227 227
228 228 implements(IMultiEngine)
229 229
230 230 def __init__(self, controller):
231 231 ControllerAdapterBase.__init__(self, controller)
232 232
233 233 #---------------------------------------------------------------------------
234 234 # Helper methods
235 235 #---------------------------------------------------------------------------
236 236
237 237 def engineList(self, targets):
238 238 """Parse the targets argument into a list of valid engine objects.
239 239
240 240 :Parameters:
241 241 targets : int, list of ints or 'all'
242 242 The targets argument to be parsed.
243 243
244 244 :Returns: List of engine objects.
245 245
246 246 :Exception:
247 247 InvalidEngineID
248 248 If targets is not valid or if an engine is not registered.
249 249 """
250 250 if isinstance(targets, int):
251 251 if targets not in self.engines.keys():
252 252 log.msg("Engine with id %i is not registered" % targets)
253 253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 254 else:
255 255 return [self.engines[targets]]
256 256 elif isinstance(targets, (list, tuple)):
257 257 for id in targets:
258 258 if id not in self.engines.keys():
259 259 log.msg("Engine with id %r is not registered" % id)
260 260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 261 return map(self.engines.get, targets)
262 262 elif targets == 'all':
263 263 eList = self.engines.values()
264 264 if len(eList) == 0:
265 265 msg = """There are no engines registered.
266 266 Check the logs in ~/.ipython/log if you think there should have been some.
267 267 raise error.NoEnginesRegistered(msg)
268 268 else:
269 269 return eList
270 270 else:
271 271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
272 272
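The dispatch that `engineList` performs can be sketched in plain, standalone Python. This is a minimal sketch under stated assumptions: `normalize_targets` is a hypothetical helper name, and `engines` stands in for the id-to-engine dict the adapter holds.

```python
def normalize_targets(targets, engines):
    # Accept a single int, a list/tuple of ints, or the string 'all',
    # and return the matching engine objects from the registry dict.
    if isinstance(targets, int):
        targets = [targets]
    if targets == 'all':
        if not engines:
            raise LookupError("no engines registered")
        return list(engines.values())
    if isinstance(targets, (list, tuple)):
        for t in targets:
            if t not in engines:
                raise LookupError("engine %r is not registered" % (t,))
        return [engines[t] for t in targets]
    raise TypeError("targets must be an int, list of ints, or 'all'")
```

The real method raises `error.InvalidEngineID` / `error.NoEnginesRegistered` instead of the builtin exceptions used here.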
273 273 def _performOnEngines(self, methodName, *args, **kwargs):
274 274 """Calls a method on engines and returns deferred to list of results.
275 275
276 276 :Parameters:
277 277 methodName : str
278 278 Name of the method to be called.
279 279 targets : int, list of ints, 'all'
280 280 The targets argument to be parsed into a list of engine objects.
281 281 args
282 282 The positional arguments to be passed to the method on each engine.
283 283 kwargs
284 284 The keyword arguments passed to the method
285 285
286 286 :Returns: List of deferreds to the results on each engine
287 287
288 288 :Exception:
289 289 InvalidEngineID
290 290 If the targets argument is bad in any way.
291 291 AttributeError
292 292 If the method doesn't exist on one of the engines.
293 293 """
294 294 targets = kwargs.pop('targets')
295 295 log.msg("Performing %s on %r" % (methodName, targets))
296 296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 297 # This will and should raise if targets is not valid!
298 298 engines = self.engineList(targets)
299 299 dList = []
300 300 for e in engines:
301 301 meth = getattr(e, methodName, None)
302 302 if meth is not None:
303 303 dList.append(meth(*args, **kwargs))
304 304 else:
305 305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 306 return dList
307 307
308 308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 309 """Called _performOnEngines and wraps result/exception into deferred."""
310 310 try:
311 311 dList = self._performOnEngines(methodName, *args, **kwargs)
312 312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 313 return defer.fail(failure.Failure())
314 314 else:
315 315 # Having fireOnOneErrback is causing problems with the determinacy
316 316 # of the system. Basically, once a single engine has errbacked, this
317 317 # method returns. In some cases, this will cause client to submit
318 318 # another command. Because the previous command is still running
319 319 # on some engines, this command will be queued. When those commands
320 320 # then errback, the second command will raise QueueCleared. Ahhh!
321 321 d = gatherBoth(dList,
322 322 fireOnOneErrback=0,
323 323 consumeErrors=1,
324 324 logErrors=0)
325 325 d.addCallback(error.collect_exceptions, methodName)
326 326 return d
327 327
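Stripped of Twisted's deferred machinery, the fan-out-and-collect pattern that `_performOnEngines` plus `error.collect_exceptions` implement looks roughly like the synchronous sketch below (the function name and the `("ok"/"error", value)` result shape are illustrative, not the real API):

```python
def perform_on_all(objs, method_name, *args):
    # Call method_name(*args) on every object, recording either the
    # result or the raised exception per object, so one failure does
    # not hide the results from the other engines.
    results = []
    for obj in objs:
        meth = getattr(obj, method_name, None)
        if meth is None:
            raise AttributeError("%r has no method %s" % (obj, method_name))
        try:
            results.append(("ok", meth(*args)))
        except Exception as e:
            results.append(("error", e))
    return results
```

This is why the code passes `fireOnOneErrback=0, consumeErrors=1` to `gatherBoth`: every engine's outcome is collected before any error is raised to the caller.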
328 328 #---------------------------------------------------------------------------
329 329 # General IMultiEngine methods
330 330 #---------------------------------------------------------------------------
331 331
332 332 def get_ids(self):
333 333 return defer.succeed(self.engines.keys())
334 334
335 335 #---------------------------------------------------------------------------
336 336 # IEngineMultiplexer methods
337 337 #---------------------------------------------------------------------------
338 338
339 339 def execute(self, lines, targets='all'):
340 340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341 341
342 342 def push(self, ns, targets='all'):
343 343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344 344
345 345 def pull(self, keys, targets='all'):
346 346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347 347
348 348 def push_function(self, ns, targets='all'):
349 349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350 350
351 351 def pull_function(self, keys, targets='all'):
352 352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353 353
354 354 def get_result(self, i=None, targets='all'):
355 355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356 356
357 357 def reset(self, targets='all'):
358 358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359 359
360 360 def keys(self, targets='all'):
361 361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362 362
363 363 def kill(self, controller=False, targets='all'):
364 364 if controller:
365 365 targets = 'all'
366 366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 367 if controller:
368 368 log.msg("Killing controller")
369 369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 370 # Consume any weird stuff coming back
371 371 d.addBoth(lambda _: None)
372 372 return d
373 373
374 374 def push_serialized(self, namespace, targets='all'):
375 375 for k, v in namespace.iteritems():
376 376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 378 return d
379 379
380 380 def pull_serialized(self, keys, targets='all'):
381 381 try:
382 382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 384 return defer.fail(failure.Failure())
385 385 else:
386 386 for d in dList:
387 387 d.addCallback(self._logSizes)
388 388 d = gatherBoth(dList,
389 389 fireOnOneErrback=0,
390 390 consumeErrors=1,
391 391 logErrors=0)
392 392 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 393 return d
394 394
395 395 def _logSizes(self, listOfSerialized):
396 396 if isinstance(listOfSerialized, (list, tuple)):
397 397 for s in listOfSerialized:
398 398 log.msg("Pulled object is %f MB" % s.getDataSize())
399 399 else:
400 400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 401 return listOfSerialized
402 402
403 403 def clear_queue(self, targets='all'):
404 404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405 405
406 406 def queue_status(self, targets='all'):
407 407 log.msg("Getting queue status on %r" % targets)
408 408 try:
409 409 engines = self.engineList(targets)
410 410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 411 return defer.fail(failure.Failure())
412 412 else:
413 413 dList = []
414 414 for e in engines:
415 415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
416 416 d = gatherBoth(dList,
417 417 fireOnOneErrback=0,
418 418 consumeErrors=1,
419 419 logErrors=0)
420 420 d.addCallback(error.collect_exceptions, 'queue_status')
421 421 return d
422 422
423 423 def get_properties(self, keys=None, targets='all'):
424 424 log.msg("Getting properties on %r" % targets)
425 425 try:
426 426 engines = self.engineList(targets)
427 427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 428 return defer.fail(failure.Failure())
429 429 else:
430 430 dList = [e.get_properties(keys) for e in engines]
431 431 d = gatherBoth(dList,
432 432 fireOnOneErrback=0,
433 433 consumeErrors=1,
434 434 logErrors=0)
435 435 d.addCallback(error.collect_exceptions, 'get_properties')
436 436 return d
437 437
438 438 def set_properties(self, properties, targets='all'):
439 439 log.msg("Setting properties on %r" % targets)
440 440 try:
441 441 engines = self.engineList(targets)
442 442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 443 return defer.fail(failure.Failure())
444 444 else:
445 445 dList = [e.set_properties(properties) for e in engines]
446 446 d = gatherBoth(dList,
447 447 fireOnOneErrback=0,
448 448 consumeErrors=1,
449 449 logErrors=0)
450 450 d.addCallback(error.collect_exceptions, 'set_properties')
451 451 return d
452 452
453 453 def has_properties(self, keys, targets='all'):
454 454 log.msg("Checking properties on %r" % targets)
455 455 try:
456 456 engines = self.engineList(targets)
457 457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 458 return defer.fail(failure.Failure())
459 459 else:
460 460 dList = [e.has_properties(keys) for e in engines]
461 461 d = gatherBoth(dList,
462 462 fireOnOneErrback=0,
463 463 consumeErrors=1,
464 464 logErrors=0)
465 465 d.addCallback(error.collect_exceptions, 'has_properties')
466 466 return d
467 467
468 468 def del_properties(self, keys, targets='all'):
469 469 log.msg("Deleting properties on %r" % targets)
470 470 try:
471 471 engines = self.engineList(targets)
472 472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 473 return defer.fail(failure.Failure())
474 474 else:
475 475 dList = [e.del_properties(keys) for e in engines]
476 476 d = gatherBoth(dList,
477 477 fireOnOneErrback=0,
478 478 consumeErrors=1,
479 479 logErrors=0)
480 480 d.addCallback(error.collect_exceptions, 'del_properties')
481 481 return d
482 482
483 483 def clear_properties(self, targets='all'):
484 484 log.msg("Clearing properties on %r" % targets)
485 485 try:
486 486 engines = self.engineList(targets)
487 487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 488 return defer.fail(failure.Failure())
489 489 else:
490 490 dList = [e.clear_properties() for e in engines]
491 491 d = gatherBoth(dList,
492 492 fireOnOneErrback=0,
493 493 consumeErrors=1,
494 494 logErrors=0)
495 495 d.addCallback(error.collect_exceptions, 'clear_properties')
496 496 return d
497 497
498 498
499 499 components.registerAdapter(MultiEngine,
500 500 IControllerBase,
501 501 IMultiEngine)
502 502
503 503
504 504 #-------------------------------------------------------------------------------
505 505 # Interfaces for the Synchronous MultiEngine
506 506 #-------------------------------------------------------------------------------
507 507
508 508 class ISynchronousEngineMultiplexer(Interface):
509 509 pass
510 510
511 511
512 512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 513 """Synchronous, two-phase version of IMultiEngine.
514 514
515 515 Methods in this interface are identical to those of IMultiEngine, but they
516 516 take one additional argument:
517 517
518 518 execute(lines, targets='all') -> execute(lines, targets='all', block=True)
519 519
520 520 :Parameters:
521 521 block : boolean
522 522 Should the method return a deferred to a deferredID or the
523 523 actual result. If block=False a deferred to a deferredID is
524 524 returned and the user must call `get_pending_deferred` at a later
525 525 point. If block=True, a deferred to the actual result comes back.
526 526 """
527 527 def get_pending_deferred(deferredID, block=True):
528 528 """"""
529 529
530 530 def clear_pending_deferreds():
531 531 """"""
532 532
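The `block` semantics described in the docstring can be illustrated with a toy, deferred-free manager. This is a sketch only; the real `PendingDeferredManager` works with Twisted deferreds and a different internal API, and the `PendingResults` name and `call` method here are hypothetical.

```python
import itertools

class PendingResults:
    # Toy two-phase call manager: block=True runs the call and returns
    # the result directly; block=False hands back a ticket id that is
    # redeemed later via get_pending_deferred.
    def __init__(self):
        self._ids = itertools.count()
        self._pending = {}

    def call(self, func, *args, block=True):
        if block:
            return func(*args)
        ticket = next(self._ids)
        self._pending[ticket] = (func, args)
        return ticket

    def get_pending_deferred(self, ticket):
        func, args = self._pending.pop(ticket)
        return func(*args)
```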
533 533
534 534 #-------------------------------------------------------------------------------
535 535 # Implementation of the Synchronous MultiEngine
536 536 #-------------------------------------------------------------------------------
537 537
538 538 class SynchronousMultiEngine(PendingDeferredManager):
539 539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540 540
541 541 Warning: this class uses a decorator that currently relies on **kwargs.
542 542 Because of this, block must be passed as a keyword argument, not positionally.
543 543 """
544 544
545 545 implements(ISynchronousMultiEngine)
546 546
547 547 def __init__(self, multiengine):
548 548 self.multiengine = multiengine
549 549 PendingDeferredManager.__init__(self)
550 550
551 551 #---------------------------------------------------------------------------
552 552 # Decorated pending deferred methods
553 553 #---------------------------------------------------------------------------
554 554
555 @profile
556 555 @two_phase
557 556 def execute(self, lines, targets='all'):
558 557 d = self.multiengine.execute(lines, targets)
559 558 return d
560 559
561 560 @two_phase
562 561 def push(self, namespace, targets='all'):
563 562 return self.multiengine.push(namespace, targets)
564 563
565 564 @two_phase
566 565 def pull(self, keys, targets='all'):
567 566 d = self.multiengine.pull(keys, targets)
568 567 return d
569 568
570 569 @two_phase
571 570 def push_function(self, namespace, targets='all'):
572 571 return self.multiengine.push_function(namespace, targets)
573 572
574 573 @two_phase
575 574 def pull_function(self, keys, targets='all'):
576 575 d = self.multiengine.pull_function(keys, targets)
577 576 return d
578 577
579 578 @two_phase
580 579 def get_result(self, i=None, targets='all'):
581 580 return self.multiengine.get_result(i, targets=targets)
582 581
583 582 @two_phase
584 583 def reset(self, targets='all'):
585 584 return self.multiengine.reset(targets)
586 585
587 586 @two_phase
588 587 def keys(self, targets='all'):
589 588 return self.multiengine.keys(targets)
590 589
591 590 @two_phase
592 591 def kill(self, controller=False, targets='all'):
593 592 return self.multiengine.kill(controller, targets)
594 593
595 594 @two_phase
596 595 def push_serialized(self, namespace, targets='all'):
597 596 return self.multiengine.push_serialized(namespace, targets)
598 597
599 598 @two_phase
600 599 def pull_serialized(self, keys, targets='all'):
601 600 return self.multiengine.pull_serialized(keys, targets)
602 601
603 602 @two_phase
604 603 def clear_queue(self, targets='all'):
605 604 return self.multiengine.clear_queue(targets)
606 605
607 606 @two_phase
608 607 def queue_status(self, targets='all'):
609 608 return self.multiengine.queue_status(targets)
610 609
611 610 @two_phase
612 611 def set_properties(self, properties, targets='all'):
613 612 return self.multiengine.set_properties(properties, targets)
614 613
615 614 @two_phase
616 615 def get_properties(self, keys=None, targets='all'):
617 616 return self.multiengine.get_properties(keys, targets)
618 617
619 618 @two_phase
620 619 def has_properties(self, keys, targets='all'):
621 620 return self.multiengine.has_properties(keys, targets)
622 621
623 622 @two_phase
624 623 def del_properties(self, keys, targets='all'):
625 624 return self.multiengine.del_properties(keys, targets)
626 625
627 626 @two_phase
628 627 def clear_properties(self, targets='all'):
629 628 return self.multiengine.clear_properties(targets)
630 629
631 630 #---------------------------------------------------------------------------
632 631 # IMultiEngine methods
633 632 #---------------------------------------------------------------------------
634 633
635 634 def get_ids(self):
636 635 """Return a list of registered engine ids.
637 636
638 637 Never use the two phase block/non-block stuff for this.
639 638 """
640 639 return self.multiengine.get_ids()
641 640
642 641
643 642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
644 643
645 644
646 645 #-------------------------------------------------------------------------------
647 646 # Various high-level interfaces that can be used as MultiEngine mix-ins
648 647 #-------------------------------------------------------------------------------
649 648
650 649 #-------------------------------------------------------------------------------
651 650 # IMultiEngineCoordinator
652 651 #-------------------------------------------------------------------------------
653 652
654 653 class IMultiEngineCoordinator(Interface):
655 654 """Methods that work on multiple engines explicitly."""
656 655
657 656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
658 657 """Partition and distribute a sequence to targets."""
659 658
660 659 def gather(key, dist='b', targets='all'):
661 660 """Gather object key from targets."""
662 661
663 662 def raw_map(func, seqs, dist='b', targets='all'):
664 663 """
665 664 A parallelized version of Python's builtin `map` function.
666 665
667 666 This has a slightly different syntax than the builtin `map`.
668 667 This is needed because we need to have keyword arguments and thus
669 668 can't use *args to capture all the sequences. Instead, they must
670 669 be passed in a list or tuple.
671 670
672 671 The equivalence is:
673 672
674 673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
675 674
676 675 Most users will want to use parallel functions or the `mapper`
677 676 and `map` methods for an API that follows that of the builtin
678 677 `map`.
679 678 """
680 679
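The equivalence stated in the `raw_map` docstring can be checked locally with an ordinary (non-parallel) stand-in:

```python
def raw_map(func, seqs):
    # The documented equivalence, run locally:
    # raw_map(func, seqs) behaves like map(func, seqs[0], seqs[1], ...)
    return list(map(func, *seqs))
```

The parallel version distributes the zipped argument tuples across engines, but the result a caller sees has this same shape.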
681 680
682 681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
683 682 """Methods that work on multiple engines explicitly."""
684 683
685 684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
686 685 """Partition and distribute a sequence to targets."""
687 686
688 687 def gather(key, dist='b', targets='all', block=True):
689 688 """Gather object key from targets"""
690 689
691 690 def raw_map(func, seqs, dist='b', targets='all', block=True):
692 691 """
693 692 A parallelized version of Python's builtin map.
694 693
695 694 This has a slightly different syntax than the builtin `map`.
696 695 This is needed because we need to have keyword arguments and thus
697 696 can't use *args to capture all the sequences. Instead, they must
698 697 be passed in a list or tuple.
699 698
700 699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
701 700
702 701 Most users will want to use parallel functions or the `mapper`
703 702 and `map` methods for an API that follows that of the builtin
704 703 `map`.
705 704 """
706 705
707 706
708 707 #-------------------------------------------------------------------------------
709 708 # IMultiEngineExtras
710 709 #-------------------------------------------------------------------------------
711 710
712 711 class IMultiEngineExtras(Interface):
713 712
714 713 def zip_pull(targets, keys):
715 714 """
716 715 Pull, but return results in a different format from `pull`.
717 716
718 717 This method basically returns zip(pull(targets, *keys)), with a few
719 718 edge cases handled differently. Users of chainsaw will find this format
720 719 familiar.
721 720 """
722 721
723 722 def run(targets, fname):
724 723 """Run a .py file on targets."""
725 724
726 725
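The reshaping that `zip_pull` performs, relative to plain `pull`, amounts to transposing per-key lists into per-engine tuples. A minimal local sketch, assuming a hypothetical `pull_one(key)` that returns one value per engine:

```python
def zip_pull(pull_one, keys):
    # pull_one(key) yields a list with one element per engine;
    # zip_pull regroups the per-key lists into per-engine tuples,
    # i.e. zip(*[pull(k) for k in keys]).
    return list(zip(*[pull_one(k) for k in keys]))
```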
727 726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
728 727 def zip_pull(targets, keys, block=True):
729 728 """
730 729 Pull, but return results in a different format from `pull`.
731 730
732 731 This method basically returns zip(pull(targets, *keys)), with a few
733 732 edge cases handled differently. Users of chainsaw will find this format
734 733 familiar.
735 734 """
736 735
737 736 def run(targets, fname, block=True):
738 737 """Run a .py file on targets."""
739 738
740 739 #-------------------------------------------------------------------------------
741 740 # The full MultiEngine interface
742 741 #-------------------------------------------------------------------------------
743 742
744 743 class IFullMultiEngine(IMultiEngine,
745 744 IMultiEngineCoordinator,
746 745 IMultiEngineExtras):
747 746 pass
748 747
749 748
750 749 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
751 750 ISynchronousMultiEngineCoordinator,
752 751 ISynchronousMultiEngineExtras):
753 752 pass
754 753
@@ -1,658 +1,723 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import get_ipython_dir, num_cpus
33 33 from IPython.kernel.fcutil import have_crypto
34 34 from IPython.kernel.error import SecurityError
35 35 from IPython.kernel.fcutil import have_crypto
36 36 from IPython.kernel.twistedutil import gatherBoth
37 37 from IPython.kernel.util import printer
38 38
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # General process handling code
42 42 #-----------------------------------------------------------------------------
43 43
44 44 def find_exe(cmd):
45 45 try:
46 46 import win32api
47 47 except ImportError:
48 48 raise ImportError('you need to have pywin32 installed for this to work')
49 49 else:
50 50 try:
51 51 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
52 52 except:
53 53 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
54 54 return path
55 55
56 56 class ProcessStateError(Exception):
57 57 pass
58 58
59 59 class UnknownStatus(Exception):
60 60 pass
61 61
62 62 class LauncherProcessProtocol(ProcessProtocol):
63 63 """
64 64 A ProcessProtocol to go with the ProcessLauncher.
65 65 """
66 66 def __init__(self, process_launcher):
67 67 self.process_launcher = process_launcher
68 68
69 69 def connectionMade(self):
70 70 self.process_launcher.fire_start_deferred(self.transport.pid)
71 71
72 72 def processEnded(self, status):
73 73 value = status.value
74 74 if isinstance(value, ProcessDone):
75 75 self.process_launcher.fire_stop_deferred(0)
76 76 elif isinstance(value, ProcessTerminated):
77 77 self.process_launcher.fire_stop_deferred(
78 78 {'exit_code':value.exitCode,
79 79 'signal':value.signal,
80 80 'status':value.status
81 81 }
82 82 )
83 83 else:
84 84 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
85 85
86 86 def outReceived(self, data):
87 87 log.msg(data)
88 88
89 89 def errReceived(self, data):
90 90 log.err(data)
91 91
92 92 class ProcessLauncher(object):
93 93 """
94 94 Start and stop an external process in an asynchronous manner.
95 95
96 96 Currently this uses deferreds to notify other parties of process state
97 97 changes. This is an awkward design and should be moved to using
98 98 a formal NotificationCenter.
99 99 """
100 100 def __init__(self, cmd_and_args):
101 101 self.cmd = cmd_and_args[0]
102 102 self.args = cmd_and_args
103 103 self._reset()
104 104
105 105 def _reset(self):
106 106 self.process_protocol = None
107 107 self.pid = None
108 108 self.start_deferred = None
109 109 self.stop_deferreds = []
110 110 self.state = 'before' # before, running, or after
111 111
112 112 @property
113 113 def running(self):
114 114 if self.state == 'running':
115 115 return True
116 116 else:
117 117 return False
118 118
119 119 def fire_start_deferred(self, pid):
120 120 self.pid = pid
121 121 self.state = 'running'
122 122 log.msg('Process %r has started with pid=%i' % (self.args, pid))
123 123 self.start_deferred.callback(pid)
124 124
125 125 def start(self):
126 126 if self.state == 'before':
127 127 self.process_protocol = LauncherProcessProtocol(self)
128 128 self.start_deferred = defer.Deferred()
129 129 self.process_transport = reactor.spawnProcess(
130 130 self.process_protocol,
131 131 self.cmd,
132 132 self.args,
133 133 env=os.environ
134 134 )
135 135 return self.start_deferred
136 136 else:
137 137 s = 'the process has already been started and has state: %r' % \
138 138 self.state
139 139 return defer.fail(ProcessStateError(s))
140 140
141 141 def get_stop_deferred(self):
142 142 if self.state == 'running' or self.state == 'before':
143 143 d = defer.Deferred()
144 144 self.stop_deferreds.append(d)
145 145 return d
146 146 else:
147 147 s = 'this process is already complete'
148 148 return defer.fail(ProcessStateError(s))
149 149
150 150 def fire_stop_deferred(self, exit_code):
151 151 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
152 152 self.state = 'after'
153 153 for d in self.stop_deferreds:
154 154 d.callback(exit_code)
155 155
156 156 def signal(self, sig):
157 157 """
158 158 Send a signal to the process.
159 159
160 160 The argument sig can be ('KILL','INT', etc.) or any signal number.
161 161 """
162 162 if self.state == 'running':
163 163 self.process_transport.signalProcess(sig)
164 164
165 165 # def __del__(self):
166 166 # self.signal('KILL')
167 167
168 168 def interrupt_then_kill(self, delay=1.0):
169 169 self.signal('INT')
170 170 reactor.callLater(delay, self.signal, 'KILL')
171 171
172 172
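The interrupt-then-kill pattern above (SIGINT first, SIGKILL after a grace period) can be sketched with the standard-library subprocess module; this is a hedged modern analogue for illustration, not the Twisted implementation:

```python
import signal
import subprocess
import sys
import time

def interrupt_then_kill(proc, delay=1.0):
    """Send SIGINT; escalate to SIGKILL if the process outlives `delay`."""
    proc.send_signal(signal.SIGINT)
    try:
        proc.wait(timeout=delay)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL cannot be caught or ignored
        proc.wait()

# A child that ignores SIGINT, forcing the SIGKILL path:
child = subprocess.Popen([sys.executable, "-c",
    "import signal, time;"
    "signal.signal(signal.SIGINT, signal.SIG_IGN);"
    "time.sleep(30)"])
time.sleep(0.5)  # give the child time to install its handler
interrupt_then_kill(child, delay=1.0)
print(child.returncode)
```

On POSIX a negative return code is the killing signal's number, so -9 here means the SIGKILL escalation fired.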
173 173 #-----------------------------------------------------------------------------
174 174 # Code for launching controller and engines
175 175 #-----------------------------------------------------------------------------
176 176
177 177
178 178 class ControllerLauncher(ProcessLauncher):
179 179
180 180 def __init__(self, extra_args=None):
181 181 if sys.platform == 'win32':
182 182 # This logic is needed because the ipcontroller script doesn't
183 183 # always get installed in the same way or in the same location.
184 184 from IPython.kernel.scripts import ipcontroller
185 185 script_location = ipcontroller.__file__.replace('.pyc', '.py')
186 186 # The -u option here turns on unbuffered output, which is required
187 187 # on Win32 to prevent weird conflicts and problems with Twisted
188 188 args = [find_exe('python'), '-u', script_location]
189 189 else:
190 190 args = ['ipcontroller']
191 191 self.extra_args = extra_args
192 192 if extra_args is not None:
193 193 args.extend(extra_args)
194 194
195 195 ProcessLauncher.__init__(self, args)
196 196
197 197
198 198 class EngineLauncher(ProcessLauncher):
199 199
200 200 def __init__(self, extra_args=None):
201 201 if sys.platform == 'win32':
202 202 # This logic is needed because the ipengine script doesn't
203 203 # always get installed in the same way or in the same location.
204 204 from IPython.kernel.scripts import ipengine
205 205 script_location = ipengine.__file__.replace('.pyc', '.py')
206 206 # The -u option here turns on unbuffered output, which is required
207 207 # on Win32 to prevent weird conflicts and problems with Twisted
208 208 args = [find_exe('python'), '-u', script_location]
209 209 else:
210 210 args = ['ipengine']
211 211 self.extra_args = extra_args
212 212 if extra_args is not None:
213 213 args.extend(extra_args)
214 214
215 215 ProcessLauncher.__init__(self, args)
216 216
217 217
218 218 class LocalEngineSet(object):
219 219
220 220 def __init__(self, extra_args=None):
221 221 self.extra_args = extra_args
222 222 self.launchers = []
223 223
224 224 def start(self, n):
225 225 dlist = []
226 226 for i in range(n):
227 227 el = EngineLauncher(extra_args=self.extra_args)
228 228 d = el.start()
229 229 self.launchers.append(el)
230 230 dlist.append(d)
231 231 dfinal = gatherBoth(dlist, consumeErrors=True)
232 232 dfinal.addCallback(self._handle_start)
233 233 return dfinal
234 234
235 235 def _handle_start(self, r):
236 236 log.msg('Engines started with pids: %r' % r)
237 237 return r
238 238
239 239 def _handle_stop(self, r):
240 240 log.msg('Engines received signal: %r' % r)
241 241 return r
242 242
243 243 def signal(self, sig):
244 244 dlist = []
245 245 for el in self.launchers:
246 246 d = el.get_stop_deferred()
247 247 dlist.append(d)
248 248 el.signal(sig)
249 249 dfinal = gatherBoth(dlist, consumeErrors=True)
250 250 dfinal.addCallback(self._handle_stop)
251 251 return dfinal
252 252
253 253 def interrupt_then_kill(self, delay=1.0):
254 254 dlist = []
255 255 for el in self.launchers:
256 256 d = el.get_stop_deferred()
257 257 dlist.append(d)
258 258 el.interrupt_then_kill(delay)
259 259 dfinal = gatherBoth(dlist, consumeErrors=True)
260 260 dfinal.addCallback(self._handle_stop)
261 261 return dfinal
262 262
263 263
264 264 class BatchEngineSet(object):
265 265
266 266 # Subclasses must fill these in. See PBSEngineSet
267 267 submit_command = ''
268 268 delete_command = ''
269 269 job_id_regexp = ''
270 270
271 271 def __init__(self, template_file, **kwargs):
272 272 self.template_file = template_file
273 273 self.context = {}
274 274 self.context.update(kwargs)
275 275 self.batch_file = self.template_file+'-run'
276 276
277 277 def parse_job_id(self, output):
278 278 m = re.match(self.job_id_regexp, output)
279 279 if m is not None:
280 280 job_id = m.group()
281 281 else:
282 282 raise Exception("job id couldn't be determined: %s" % output)
283 283 self.job_id = job_id
284 284 log.msg('Job started with job id: %r' % job_id)
285 285 return job_id
286 286
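The job-id parsing above can be exercised on its own; the sample qsub output below is made up for illustration:

```python
import re

job_id_regexp = r'\d+'  # same pattern PBSEngineSet uses

def parse_job_id(output):
    # Mirror of BatchEngineSet.parse_job_id, minus the logging/state.
    m = re.match(job_id_regexp, output)
    if m is None:
        raise Exception("job id couldn't be determined: %s" % output)
    return m.group()

# qsub typically echoes something like "12345.pbs-server":
print(parse_job_id("12345.pbs-server"))
```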
287 287 def write_batch_script(self, n):
288 288 self.context['n'] = n
289 289 template = open(self.template_file, 'r').read()
290 290 log.msg('Using template for batch script: %s' % self.template_file)
291 291 script_as_string = Itpl.itplns(template, self.context)
292 292 log.msg('Writing instantiated batch script: %s' % self.batch_file)
293 293 f = open(self.batch_file,'w')
294 294 f.write(script_as_string)
295 295 f.close()
296 296
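Itpl interpolates `$`-style variables from the context into the template; the standard library's `string.Template` behaves similarly and can illustrate what `write_batch_script` produces (the template text below is an invented example, not the real PBS template):

```python
from string import Template

# Hypothetical batch-script template with a $n placeholder:
template = "#PBS -l nodes=$n\nmpiexec -n $n ipengine\n"
context = {"n": 4}

# Instantiate the template, as write_batch_script does with Itpl.itplns:
script_as_string = Template(template).substitute(context)
print(script_as_string)
```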
297 297 def handle_error(self, f):
298 298 f.printTraceback()
299 299 f.raiseException()
300 300
301 301 def start(self, n):
302 302 self.write_batch_script(n)
303 303 d = getProcessOutput(self.submit_command,
304 304 [self.batch_file],env=os.environ)
305 305 d.addCallback(self.parse_job_id)
306 306 d.addErrback(self.handle_error)
307 307 return d
308 308
309 309 def kill(self):
310 310 d = getProcessOutput(self.delete_command,
311 311 [self.job_id],env=os.environ)
312 312 return d
313 313
314 314 class PBSEngineSet(BatchEngineSet):
315 315
316 316 submit_command = 'qsub'
317 317 delete_command = 'qdel'
318 318 job_id_regexp = '\d+'
319 319
320 320 def __init__(self, template_file, **kwargs):
321 321 BatchEngineSet.__init__(self, template_file, **kwargs)
322 322
323 class SSHEngineSet(object):
323
324 324 sshx_template="""#!/bin/sh
325 325 "$@" &> /dev/null &
326 echo $!"""
326 echo $!
327 """
327 328
328 329 engine_killer_template="""#!/bin/sh
330 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
331 """
329 332
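The bracketed `[i]pengine` in the killer template is a classic trick: the character class still matches the literal text `ipengine`, but the grep process's own `ps` entry contains `[i]pengine` and therefore does not match, so the pipeline never tries to kill itself. A quick illustration:

```shell
# '[i]pengine' matches a real engine command line...
echo 'ipengine --furl' | grep -c '[i]pengine'
# ...but not the grep command's own ps line:
echo 'grep [i]pengine' | grep -c '[i]pengine' || true
```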
330 ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM"""
333 class SSHEngineSet(object):
334 sshx_template=sshx_template
335 engine_killer_template=engine_killer_template
331 336
332 337 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
338 """Start a controller on localhost and engines using ssh.
339
340 The engine_hosts argument is a dict with hostnames as keys and
341 the number of engine (int) as values. sshx is the name of a local
342 file that will be used to run remote commands. This file is used
343 to setup the environment properly.
344 """
345
333 346 self.temp_dir = tempfile.gettempdir()
334 if sshx != None:
347 if sshx is not None:
335 348 self.sshx = sshx
336 349 else:
337 self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER'])
350 # Write the sshx.sh file locally from our template.
351 self.sshx = os.path.join(
352 self.temp_dir,
353 '%s-main-sshx.sh' % os.environ['USER']
354 )
338 355 f = open(self.sshx, 'w')
339 356 f.writelines(self.sshx_template)
340 357 f.close()
341 358 self.engine_command = ipengine
342 359 self.engine_hosts = engine_hosts
343 self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER'])
360 # Write the engine killer script file locally from our template.
361 self.engine_killer = os.path.join(
362 self.temp_dir,
363 '%s-local-engine_killer.sh' % os.environ['USER']
364 )
344 365 f = open(self.engine_killer, 'w')
345 366 f.writelines(self.engine_killer_template)
346 367 f.close()
347 368
348 369 def start(self, send_furl=False):
370 dlist = []
349 371 for host in self.engine_hosts.keys():
350 372 count = self.engine_hosts[host]
351 self._start(host, count, send_furl)
373 d = self._start(host, count, send_furl)
374 dlist.append(d)
375 return gatherBoth(dlist, consumeErrors=True)
352 376
353 def killall(self):
354 for host in self.engine_hosts.keys():
355 self._killall(host)
377 def _start(self, hostname, count=1, send_furl=False):
378 if send_furl:
379 d = self._scp_furl(hostname)
380 else:
381 d = defer.succeed(None)
382 d.addCallback(lambda r: self._scp_sshx(hostname))
383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 return d
356 385
357 def _start(self, host_name, count=1, send_furl=False):
386 def _scp_furl(self, hostname):
387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
388 cmd_list = scp_cmd.split()
389 cmd_list[1] = os.path.expanduser(cmd_list[1])
390 log.msg('Copying furl file: %s' % scp_cmd)
391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
392 return d
358 393
359 def _scp_sshx(d):
360 scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER'])
394 def _scp_sshx(self, hostname):
395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 self.sshx, hostname,
397 self.temp_dir, os.environ['USER']
398 )
400 log.msg("Copying sshx: %s" % scp_cmd)
361 401 sshx_scp = scp_cmd.split()
362 print sshx_scp
363 402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
364 d.addCallback(_exec_engine)
403 return d
365 404
366 def _exec_engine(d):
367 exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command)
405 def _ssh_engine(self, hostname, count):
406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 hostname, self.temp_dir,
408 os.environ['USER'], self.engine_command
409 )
368 410 cmds = exec_engine.split()
369 print cmds
411 dlist = []
412 log.msg("about to start engines...")
370 413 for i in range(count):
414 log.msg('Starting engines: %s' % exec_engine)
371 415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 dlist.append(d)
417 return gatherBoth(dlist, consumeErrors=True)
372 418
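`gatherBoth(dlist, consumeErrors=True)` waits for every deferred and collects failures as results instead of aborting on the first one; asyncio's `gather(..., return_exceptions=True)` is the modern analogue. A minimal sketch with hypothetical host names:

```python
import asyncio

async def start_engine(host):
    # Stand-in for the per-host ssh launch; just pretend it worked.
    await asyncio.sleep(0)
    return "started on %s" % host

async def start_all(hosts):
    # Like gatherBoth with consumeErrors=True: every task runs to
    # completion, and exceptions come back as results, not as an abort.
    return await asyncio.gather(
        *(start_engine(h) for h in hosts),
        return_exceptions=True,
    )

results = asyncio.run(start_all(["hostA", "hostB"]))
print(results)
```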
373 if send_furl:
374 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name)
375 cmd_list = scp_cmd.split()
376 cmd_list[1] = os.path.expanduser(cmd_list[1])
377 print cmd_list
378 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
379 d.addCallback(_scp_sshx)
380 else:
381 _scp_sshx(d=None)
419 def kill(self):
420 dlist = []
421 for host in self.engine_hosts.keys():
422 d = self._killall(host)
423 dlist.append(d)
424 return gatherBoth(dlist, consumeErrors=True)
382 425
383 def _killall(self, host_name):
384 def _exec_err(d):
385 if d.getErrorMessage()[-18:] != "No such process\\n\'":
386 raise d
426 def _killall(self, hostname):
427 d = self._scp_engine_killer(hostname)
428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 # d.addErrback(self._exec_err)
430 return d
387 431
388 def _exec_kill(d):
389 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER'])
390 kill_cmd = kill_cmd.split()
391 print kill_cmd
392 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
393 d.addErrback(_exec_err)
394 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER'])
432 def _scp_engine_killer(self, hostname):
433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 self.engine_killer,
435 hostname,
436 self.temp_dir,
437 os.environ['USER']
438 )
395 439 cmds = scp_cmd.split()
440 log.msg('Copying engine_killer: %s' % scp_cmd)
396 441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
397 d.addCallback(_exec_kill)
398 d.addErrback(_exec_err)
442 return d
399 443
444 def _ssh_kill(self, hostname):
445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 hostname,
447 self.temp_dir,
448 os.environ['USER']
449 )
450 log.msg('Killing engine: %s' % kill_cmd)
451 kill_cmd = kill_cmd.split()
452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 return d
454
455 def _exec_err(self, r):
456 log.msg(r)
400 457
401 458 #-----------------------------------------------------------------------------
402 459 # Main functions for the different types of clusters
403 460 #-----------------------------------------------------------------------------
404 461
405 462 # TODO:
406 463 # The logic in this code should be moved into classes like LocalCluster,
407 464 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
408 465 # The main functions should then just parse the command line arguments, create
409 466 # the appropriate class and call a 'start' method.
410 467
411 468 def check_security(args, cont_args):
412 469 if (not args.x or not args.y) and not have_crypto:
413 470 log.err("""
414 471 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
415 472 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
416 473 reactor.stop()
417 474 return False
418 475 if args.x:
419 476 cont_args.append('-x')
420 477 if args.y:
421 478 cont_args.append('-y')
422 479 return True
423 480
424 481
425 482 def main_local(args):
426 483 cont_args = []
427 484 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
428 485
429 486 # Check security settings before proceeding
430 487 if not check_security(args, cont_args):
431 488 return
432 489
433 490 cl = ControllerLauncher(extra_args=cont_args)
434 491 dstart = cl.start()
435 492 def start_engines(cont_pid):
436 493 engine_args = []
437 494 engine_args.append('--logfile=%s' % \
438 495 pjoin(args.logdir,'ipengine%s-' % cont_pid))
439 496 eset = LocalEngineSet(extra_args=engine_args)
440 497 def shutdown(signum, frame):
441 498 log.msg('Stopping local cluster')
442 499 # We are still playing with the times here, but these seem
443 500 # to be reliable in allowing everything to exit cleanly.
444 501 eset.interrupt_then_kill(0.5)
445 502 cl.interrupt_then_kill(0.5)
446 503 reactor.callLater(1.0, reactor.stop)
447 504 signal.signal(signal.SIGINT,shutdown)
448 505 d = eset.start(args.n)
449 506 return d
450 507 def delay_start(cont_pid):
451 508 # This is needed because the controller doesn't start listening
452 509 # right when it starts and the controller needs to write
453 510 # furl files for the engine to pick up
454 511 reactor.callLater(1.0, start_engines, cont_pid)
455 512 dstart.addCallback(delay_start)
456 513 dstart.addErrback(lambda f: f.raiseException())
457 514
458 515
459 516 def main_mpirun(args):
460 517 cont_args = []
461 518 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
462 519
463 520 # Check security settings before proceeding
464 521 if not check_security(args, cont_args):
465 522 return
466 523
467 524 cl = ControllerLauncher(extra_args=cont_args)
468 525 dstart = cl.start()
469 526 def start_engines(cont_pid):
470 527 raw_args = ['mpirun']
471 528 raw_args.extend(['-n',str(args.n)])
472 529 raw_args.append('ipengine')
473 530 raw_args.append('-l')
474 531 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
475 532 if args.mpi:
476 533 raw_args.append('--mpi=%s' % args.mpi)
477 534 eset = ProcessLauncher(raw_args)
478 535 def shutdown(signum, frame):
479 536 log.msg('Stopping local cluster')
480 537 # We are still playing with the times here, but these seem
481 538 # to be reliable in allowing everything to exit cleanly.
482 539 eset.interrupt_then_kill(1.0)
483 540 cl.interrupt_then_kill(1.0)
484 541 reactor.callLater(2.0, reactor.stop)
485 542 signal.signal(signal.SIGINT,shutdown)
486 543 d = eset.start()
487 544 return d
488 545 def delay_start(cont_pid):
489 546 # This is needed because the controller doesn't start listening
490 547 # right when it starts and the controller needs to write
491 548 # furl files for the engine to pick up
492 549 reactor.callLater(1.0, start_engines, cont_pid)
493 550 dstart.addCallback(delay_start)
494 551 dstart.addErrback(lambda f: f.raiseException())
495 552
496 553
497 554 def main_pbs(args):
498 555 cont_args = []
499 556 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
500 557
501 558 # Check security settings before proceeding
502 559 if not check_security(args, cont_args):
503 560 return
504 561
505 562 cl = ControllerLauncher(extra_args=cont_args)
506 563 dstart = cl.start()
507 564 def start_engines(r):
508 565 pbs_set = PBSEngineSet(args.pbsscript)
509 566 def shutdown(signum, frame):
510 567 log.msg('Stopping pbs cluster')
511 568 d = pbs_set.kill()
512 569 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
513 570 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
514 571 signal.signal(signal.SIGINT,shutdown)
515 572 d = pbs_set.start(args.n)
516 573 return d
517 574 dstart.addCallback(start_engines)
518 575 dstart.addErrback(lambda f: f.raiseException())
519 576
520 577
521 # currently the ssh launcher only launches the controller on localhost.
522 578 def main_ssh(args):
523 # the clusterfile should look like:
524 # send_furl = False # True, if you want
525 # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}
579 """Start a controller on localhost and engines using ssh.
580
581 Your clusterfile should look like::
582
583 send_furl = False # True to copy the controller furl to each host
584 engines = {
585 'engine_host1' : engine_count,
586 'engine_host2' : engine_count2
587 }
588 """
526 589 clusterfile = {}
527 590 execfile(args.clusterfile, clusterfile)
528 591 if not clusterfile.has_key('send_furl'):
529 592 clusterfile['send_furl'] = False
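The clusterfile is plain Python executed into a dict; the loading-and-defaulting step above can be sketched in modern Python (`exec` in place of the Python 2 `execfile`, with an inline stand-in for the file's contents):

```python
# Inline stand-in for the contents of clusterfile.py:
clusterfile_text = """
engines = {'host1.example.com': 2, 'host2.example.com': 4}
"""

clusterfile = {}
exec(clusterfile_text, clusterfile)

# Default send_furl to False if the user left it out:
clusterfile.setdefault('send_furl', False)

print(clusterfile['engines'])
```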
530 593
531 594 cont_args = []
532 595 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
533 if args.x:
534 cont_args.append('-x')
535 if args.y:
536 cont_args.append('-y')
596
597 # Check security settings before proceeding
598 if not check_security(args, cont_args):
599 return
600
537 601 cl = ControllerLauncher(extra_args=cont_args)
538 602 dstart = cl.start()
539 603 def start_engines(cont_pid):
540 est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
541 est.start(clusterfile['send_furl'])
604 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
542 605 def shutdown(signum, frame):
543 est.killall()
544 cl.interrupt_then_kill(0.5)
606 d = ssh_set.kill()
607 # d.addErrback(log.err)
608 cl.interrupt_then_kill(1.0)
545 609 reactor.callLater(2.0, reactor.stop)
546 610 signal.signal(signal.SIGINT,shutdown)
611 d = ssh_set.start(clusterfile['send_furl'])
612 return d
547 613
548 614 def delay_start(cont_pid):
549 615 reactor.callLater(1.0, start_engines, cont_pid)
550 616
551 617 dstart.addCallback(delay_start)
552 618 dstart.addErrback(lambda f: f.raiseException())
553 619
554 620
555 621 def get_args():
556 622 base_parser = argparse.ArgumentParser(add_help=False)
557 623 base_parser.add_argument(
558 624 '-x',
559 625 action='store_true',
560 626 dest='x',
561 627 help='turn off client security'
562 628 )
563 629 base_parser.add_argument(
564 630 '-y',
565 631 action='store_true',
566 632 dest='y',
567 633 help='turn off engine security'
568 634 )
569 635 base_parser.add_argument(
570 636 "--logdir",
571 637 type=str,
572 638 dest="logdir",
573 639 help="directory to put log files (default=$IPYTHONDIR/log)",
574 640 default=pjoin(get_ipython_dir(),'log')
575 641 )
576 642 base_parser.add_argument(
577 643 "-n",
578 644 "--num",
579 645 type=int,
580 646 dest="n",
581 647 default=2,
582 648 help="the number of engines to start"
583 649 )
584 650
585 651 parser = argparse.ArgumentParser(
586 652 description='IPython cluster startup. This starts a controller and\
587 653 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
588 654 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
589 655 )
590 656 subparsers = parser.add_subparsers(
591 657 help='available cluster types. For help, do "ipcluster TYPE --help"')
592 658
593 659 parser_local = subparsers.add_parser(
594 660 'local',
595 661 help='run a local cluster',
596 662 parents=[base_parser]
597 663 )
598 664 parser_local.set_defaults(func=main_local)
599 665
600 666 parser_mpirun = subparsers.add_parser(
601 667 'mpirun',
602 668 help='run a cluster using mpirun',
603 669 parents=[base_parser]
604 670 )
605 671 parser_mpirun.add_argument(
606 672 "--mpi",
607 673 type=str,
608 674 dest="mpi", # Don't put a default here to allow no MPI support
609 675 help="how to call MPI_Init (default=mpi4py)"
610 676 )
611 677 parser_mpirun.set_defaults(func=main_mpirun)
612 678
613 679 parser_pbs = subparsers.add_parser(
614 680 'pbs',
615 681 help='run a pbs cluster',
616 682 parents=[base_parser]
617 683 )
618 684 parser_pbs.add_argument(
619 685 '--pbs-script',
620 686 type=str,
621 687 dest='pbsscript',
622 688 help='PBS script template',
623 689 default='pbs.template'
624 690 )
625 691 parser_pbs.set_defaults(func=main_pbs)
626 692
627 693 parser_ssh = subparsers.add_parser(
628 694 'ssh',
629 695 help='run a cluster using ssh, should have ssh keys set up',
630 696 parents=[base_parser]
631 697 )
632 698 parser_ssh.add_argument(
633 699 '--clusterfile',
634 700 type=str,
635 701 dest='clusterfile',
636 702 help='python file describing the cluster',
637 703 default='clusterfile.py',
638 704 )
639 705 parser_ssh.add_argument(
640 706 '--sshx',
641 707 type=str,
642 708 dest='sshx',
643 help='sshx launcher helper',
644 default='sshx.sh',
709 help='sshx launcher helper'
645 710 )
646 711 parser_ssh.set_defaults(func=main_ssh)
647 712
648 713 args = parser.parse_args()
649 714 return args
650 715
651 716 def main():
652 717 args = get_args()
653 718 reactor.callWhenRunning(args.func, args)
654 719 log.startLogging(sys.stdout)
655 720 reactor.run()
656 721
657 722 if __name__ == '__main__':
658 723 main()
@@ -1,393 +1,398 b''
1 1 .. _changes:
2 2
3 3 ==========
4 4 What's new
5 5 ==========
6 6
7 7 .. contents::
8 8 ..
9 9 1 Release 0.9.1
10 10 2 Release 0.9
11 11 2.1 New features
12 12 2.2 Bug fixes
13 13 2.3 Backwards incompatible changes
14 14 2.4 Changes merged in from IPython1
15 15 2.4.1 New features
16 16 2.4.2 Bug fixes
17 17 2.4.3 Backwards incompatible changes
18 18 3 Release 0.8.4
19 19 4 Release 0.8.3
20 20 5 Release 0.8.2
21 21 6 Older releases
22 22 ..
23 23
24 24 Release dev
25 25 ===========
26 26
27 27 New features
28 28 ------------
29 29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 34 to Matt Foster for this patch.
32 35
33 36 * Fully refactored :command:`ipcluster` command line program for starting
34 37 IPython clusters. This new version is a complete rewrite and 1) is fully
35 38 cross platform (we now use Twisted's process management), 2) has much
36 39 improved performance, 3) uses subcommands for different types of clusters,
37 40 4) uses argparse for parsing command line options, 5) has better support
38 41 for starting clusters using :command:`mpirun`, 6) has experimental support
39 42 for starting engines using PBS. However, this new version of ipcluster
40 43 should be considered a technology preview. We plan on changing the API
41 44 in significant ways before it is final.
42 45
43 46 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
44 47
45 48 * Full description of the security model has been added to the docs.
46 49
47 50 * cd completer: show bookmarks if no other completions are available.
48 51
49 52 * sh profile: easy way to give 'title' to prompt: assign to variable
50 53 '_prompt_title'. It looks like this::
51 54
52 55 [~]|1> _prompt_title = 'sudo!'
53 56 sudo![~]|2>
54 57
55 58 * %edit: If you do '%edit pasted_block', pasted_block
56 59 variable gets updated with new data (so repeated
57 60 editing makes sense)
58 61
59 62 Bug fixes
60 63 ---------
61 64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 67 * The ipengine and ipcontroller scripts now handle missing furl files
63 68 more gracefully by giving better error messages.
64 69
65 70 * %rehashx: Aliases no longer contain dots. python3.0 binary
66 71 will create alias python30. Fixes:
67 72 #259716 "commands with dots in them don't work"
68 73
69 74 * %cpaste: %cpaste -r repeats the last pasted block.
70 75 The block is assigned to pasted_block even if code
71 76 raises exception.
72 77
73 78 Backwards incompatible changes
74 79 ------------------------------
75 80
76 81 * The controller now has a ``-r`` flag that needs to be used if you want to
77 82 reuse existing furl files. Otherwise they are deleted (the default).
78 83
79 84 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
80 85 (done to decouple it from ipython release cycle)
81 86
82 87
83 88
84 89 Release 0.9.1
85 90 =============
86 91
87 92 This release was quickly made to restore compatibility with Python 2.4, which
88 93 version 0.9 accidentally broke. No new features were introduced, other than
89 94 some additional testing support for internal use.
90 95
91 96
92 97 Release 0.9
93 98 ===========
94 99
95 100 New features
96 101 ------------
97 102
98 103 * All furl files and security certificates are now put in a read-only
99 104 directory named ~/.ipython/security.
100 105
101 106 * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that
102 107 determines the user's IPython directory in a robust manner.
103 108
104 109 * Laurent's WX application has been given a top-level script called
105 110 ipython-wx, and it has received numerous fixes. We expect this code to be
106 111 architecturally better integrated with Gael's WX 'ipython widget' over the
107 112 next few releases.
108 113
109 114 * The Editor synchronization work by Vivian De Smedt has been merged in. This
110 115 code adds a number of new editor hooks to synchronize with editors under
111 116 Windows.
112 117
113 118 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
114 119 This work was sponsored by Enthought, and while it's still very new, it is
115 120 based on a more cleanly organized architecture of the various IPython
116 121 components. We will continue to develop this over the next few releases as a
117 122 model for GUI components that use IPython.
118 123
119 124 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
120 125 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
121 126 different internal organizations, but the whole team is working on finding
122 127 what the right abstraction points are for a unified codebase.
123 128
124 129 * As part of the frontend work, Barry Wark also implemented an experimental
125 130 event notification system that various ipython components can use. In the
126 131 next release the implications and use patterns of this system regarding the
127 132 various GUI options will be worked out.
128 133
129 134 * IPython finally has a full test system, that can test docstrings with
130 135 IPython-specific functionality. There are still a few pieces missing for it
131 136 to be widely accessible to all users (so they can run the test suite at any
132 137 time and report problems), but it now works for the developers. We are
133 138 working hard on continuing to improve it, as this was probably IPython's
134 139 major Achilles heel (the lack of proper test coverage made it effectively
135 140 impossible to do large-scale refactoring). The full test suite can now
136 141 be run using the :command:`iptest` command line program.
137 142
138 143 * The notion of a task has been completely reworked. An `ITask` interface has
139 144 been created. This interface defines the methods that tasks need to
140 145 implement. These methods are now responsible for things like submitting
141 146 tasks and processing results. There are two basic task types:
142 147 :class:`IPython.kernel.task.StringTask` (this is the old `Task` object, but
143 148 renamed) and the new :class:`IPython.kernel.task.MapTask`, which is based on
144 149 a function.
145 150
146 151 * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to
147 152 standardize the idea of a `map` method. This interface has a single `map`
148 153 method that has the same syntax as the built-in `map`. We have also defined
149 154 a `mapper` factory interface that creates objects that implement
150 155 :class:`IPython.kernel.mapper.IMapper` for different controllers. Both the
151 156 multiengine and task controller now have mapping capabilities.
152 157
153 158 * The parallel function capabilities have been reworked. The major changes are
154 159 that i) there is now an `@parallel` magic that creates parallel functions,
155 160 ii) the syntax for multiple variables follows that of `map`, iii) both the
156 161 multiengine and task controller now have a parallel function implementation.
157 162
158 163 * All of the parallel computing capabilities from `ipython1-dev` have been
159 164 merged into IPython proper. This resulted in the following new subpackages:
160 165 :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
161 166 :mod:`IPython.tools` and :mod:`IPython.testing`.
162 167
163 168 * As part of merging in the `ipython1-dev` stuff, the `setup.py` script and
164 169 friends have been completely refactored. Now we are checking for
165 170 dependencies using the approach that matplotlib uses.
166 171
167 172 * The documentation has been completely reorganized to accept the
168 173 documentation from `ipython1-dev`.
169 174
170 175 * We have switched to using Foolscap for all of our network protocols in
171 176 :mod:`IPython.kernel`. This gives us secure connections that are both
172 177 encrypted and authenticated.
173 178
174 179 * We have a brand new `COPYING.txt` files that describes the IPython license
175 180 and copyright. The biggest change is that we are putting "The IPython
176 181 Development Team" as the copyright holder. We give more details about
177 182 exactly what this means in this file. All developer should read this and use
178 183 the new banner in all IPython source code files.
179 184
180 185 * sh profile: ./foo runs foo as system command, no need to do !./foo anymore
181 186
182 187 * String lists now support ``sort(field, nums = True)`` method (to easily sort
183 188 system command output). Try it with ``a = !ls -l ; a.sort(1, nums=1)``.
184 189
185 190 * '%cpaste foo' now assigns the pasted block as string list, instead of string
186 191
187 192 * The ipcluster script now runs by default with no security. This is done
188 193 because the main usage of the script is for starting things on localhost.
189 194 Eventually when ipcluster is able to start things on other hosts, we will put
190 195 security back.
191 196
192 197 * 'cd --foo' searches directory history for string foo, and jumps to that dir.
193 198 Last part of dir name is checked first. If no matches for that are found,
194 199 look at the whole path.
195 200
196 201
197 202 Bug fixes
198 203 ---------
199 204
200 205 * The Windows installer has been fixed. Now all IPython scripts have ``.bat``
201 206 versions created. Also, the Start Menu shortcuts have been updated.
202 207
203 208 * The colors escapes in the multiengine client are now turned off on win32 as
204 209 they don't print correctly.
205 210
206 211 * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing
207 212 mpi_import_statement incorrectly, which was leading the engine to crash when
208 213 mpi was enabled.
209 214
210 215 * A few subpackages had missing ``__init__.py`` files.
211 216
212 217 * The documentation is only created if Sphinx is found. Previously, the
213 218 ``setup.py`` script would fail if it was missing.
214 219
215 220 * Greedy ``cd`` completion has been disabled again (it was enabled in 0.8.4) as
216 221 it caused problems on certain platforms.
217 222
218 223
219 224 Backwards incompatible changes
220 225 ------------------------------
221 226
222 227 * The ``clusterfile`` option of the :command:`ipcluster` command has been
223 228 removed, as it was not working, and it will be replaced soon by something much
224 229 more robust.
225 230
226 231 * The :mod:`IPython.kernel` configuration now properly finds the user's
227 232 IPython directory.
228 233
229 234 * In ipapi, the :func:`make_user_ns` function has been replaced with
230 235 :func:`make_user_namespaces`, to support dict subclasses in namespace
231 236 creation.
232 237
233 238 * :class:`IPython.kernel.client.Task` has been renamed
234 239 :class:`IPython.kernel.client.StringTask` to make way for new task types.
235 240
236 241 * The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
237 242 and `map`.
238 243
239 244 * Renamed the values that the renamed `dist` keyword argument can take from
240 245 `'basic'` to `'b'`.
241 246
242 247 * IPython has a larger set of dependencies if you want all of its capabilities.
243 248 See the `setup.py` script for details.
244 249
245 250 * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
246 251 :class:`IPython.kernel.client.TaskClient` no longer take the (ip,port) tuple.
247 252 Instead they take the filename of a file that contains the FURL for that
248 253 client. If the FURL file is in your IPYTHONDIR, it will be found automatically
249 254 and the constructor can be left empty.
250 255
251 256 * The asynchronous clients in :mod:`IPython.kernel.asyncclient` are now created
252 257 using the factory functions :func:`get_multiengine_client` and
253 258 :func:`get_task_client`. These return a `Deferred` to the actual client.
254 259
255 260 * The command line options to `ipcontroller` and `ipengine` have changed to
256 261 reflect the new Foolscap network protocol and the FURL files. Please see the
257 262 help for these scripts for details.
258 263
259 264 * The configuration files for the kernel have changed because of the Foolscap
260 265 stuff. If you were using custom config files before, you should delete them
261 266 and regenerate new ones.
262 267
263 268 Changes merged in from IPython1
264 269 -------------------------------
265 270
266 271 New features
267 272 ............
268 273
269 274 * Much improved ``setup.py`` and ``setupegg.py`` scripts. Because Twisted and
270 275 zope.interface are now easily installable, we can declare them as dependencies
271 276 in our setupegg.py script.
272 277
273 278 * IPython is now compatible with Twisted 2.5.0 and 8.x.
274 279
275 280 * Added a new example of how to use :mod:`ipython1.kernel.asynclient`.
276 281
277 282 * Initial draft of a process daemon in :mod:`ipython1.daemon`. This has not
278 283 been merged into IPython and is still in `ipython1-dev`.
279 284
280 285 * The ``TaskController`` now has methods for getting the queue status.
281 286
282 287 * The ``TaskResult`` objects now have information about how long the task
283 288 took to run.
284 289
285 290 * We are attaching additional attributes to exceptions ``(_ipython_*)`` that
286 291 we use to carry additional info around.
287 292
288 293 * New top-level module :mod:`asyncclient` that has asynchronous versions (that
289 294 return deferreds) of the client classes. This is designed for users who want
290 295 to run their own Twisted reactor.
291 296
292 297 * All the clients in :mod:`client` are now based on Twisted. This is done by
293 298 running the Twisted reactor in a separate thread and using the
294 299 :func:`blockingCallFromThread` function that is in recent versions of Twisted.
295 300
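The pattern that Twisted's :func:`blockingCallFromThread` implements can be sketched with the standard library alone. Everything here is a stand-in (the fake "reactor" especially), meant only to show how a blocking client call can be layered over a reactor running in another thread:

```python
import queue
import threading

def blocking_call_from_thread(run_in_reactor, func, *args):
    """Minimal sketch of the blockingCallFromThread pattern: hand func
    to another thread's event loop, then block until its result (or
    exception) comes back over a queue. run_in_reactor stands in for
    Twisted's reactor.callFromThread."""
    result_q = queue.Queue()

    def task():
        try:
            result_q.put((True, func(*args)))
        except Exception as exc:
            result_q.put((False, exc))

    run_in_reactor(task)
    ok, value = result_q.get()   # the calling thread blocks here
    if not ok:
        raise value
    return value

# Stand-in "reactor": a worker thread that simply runs what it is given.
def fake_reactor(task):
    threading.Thread(target=task).start()

print(blocking_call_from_thread(fake_reactor, lambda x: x * 2, 21))   # 42
```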
296 301 * Functions can now be pushed/pulled to/from engines using
297 302 :meth:`MultiEngineClient.push_function` and
298 303 :meth:`MultiEngineClient.pull_function`.
299 304
300 305 * Gather/scatter are now implemented in the client to reduce the work load
301 306 of the controller and improve performance.
302 307
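Client-side scatter/gather amounts to partitioning a sequence locally and reassembling the pieces, which keeps that work off the controller. A minimal sketch (illustrative names, not IPython's API):

```python
# Partition a sequence into per-engine chunks, then reassemble them.
def scatter(seq, n_engines):
    """Split seq into n_engines contiguous, nearly equal chunks."""
    q, r = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = q + (1 if i < r else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks

def gather(chunks):
    """Concatenate the per-engine chunks back into one list."""
    return [item for chunk in chunks for item in chunk]

parts = scatter(list(range(10)), 4)
print(parts)    # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```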
303 308 * Complete rewrite of the IPython documentation. All of the documentation
304 309 from the IPython website has been moved into docs/source as restructured
305 310 text documents. PDF and HTML documentation are being generated using
306 311 Sphinx.
307 312
308 313 * New developer oriented documentation: development guidelines and roadmap.
309 314
310 315 * Traditional ``ChangeLog`` has been changed to a more useful ``changes.txt``
311 316 file that is organized by release and is meant to provide something more
312 317 relevant for users.
313 318
314 319 Bug fixes
315 320 .........
316 321
317 322 * Created a proper ``MANIFEST.in`` file to create source distributions.
318 323
319 324 * Fixed a bug in the ``MultiEngine`` interface. Previously, multi-engine
320 325 actions were being collected with a :class:`DeferredList` with
321 326 ``fireononeerrback=1``. This meant that methods were returning
322 327 before all engines had given their results. This was causing extremely odd
323 328 bugs in certain cases. To fix this problem, we have 1) set
324 329 ``fireononeerrback=0`` to make sure all results (or exceptions) are in
325 330 before returning and 2) introduced a :exc:`CompositeError` exception
326 331 that wraps all of the engine exceptions. This is a huge change as it means
327 332 that users will have to catch :exc:`CompositeError` rather than the actual
328 333 exception.
329 334
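The two-part fix above (wait for all results, then wrap every failure in one exception) can be sketched in plain Python. The class and function names here are hypothetical stand-ins, not IPython's actual implementation:

```python
# Illustrative sketch of the CompositeError idea: run an action for
# every engine, keep going past failures, and raise one wrapper
# exception carrying every engine failure.
class CompositeError(Exception):
    def __init__(self, failures):
        self.failures = failures   # list of (engine_id, exception) pairs
        msg = "; ".join("engine %s: %r" % (eid, exc) for eid, exc in failures)
        super().__init__(msg)

def run_on_all(engines, action):
    """Apply action to each engine's value, collecting all failures
    instead of stopping at the first one (the fireononeerrback=0 idea)."""
    results, failures = {}, []
    for eid, value in engines.items():
        try:
            results[eid] = action(value)
        except Exception as exc:
            failures.append((eid, exc))
    if failures:
        raise CompositeError(failures)
    return results

engines = {0: 2, 1: 5, 2: 0}
try:
    run_on_all(engines, lambda v: 10 // v)
except CompositeError as err:
    print(len(err.failures))   # 1 -- only engine 2 divided by zero
```

The caller catches the single :exc:`CompositeError` and inspects ``failures`` rather than catching each engine's raw exception.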
330 335 Backwards incompatible changes
331 336 ..............................
332 337
333 338 * All names have been renamed to conform to the lowercase_with_underscore
334 339 convention. This will require users to change references to all names like
335 340 ``queueStatus`` to ``queue_status``.
336 341
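When porting code across this rename, the conversion is mechanical. A hypothetical helper (not part of IPython) that maps old camelCase names to the new convention:

```python
import re

# Convert old camelCase names such as queueStatus to queue_status:
# insert an underscore before any capital that follows a lowercase
# letter or digit, then lowercase the result.
def to_snake_case(name):
    return re.sub(r'(?<=[a-z0-9])([A-Z])', r'_\1', name).lower()

print(to_snake_case("queueStatus"))   # queue_status
```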
337 342 * Previously, methods like :meth:`MultiEngineClient.push` and
338 343 :meth:`MultiEngineClient.pull` used ``*args`` and ``**kwargs``. This was
339 344 becoming a problem as we weren't able to introduce new keyword arguments into
340 345 the API. Now these methods simply take a dict or sequence. This has also
341 346 allowed us to get rid of the ``*All`` methods like :meth:`pushAll` and
342 347 :meth:`pullAll`. These things are now handled with the ``targets`` keyword
343 348 argument that defaults to ``'all'``.
344 349
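The new calling convention can be modeled with a toy class. This is purely illustrative (the class and its internals are invented for this sketch, not IPython's actual client):

```python
# Toy model of the dict-based API: push takes a dict of names to values
# and a targets keyword ('all' or a list of engine ids), which replaces
# the old *All method variants.
class ToyMultiEngine:
    def __init__(self, n):
        self.namespaces = {i: {} for i in range(n)}

    def push(self, ns_dict, targets='all'):
        ids = list(self.namespaces) if targets == 'all' else targets
        for i in ids:
            self.namespaces[i].update(ns_dict)

    def pull(self, name, targets='all'):
        ids = list(self.namespaces) if targets == 'all' else targets
        return [self.namespaces[i][name] for i in ids]

mec = ToyMultiEngine(4)
mec.push(dict(a=10), targets='all')   # what pushAll used to do
mec.push(dict(b=1), targets=[0, 2])   # push to a subset of engines
print(mec.pull('a'))                  # [10, 10, 10, 10]
```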
345 350 * The :attr:`MultiEngineClient.magicTargets` has been renamed to
346 351 :attr:`MultiEngineClient.targets`.
347 352
348 353 * All methods in the MultiEngine interface now accept the optional keyword
349 354 argument ``block``.
350 355
351 356 * Renamed :class:`RemoteController` to :class:`MultiEngineClient` and
352 357 :class:`TaskController` to :class:`TaskClient`.
353 358
354 359 * Renamed the top-level module from :mod:`api` to :mod:`client`.
355 360
356 361 * Most methods in the multiengine interface now raise a :exc:`CompositeError`
357 362 exception that wraps the user's exceptions, rather than just raising the raw
358 363 user's exception.
359 364
360 365 * Changed the ``setupNS`` and ``resultNames`` in the ``Task`` class to ``push``
361 366 and ``pull``.
362 367
363 368
364 369 Release 0.8.4
365 370 =============
366 371
367 372 This was a quick release to fix an unfortunate bug that slipped into the 0.8.3
368 373 release. The ``--twisted`` option was disabled, as it turned out to be broken
369 374 across several platforms.
370 375
371 376
372 377 Release 0.8.3
373 378 =============
374 379
375 380 * pydb is now disabled by default (due to %run -d problems). You can enable
376 381 it by passing -pydb command line argument to IPython. Note that setting
377 382 it in config file won't work.
378 383
379 384
380 385 Release 0.8.2
381 386 =============
382 387
383 388 * %pushd/%popd behave differently; now "pushd /foo" pushes CURRENT directory
384 389 and jumps to /foo. The current behaviour is closer to the documented
385 390 behaviour, and should not trip anyone.
386 391
387 392
388 393 Older releases
389 394 ==============
390 395
391 396 Changes in earlier releases of IPython are described in the older file
392 397 ``ChangeLog``. Please refer to this document for details.
393 398
@@ -1,326 +1,324 b''
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19 19
20 20 General considerations
21 21 ======================
22 22
23 23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24 24
25 25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26 26
27 27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 28 ``host0``.
29 29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 32 :command:`ipengine`. This command has to be told where the FURL file
33 33 (:file:`ipcontroller-engine.furl`) is located.
34 34
35 35 At this point, the controller and engines will be connected. By default, the
36 36 FURL files created by the controller are put into the
37 37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 38 the controller, step 2 can be skipped as the engines will automatically look
39 39 at that location.
40 40
41 41 The final step required to actually use the running controller from a
42 42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
45 45
46 46 Using :command:`ipcluster`
47 47 ==========================
48 48
49 49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50 50
51 51 1. When the controller and engines are all run on localhost. This is useful
52 52 for testing or running on a multicore computer.
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations.
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 56 4. When the controller is started on localhost and the engines are started on
57 57 remote nodes using :command:`ssh`.
58 58
59 59 .. note::
60 60
61 61 It is also possible for advanced users to add support to
62 62 :command:`ipcluster` for starting controllers and engines using other
63 63 methods (like Sun's Grid Engine for example).
64 64
65 65 .. note::
66 66
67 67 Currently :command:`ipcluster` requires that the
68 68 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 69 seen by both the controller and engines. If you don't have a shared file
70 70 system you will need to use :command:`ipcontroller` and
71 71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 72 using the :command:`ssh` method to start the cluster.
73 73
74 74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 75 and :command:`ipengine` to perform the steps described above.
76 76
77 77 Using :command:`ipcluster` in local mode
78 78 ----------------------------------------
79 79
80 80 To start one controller and 4 engines on localhost, just do::
81 81
82 82 $ ipcluster local -n 4
83 83
84 84 To see other command line options for the local mode, do::
85 85
86 86 $ ipcluster local -h
87 87
88 88 Using :command:`ipcluster` in mpirun mode
89 89 -----------------------------------------
90 90
91 91 The mpirun mode is useful if you:
92 92
93 93 1. Have MPI installed.
94 94 2. Have your systems configured to use the :command:`mpirun` command to start
95 95 processes.
96 96
97 97 If these are satisfied, you can start an IPython cluster using::
98 98
99 99 $ ipcluster mpirun -n 4
100 100
101 101 This does the following:
102 102
103 103 1. Starts the IPython controller on current host.
104 104 2. Uses :command:`mpirun` to start 4 engines.
105 105
106 106 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
107 107
108 108 $ ipcluster mpirun -n 4 --mpi=mpi4py
109 109
110 110 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
111 111
112 112 Additional command line options for this mode can be found by doing::
113 113
114 114 $ ipcluster mpirun -h
115 115
116 116 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
117 117
118 118
119 119 Using :command:`ipcluster` in PBS mode
120 120 --------------------------------------
121 121
122 122 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
123 123
124 124 .. sourcecode:: bash
125 125
126 126 #PBS -N ipython
127 127 #PBS -j oe
128 128 #PBS -l walltime=00:10:00
129 129 #PBS -l nodes=${n/4}:ppn=4
130 130 #PBS -q parallel
131 131
132 132 cd $$PBS_O_WORKDIR
133 133 export PATH=$$HOME/usr/local/bin
134 134 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
135 135 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
136 136
137 137 There are a few important points about this template:
138 138
139 139 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
140 140 template engine.
141 141
142 142 2. Instead of putting in the actual number of engines, use the notation
143 143 ``${n}`` to indicate the number of engines to be started. You can also use
144 144 expressions like ``${n/4}`` in the template to indicate the number of
145 145 nodes.
146 146
147 147 3. Because ``$`` is a special character used by the template engine, you must
148 148 escape any ``$`` by using ``$$``. This is important when referring to
149 149 environment variables in the template.
150 150
151 151 4. Any options to :command:`ipengine` should be given in the batch script
152 152 template.
153 153
154 154 5. Depending on the configuration of your system, you may have to set
155 155 environment variables in the script template.
156 156
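The rendering rules in points 2 and 3 can be sketched with a small substitution function. The real rendering uses IPython's :mod:`Itpl` engine; this stand-in evaluates ``${expr}`` with ``n`` in scope and collapses ``$$`` to a literal ``$`` (note the sketch uses Python 3's ``//`` for integer division, whereas the Itpl template above writes ``${n/4}``):

```python
import re

# Toy renderer: ${expr} is evaluated with n bound, then $$ becomes $.
def render(template, n):
    def substitute(match):
        return str(eval(match.group(1), {}, {'n': n}))
    out = re.sub(r'\$\{([^}]+)\}', substitute, template)
    return out.replace('$$', '$')

print(render("#PBS -l nodes=${n//4}:ppn=4", 128))   # #PBS -l nodes=32:ppn=4
print(render("cd $$PBS_O_WORKDIR", 128))            # cd $PBS_O_WORKDIR
```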
157 157 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
158 158
159 159 $ ipcluster pbs -n 128 --pbs-script=pbs.template
160 160
161 161 Additional command line options for this mode can be found by doing::
162 162
163 163 $ ipcluster pbs -h
164 164
165 165 Using :command:`ipcluster` in SSH mode
166 166 --------------------------------------
167 167
168 168 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 169 nodes and the :command:`ipcontroller` on localhost.
170 170
171 171 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172 172
173 173 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
174 174
175 175 .. sourcecode:: python
176 176
177 177 send_furl = True
178 178 engines = { 'host1.example.com' : 2,
179 179 'host2.example.com' : 5,
180 180 'host3.example.com' : 1,
181 181 'host4.example.com' : 8 }
182 182
183 183 Since this is a regular Python file, the usual Python syntax applies. Things to note:
184 184
185 185 * The `engines` dict, where the keys are the hosts we want to run engines on
186 186 and the values are the number of engines to run on each host.
187 187 * ``send_furl`` can be either `True` or `False`; if `True`, the FURL file
188 188 needed by :command:`ipengine` will be copied over to each host.
189 189
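Because the clusterfile is plain Python, it can be loaded by exec'ing it into a dict. This sketch only totals the engines it declares (the real :command:`ipcluster` does considerably more):

```python
# Load a clusterfile and total the declared engines.
clusterfile_src = """
send_furl = True
engines = { 'host1.example.com' : 2,
            'host2.example.com' : 5 }
"""

config = {}
exec(clusterfile_src, config)
total = sum(config['engines'].values())
print(total)   # 7 engines across 2 hosts
```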
190 190 The ``--clusterfile`` command line option lets you specify the file to use for
191 191 the cluster definition. Once you have your cluster file and can
192 192 :command:`ssh` into the remote hosts without a password, you are ready to
193 193 start your cluster like so:
194 194
195 195 .. sourcecode:: bash
196 196
197 197 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198 198
199 199
200 200 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201 201
202 202 * sshx.sh
203 203 * engine_killer.sh
204 204
205 Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a
206 temp directory on the remote host and executed from there, on most Unix, Linux
207 and OS X systems this is /tmp.
205 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory, then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
208 206
209 The sshx.sh is as simple as:
207 The default sshx.sh is the following:
210 208
211 209 .. sourcecode:: bash
212 210
213 211 #!/bin/sh
214 212 "$@" &> /dev/null &
215 213 echo $!
216 214
217 215 If you want to use a custom sshx.sh script you need to use the ``--sshx``
218 216 option and specify the file to use. Using a custom sshx.sh file can be
219 217 helpful when you need to set up the environment on the remote host before
220 218 executing :command:`ipengine`.
221 219
222 220 For a detailed options list:
223 221
224 222 .. sourcecode:: bash
225 223
226 224 $ ipcluster ssh -h
227 225
228 226 Current limitations of the SSH mode of :command:`ipcluster` are:
229 227
230 228 * Untested on Windows. Would require a working :command:`ssh` on Windows.
231 229 Also, we are using shell scripts to set up and execute commands on remote
232 230 hosts.
233 231 * :command:`ipcontroller` is started on localhost, with no option to start it
234 on a remote node also.
232 on a remote node.
235 233
236 234 Using the :command:`ipcontroller` and :command:`ipengine` commands
237 235 ==================================================================
238 236
239 237 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
240 238
241 239 Starting the controller and engine on your local machine
242 240 --------------------------------------------------------
243 241
244 242 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
245 243 local machine, do the following.
246 244
247 245 First start the controller::
248 246
249 247 $ ipcontroller
250 248
251 249 Next, start however many instances of the engine you want using (repeatedly) the command::
252 250
253 251 $ ipengine
254 252
255 253 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
256 254
257 255 .. warning::
258 256
259 257 The order of the above operations is very important. You *must*
260 258 start the controller before the engines, since the engines connect
261 259 to the controller as they get started.
262 260
263 261 .. note::
264 262
265 263 On some platforms (OS X), to put the controller and engine into the
266 264 background you may need to give these commands in the form ``(ipcontroller
267 265 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
268 266 properly.
269 267
270 268 Starting the controller and engines on different hosts
271 269 ------------------------------------------------------
272 270
273 271 When the controller and engines are running on different hosts, things are
274 272 slightly more complicated, but the underlying ideas are the same:
275 273
276 274 1. Start the controller on a host using :command:`ipcontroller`.
277 275 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
278 276 3. Use :command:`ipengine` on the engine's hosts to start the engines.
279 277
280 278 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
281 279
282 280 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
283 281 directory on the engine's host, where it will be found automatically.
284 282 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
285 283 flag.
286 284
287 285 The ``--furl-file`` flag works like this::
288 286
289 287 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
290 288
291 289 .. note::
292 290
293 291 If the controller's and engines' hosts all have a shared file system
294 292 (:file:`~/.ipython/security` is the same on all of them), then things
295 293 will just work!
296 294
297 295 Make FURL files persistent
298 296 ---------------------------
299 297
300 298 At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL file around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
301 299
302 300 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
303 301
304 302 $ ipcontroller -r --client-port=10101 --engine-port=10102
305 303
306 304 Then, just copy the FURL files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future; just make sure to tell the controller to use the *same* ports.
307 305
308 306 .. note::
309 307
310 308 You may ask the question: what ports does the controller listen on if you
311 309 don't tell it to use specific ones? The default is to use high random port
312 310 numbers. We do this for two reasons: i) to increase security through
313 311 obscurity and ii) to allow multiple controllers on a given host to start and
314 312 automatically use different ports.
315 313
316 314 Log files
317 315 ---------
318 316
319 317 All of the components of IPython have log files associated with them.
320 318 These log files can be extremely useful in debugging problems with
321 319 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
322 320 the log files to us will often help us to debug any problems.
323 321
324 322
325 323 .. [PBS] Portable Batch System. http://www.openpbs.org/
326 324 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent