Merging vvatsa's ipcluster-dev branch....
Brian Granger
r1833:e4b173fe merge
@@ -1,903 +1,903 @@ IPython/kernel/engineservice.py
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
3 3
4 4 """A Twisted Service Representation of the IPython core.
5 5
6 6 The IPython Core exposed to the network is called the Engine. Its
7 7 representation in Twisted is the EngineService. Interfaces and adapters
8 8 are used to abstract out the details of the actual network protocol used.
9 9 The EngineService is an Engine that knows nothing about the actual protocol
10 10 used.
11 11
12 12 The EngineService is exposed with various network protocols in modules like:
13 13
14 14 enginepb.py
15 15 enginevanilla.py
16 16
17 17 As of 12/12/06 the classes in this module have been simplified greatly. It was
18 18 felt that we had over-engineered things. To improve the maintainability of the
19 19 code we have taken out the ICompleteEngine interface and the completeEngine
20 20 method that automatically added methods to engines.
21 21
22 22 """
23 23
24 24 __docformat__ = "restructuredtext en"
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Copyright (C) 2008 The IPython Development Team
28 28 #
29 29 # Distributed under the terms of the BSD License. The full license is in
30 30 # the file COPYING, distributed as part of this software.
31 31 #-------------------------------------------------------------------------------
32 32
33 33 #-------------------------------------------------------------------------------
34 34 # Imports
35 35 #-------------------------------------------------------------------------------
36 36
37 37 import os, sys, copy
38 38 import cPickle as pickle
39 39 from new import instancemethod
40 40
41 41 from twisted.application import service
42 42 from twisted.internet import defer, reactor
43 43 from twisted.python import log, failure, components
44 44 import zope.interface as zi
45 45
46 46 from IPython.kernel.core.interpreter import Interpreter
47 47 from IPython.kernel import newserialized, error, util
48 48 from IPython.kernel.util import printer
49 49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
50 50 from IPython.kernel import codeutil
51 51
52 52
53 53 #-------------------------------------------------------------------------------
54 54 # Interface specification for the Engine
55 55 #-------------------------------------------------------------------------------
56 56
57 57 class IEngineCore(zi.Interface):
58 58 """The minimal required interface for the IPython Engine.
59 59
60 60 This interface provides a formal specification of the IPython core.
61 61 All these methods should return deferreds regardless of what side of a
62 62 network connection they are on.
63 63
64 64 In general, this class simply wraps a shell class and wraps its return
65 65 values as Deferred objects. If the underlying shell class method raises
66 66 an exception, this class should convert it to a twisted.failure.Failure
67 67 that will be propagated along the Deferred's errback chain.
68 68
69 69 In addition, Failures are aggressive. By this, we mean that if a method
70 70 is performing multiple actions (like pulling multiple objects) and any
71 71 single one fails, the entire method will fail with that Failure. It is
72 72 all or nothing.
73 73 """
74 74
75 75 id = zi.interface.Attribute("the id of the Engine object")
76 76 properties = zi.interface.Attribute("A dict of properties of the Engine")
77 77
78 78 def execute(lines):
79 79 """Execute lines of Python code.
80 80
81 81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
82 82 upon success.
83 83
84 84 Returns a failure object if the execution of lines raises an exception.
85 85 """
86 86
87 87 def push(namespace):
88 88 """Push dict namespace into the user's namespace.
89 89
90 90 Returns a deferred to None or a failure.
91 91 """
92 92
93 93 def pull(keys):
94 94 """Pulls values out of the user's namespace by keys.
95 95
96 96 Returns a deferred to a tuple of objects or a single object.
97 97
98 98 Raises NameError if any one of the objects does not exist.
99 99 """
100 100
101 101 def push_function(namespace):
102 102 """Push a dict of key, function pairs into the user's namespace.
103 103
104 104 Returns a deferred to None or a failure."""
105 105
106 106 def pull_function(keys):
107 107 """Pulls functions out of the user's namespace by keys.
108 108
109 109 Returns a deferred to a tuple of functions or a single function.
110 110
111 111 Raises NameError if any one of the functions does not exist.
112 112 """
113 113
114 114 def get_result(i=None):
115 115 """Get the stdin/stdout/stderr of command i.
116 116
117 117 Returns a deferred to a dict with keys
118 118 (id, number, stdin, stdout, stderr).
119 119
120 120 Raises IndexError if command i does not exist.
121 121 Raises TypeError if i is not an int.
122 122 """
123 123
124 124 def reset():
125 125 """Reset the shell.
126 126
127 127 This clears the user's namespace. Won't cause modules to be
128 128 reloaded. Should also re-initialize certain variables like id.
129 129 """
130 130
131 131 def kill():
132 132 """Kill the engine by stopping the reactor."""
133 133
134 134 def keys():
135 135 """Return the top level variables in the users namspace.
136 136
137 137 Returns a deferred to a dict."""
138 138
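# Example: a minimal client-side sketch (hypothetical; `engine` is assumed to
# provide IEngineCore) of the Deferred contract described above. Every call
# returns a Deferred, and any exception arrives as a Failure on the errback
# chain:
#
#     d = engine.execute('a = 10')
#     def on_result(result):
#         # result is a dict with keys (id, number, stdin, stdout, stderr)
#         print result['stdout']
#     def on_failure(f):
#         # one failed action fails the whole call ("all or nothing")
#         print 'engine call failed:', f.getErrorMessage()
#     d.addCallbacks(on_result, on_failure)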
139 139
140 140 class IEngineSerialized(zi.Interface):
141 141 """Push/Pull methods that take Serialized objects.
142 142
143 143 All methods should return deferreds.
144 144 """
145 145
146 146 def push_serialized(namespace):
147 147 """Push a dict of keys and Serialized objects into the user's namespace."""
148 148
149 149 def pull_serialized(keys):
150 150 """Pull objects by key from the user's namespace as Serialized.
151 151
152 152 Returns a list of or one Serialized.
153 153
154 154 Raises NameError if any one of the objects does not exist.
155 155 """
156 156
157 157
158 158 class IEngineProperties(zi.Interface):
159 159 """Methods for access to the properties object of an Engine"""
160 160
161 161 properties = zi.Attribute("A StrictDict object, containing the properties")
162 162
163 163 def set_properties(properties):
164 164 """set properties by key and value"""
165 165
166 166 def get_properties(keys=None):
167 167 """get a list of properties by `keys`, if no keys specified, get all"""
168 168
169 169 def del_properties(keys):
170 170 """delete properties by `keys`"""
171 171
172 172 def has_properties(keys):
173 173 """get a list of bool values for whether `properties` has `keys`"""
174 174
175 175 def clear_properties():
176 176 """clear the properties dict"""
177 177
178 178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
179 179 """The basic engine interface that EngineService will implement.
180 180
181 181 This exists so it is easy to specify adapters that adapt to and from the
182 182 API that the basic EngineService implements.
183 183 """
184 184 pass
185 185
186 186 class IEngineQueued(IEngineBase):
187 187 """Interface for adding a queue to an IEngineBase.
188 188
189 189 This interface extends the IEngineBase interface to add methods for managing
190 190 the engine's queue. The implicit contract of this interface is that the
191 191 execution of all methods declared in IEngineBase should appropriately be
192 192 put through a queue before execution.
193 193
194 194 All methods should return deferreds.
195 195 """
196 196
197 197 def clear_queue():
198 198 """Clear the queue."""
199 199
200 200 def queue_status():
201 201 """Get the queued and pending commands in the queue."""
202 202
203 203 def register_failure_observer(obs):
204 204 """Register an observer of pending Failures.
205 205
206 206 The observer must implement IFailureObserver.
207 207 """
208 208
209 209 def unregister_failure_observer(obs):
210 210 """Unregister an observer of pending Failures."""
211 211
212 212
213 213 class IEngineThreaded(zi.Interface):
214 214 """A place holder for threaded commands.
215 215
216 216 All methods should return deferreds.
217 217 """
218 218 pass
219 219
220 220
221 221 #-------------------------------------------------------------------------------
222 222 # Functions and classes to implement the EngineService
223 223 #-------------------------------------------------------------------------------
224 224
225 225
226 226 class StrictDict(dict):
227 227 """This is a strict copying dictionary for use as the interface to the
228 228 properties of an Engine.
229 229
230 230 :IMPORTANT:
231 231 This object copies the values you set to it, and returns copies to you
232 232 when you request them. The only way to change properties is explicitly
233 233 through the setitem and getitem of the dictionary interface.
234 234
235 235 Example:
236 236 >>> e = get_engine(id)
237 237 >>> L = [1,2,3]
238 238 >>> e.properties['L'] = L
239 239 >>> L == e.properties['L']
240 240 True
241 241 >>> L.append(99)
242 242 >>> L == e.properties['L']
243 243 False
244 244
245 245 Note that getitem copies, so calls to methods of objects do not affect
246 246 the properties, as seen here:
247 247
248 248 >>> e.properties[1] = range(2)
249 249 >>> print e.properties[1]
250 250 [0, 1]
251 251 >>> e.properties[1].append(2)
252 252 >>> print e.properties[1]
253 253 [0, 1]
254 254 """
255 255 def __init__(self, *args, **kwargs):
256 256 dict.__init__(self, *args, **kwargs)
257 257 self.modified = True
258 258
259 259 def __getitem__(self, key):
260 260 return copy.deepcopy(dict.__getitem__(self, key))
261 261
262 262 def __setitem__(self, key, value):
263 263 # check if this entry is valid for transport around the network
264 264 # and copying
265 265 try:
266 266 pickle.dumps(key, 2)
267 267 pickle.dumps(value, 2)
268 268 newvalue = copy.deepcopy(value)
269 269 except:
270 270 raise error.InvalidProperty(value)
271 271 dict.__setitem__(self, key, newvalue)
272 272 self.modified = True
273 273
274 274 def __delitem__(self, key):
275 275 dict.__delitem__(self, key)
276 276 self.modified = True
277 277
278 278 def update(self, dikt):
279 279 for k,v in dikt.iteritems():
280 280 self[k] = v
281 281
282 282 def pop(self, key):
283 283 self.modified = True
284 284 return dict.pop(self, key)
285 285
286 286 def popitem(self):
287 287 self.modified = True
288 288 return dict.popitem(self)
289 289
290 290 def clear(self):
291 291 self.modified = True
292 292 dict.clear(self)
293 293
294 294 def subDict(self, *keys):
295 295 d = {}
296 296 for key in keys:
297 297 d[key] = self[key]
298 298 return d
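# Example: besides copying, StrictDict only accepts entries that can be
# pickled (and therefore shipped around the network); anything else raises
# error.InvalidProperty. A hypothetical sketch:
#
#     >>> p = StrictDict()
#     >>> p['n'] = 10                # picklable: accepted (as a deep copy)
#     >>> p['f'] = lambda x: x       # unpicklable: rejected
#     Traceback (most recent call last):
#         ...
#     InvalidProperty: <function <lambda> at ...>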
299 299
300 300
301 301
302 302 class EngineAPI(object):
303 303 """This is the object through which the user can edit the `properties`
304 304 attribute of an Engine.
305 305 The Engine Properties object copies all objects in and out of itself.
306 306 See the EngineProperties object for details.
307 307 """
308 308 _fix=False
309 309 def __init__(self, id):
310 310 self.id = id
311 311 self.properties = StrictDict()
312 312 self._fix=True
313 313
314 314 def __setattr__(self, k,v):
315 315 if self._fix:
316 316 raise error.KernelError("I am protected!")
317 317 else:
318 318 object.__setattr__(self, k, v)
319 319
320 320 def __delattr__(self, key):
321 321 raise error.KernelError("I am protected!")
322 322
323 323
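# Example: once constructed, an EngineAPI instance is locked down; only the
# contents of `properties` may be mutated. A hypothetical sketch:
#
#     >>> api = EngineAPI(0)
#     >>> api.id = 5
#     Traceback (most recent call last):
#         ...
#     KernelError: I am protected!
#     >>> api.properties['a'] = 1    # the supported mutation path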
324 324 _apiDict = {}
325 325
326 326 def get_engine(id):
327 327 """Get the Engine API object, whcih currently just provides the properties
328 328 object, by ID"""
329 329 global _apiDict
330 330 if not _apiDict.get(id):
331 331 _apiDict[id] = EngineAPI(id)
332 332 return _apiDict[id]
333 333
334 334 def drop_engine(id):
335 335 """remove an engine"""
336 336 global _apiDict
337 337 if id in _apiDict:
338 338 del _apiDict[id]
339 339
340 340 class EngineService(object, service.Service):
341 341 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
342 342
343 343 zi.implements(IEngineBase)
344 344 name = 'EngineService'
345 345
346 346 def __init__(self, shellClass=Interpreter, mpi=None):
347 347 """Create an EngineService.
348 348
349 349 shellClass: something that implements IInterpreter or core1
350 350 mpi: an mpi module that has rank and size attributes
351 351 """
352 352 self.shellClass = shellClass
353 353 self.shell = self.shellClass()
354 354 self.mpi = mpi
355 355 self.id = None
356 356 self.properties = get_engine(self.id).properties
357 357 if self.mpi is not None:
358 358 log.msg("MPI started with rank = %i and size = %i" %
359 359 (self.mpi.rank, self.mpi.size))
360 360 self.id = self.mpi.rank
361 361 self._seedNamespace()
362 362
363 363 # Make id a property so that the shell can get the updated id
364 364
365 365 def _setID(self, id):
366 366 self._id = id
367 367 self.properties = get_engine(id).properties
368 368 self.shell.push({'id': id})
369 369
370 370 def _getID(self):
371 371 return self._id
372 372
373 373 id = property(_getID, _setID)
374 374
375 375 def _seedNamespace(self):
376 376 self.shell.push({'mpi': self.mpi, 'id' : self.id})
377 377
378 378 def executeAndRaise(self, msg, callable, *args, **kwargs):
379 379 """Call a method of self.shell and wrap any exception."""
380 380 d = defer.Deferred()
381 381 try:
382 382 result = callable(*args, **kwargs)
383 383 except:
384 384 # This gives the following:
385 385 # et=exception class
386 386 # ev=exception class instance
387 387 # tb=traceback object
388 388 et,ev,tb = sys.exc_info()
389 389 # This call adds attributes to the exception value
390 390 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
391 391 # Add another attribute
392 392 ev._ipython_engine_info = msg
393 393 f = failure.Failure(ev,et,None)
394 394 d.errback(f)
395 395 else:
396 396 d.callback(result)
397 397
398 398 return d
399 399
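# Example: consumers of the Deferred returned by executeAndRaise can inspect
# the annotated exception in their errback (a hypothetical sketch):
#
#     d = engine_service.execute('1/0')
#     def on_failure(f):
#         # f.value is the exception; executeAndRaise attached the msg dict
#         print f.value._ipython_engine_info
#     d.addErrback(on_failure)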
400 400
401 401 # The IEngine methods. See the interface for documentation.
402 402
403 403 def execute(self, lines):
404 404 msg = {'engineid':self.id,
405 405 'method':'execute',
406 406 'args':[lines]}
407 407 d = self.executeAndRaise(msg, self.shell.execute, lines)
408 408 d.addCallback(self.addIDToResult)
409 409 return d
410 410
411 411 def addIDToResult(self, result):
412 412 result['id'] = self.id
413 413 return result
414 414
415 415 def push(self, namespace):
416 416 msg = {'engineid':self.id,
417 417 'method':'push',
418 418 'args':[repr(namespace.keys())]}
419 419 d = self.executeAndRaise(msg, self.shell.push, namespace)
420 420 return d
421 421
422 422 def pull(self, keys):
423 423 msg = {'engineid':self.id,
424 424 'method':'pull',
425 425 'args':[repr(keys)]}
426 426 d = self.executeAndRaise(msg, self.shell.pull, keys)
427 427 return d
428 428
429 429 def push_function(self, namespace):
430 430 msg = {'engineid':self.id,
431 431 'method':'push_function',
432 432 'args':[repr(namespace.keys())]}
433 433 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
434 434 return d
435 435
436 436 def pull_function(self, keys):
437 437 msg = {'engineid':self.id,
438 438 'method':'pull_function',
439 439 'args':[repr(keys)]}
440 440 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
441 441 return d
442 442
443 443 def get_result(self, i=None):
444 444 msg = {'engineid':self.id,
445 445 'method':'get_result',
446 446 'args':[repr(i)]}
447 447 d = self.executeAndRaise(msg, self.shell.getCommand, i)
448 448 d.addCallback(self.addIDToResult)
449 449 return d
450 450
451 451 def reset(self):
452 452 msg = {'engineid':self.id,
453 453 'method':'reset',
454 454 'args':[]}
455 455 del self.shell
456 456 self.shell = self.shellClass()
457 457 self.properties.clear()
458 458 d = self.executeAndRaise(msg, self._seedNamespace)
459 459 return d
460 460
461 461 def kill(self):
462 462 drop_engine(self.id)
463 463 try:
464 464 reactor.stop()
465 465 except RuntimeError:
466 466 log.msg('The reactor was apparently not running.')
467 467 return defer.fail()
468 468 else:
469 469 return defer.succeed(None)
470 470
471 471 def keys(self):
472 472 """Return a list of variables names in the users top level namespace.
473 473
474 474 This used to return a dict of all the keys/repr(values) in the
475 475 user's namespace. This was too much info for the ControllerService
476 476 to handle so it is now just a list of keys.
477 477 """
478 478
479 479 remotes = []
480 480 for k in self.shell.user_ns.iterkeys():
481 481 if k not in ['__name__', '_ih', '_oh', '__builtins__',
482 482 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
483 483 remotes.append(k)
484 484 return defer.succeed(remotes)
485 485
486 486 def set_properties(self, properties):
487 487 msg = {'engineid':self.id,
488 488 'method':'set_properties',
489 489 'args':[repr(properties.keys())]}
490 490 return self.executeAndRaise(msg, self.properties.update, properties)
491 491
492 492 def get_properties(self, keys=None):
493 493 msg = {'engineid':self.id,
494 494 'method':'get_properties',
495 495 'args':[repr(keys)]}
496 496 if keys is None:
497 497 keys = self.properties.keys()
498 498 return self.executeAndRaise(msg, self.properties.subDict, *keys)
499 499
500 500 def _doDel(self, keys):
501 501 for key in keys:
502 502 del self.properties[key]
503 503
504 504 def del_properties(self, keys):
505 505 msg = {'engineid':self.id,
506 506 'method':'del_properties',
507 507 'args':[repr(keys)]}
508 508 return self.executeAndRaise(msg, self._doDel, keys)
509 509
510 510 def _doHas(self, keys):
511 511 return [key in self.properties for key in keys]
512 512
513 513 def has_properties(self, keys):
514 514 msg = {'engineid':self.id,
515 515 'method':'has_properties',
516 516 'args':[repr(keys)]}
517 517 return self.executeAndRaise(msg, self._doHas, keys)
518 518
519 519 def clear_properties(self):
520 520 msg = {'engineid':self.id,
521 521 'method':'clear_properties',
522 522 'args':[]}
523 523 return self.executeAndRaise(msg, self.properties.clear)
524 524
525 525 def push_serialized(self, sNamespace):
526 526 msg = {'engineid':self.id,
527 527 'method':'push_serialized',
528 528 'args':[repr(sNamespace.keys())]}
529 529 ns = {}
530 530 for k,v in sNamespace.iteritems():
531 531 try:
532 532 unserialized = newserialized.IUnSerialized(v)
533 533 ns[k] = unserialized.getObject()
534 534 except:
535 535 return defer.fail()
536 536 return self.executeAndRaise(msg, self.shell.push, ns)
537 537
538 538 def pull_serialized(self, keys):
539 539 msg = {'engineid':self.id,
540 540 'method':'pull_serialized',
541 541 'args':[repr(keys)]}
542 542 if isinstance(keys, str):
543 543 keys = [keys]
544 544 if len(keys)==1:
545 545 d = self.executeAndRaise(msg, self.shell.pull, keys)
546 546 d.addCallback(newserialized.serialize)
547 547 return d
548 548 elif len(keys)>1:
549 549 d = self.executeAndRaise(msg, self.shell.pull, keys)
550 550 @d.addCallback
551 551 def packThemUp(values):
552 552 serials = []
553 553 for v in values:
554 554 try:
555 555 serials.append(newserialized.serialize(v))
556 556 except:
557 557 return defer.fail(failure.Failure())
558 558 return serials
559 559 return packThemUp
560 560
561 561
562 562 def queue(methodToQueue):
563 563 def queuedMethod(this, *args, **kwargs):
564 564 name = methodToQueue.__name__
565 565 return this.submitCommand(Command(name, *args, **kwargs))
566 566 return queuedMethod
567 567
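# The `queue` decorator above discards the decorated method's body and instead
# wraps the method name and call arguments in a Command, which is submitted to
# the instance's queue. That is, a method written as
#
#     @queue
#     def execute(self, lines):
#         pass
#
# behaves roughly like (a sketch, not the literal expansion):
#
#     def execute(self, lines):
#         return self.submitCommand(Command('execute', lines))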
568 568 class QueuedEngine(object):
569 569 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
570 570
571 571 The resulting object will implement IEngineQueued, which extends
572 572 IEngineBase (which in turn extends IEngineCore, IEngineSerialized and IEngineProperties).
573 573
574 574 This seems like the best way of handling it, but I am not sure. The
575 575 other option is to have the various base interfaces be used like
576 576 mix-in interfaces. The problem I have with this is that adaptation is
577 577 more difficult and complicated because there can be multiple
578 578 original and final Interfaces.
579 579 """
580 580
581 581 zi.implements(IEngineQueued)
582 582
583 583 def __init__(self, engine):
584 584 """Create a QueuedEngine object from an engine
585 585
586 586 engine: An implementor of IEngineCore and IEngineSerialized
589 589 """
590 590
591 591 # This is the right way to do these tests rather than
592 592 # IEngineCore in list(zi.providedBy(engine)), which only picks
593 593 # up the interfaces that are directly declared by engine.
594 594 assert IEngineBase.providedBy(engine), \
595 595 "engine passed to QueuedEngine doesn't provide IEngineBase"
596 596
597 597 self.engine = engine
598 598 self.id = engine.id
599 599 self.queued = []
600 600 self.history = {}
601 601 self.engineStatus = {}
602 602 self.currentCommand = None
603 603 self.failureObservers = []
604 604
605 605 def _get_properties(self):
606 606 return self.engine.properties
607 607
608 608 properties = property(_get_properties, lambda self, _: None)
609 609 # Queue management methods. You should not call these directly.
610 610
611 611 def submitCommand(self, cmd):
612 612 """Submit command to queue."""
613 613
614 614 d = defer.Deferred()
615 615 cmd.setDeferred(d)
616 616 if self.currentCommand is not None:
617 617 if self.currentCommand.finished:
618 618 # log.msg("Running command immediately: %r" % cmd)
619 619 self.currentCommand = cmd
620 620 self.runCurrentCommand()
621 621 else: # command is still running
622 622 # log.msg("Command is running: %r" % self.currentCommand)
623 623 # log.msg("Queueing: %r" % cmd)
624 624 self.queued.append(cmd)
625 625 else:
626 626 # log.msg("No current commands, running: %r" % cmd)
627 627 self.currentCommand = cmd
628 628 self.runCurrentCommand()
629 629 return d
630 630
631 631 def runCurrentCommand(self):
632 632 """Run current command."""
633 633
634 634 cmd = self.currentCommand
635 635 f = getattr(self.engine, cmd.remoteMethod, None)
636 636 if f:
637 637 d = f(*cmd.args, **cmd.kwargs)
638 638 if cmd.remoteMethod == 'execute':
639 639 d.addCallback(self.saveResult)
640 640 d.addCallback(self.finishCommand)
641 641 d.addErrback(self.abortCommand)
642 642 else:
643 643 return defer.fail(AttributeError(cmd.remoteMethod))
644 644
645 645 def _flushQueue(self):
646 646 """Pop next command in queue and run it."""
647 647
648 648 if len(self.queued) > 0:
649 649 self.currentCommand = self.queued.pop(0)
650 650 self.runCurrentCommand()
651 651
652 652 def saveResult(self, result):
653 653 """Put the result in the history."""
654 654 self.history[result['number']] = result
655 655 return result
656 656
657 657 def finishCommand(self, result):
658 658 """Finish currrent command."""
659 659
660 660 # The order of these commands is absolutely critical.
661 661 self.currentCommand.handleResult(result)
662 662 self.currentCommand.finished = True
663 663 self._flushQueue()
664 664 return result
665 665
666 666 def abortCommand(self, reason):
667 667 """Abort current command.
668 668
669 669 This eats the Failure but first passes it onto the Deferred that the
670 670 user has.
671 671
672 672 It also clears out the queue so subsequent commands don't run.
673 673 """
674 674
675 675 # The order of these 3 commands is absolutely critical. The currentCommand
676 676 # must first be marked as finished BEFORE the queue is cleared and before
677 677 # the current command is sent the failure.
678 678 # Also, the queue must be cleared BEFORE the current command is sent the Failure
679 679 # otherwise the errback chain could trigger new commands to be added to the
680 680 # queue before we clear it. We should clear ONLY the commands that were in
681 681 # the queue when the error occurred.
682 682 self.currentCommand.finished = True
683 683 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
684 684 self.clear_queue(msg=s)
685 685 self.currentCommand.handleError(reason)
686 686
687 687 return None
688 688
689 689 #---------------------------------------------------------------------------
690 690 # IEngineCore methods
691 691 #---------------------------------------------------------------------------
692 692
693 693 @queue
694 694 def execute(self, lines):
695 695 pass
696
696
697 697 @queue
698 698 def push(self, namespace):
699 699 pass
700 700
701 701 @queue
702 702 def pull(self, keys):
703 703 pass
704 704
705 705 @queue
706 706 def push_function(self, namespace):
707 707 pass
708 708
709 709 @queue
710 710 def pull_function(self, keys):
711 711 pass
712 712
713 713 def get_result(self, i=None):
714 714 if i is None:
715 715 i = max(self.history.keys()+[None])
716 716
717 717 cmd = self.history.get(i, None)
718 718 # Uncomment this line to disable caching of results
719 719 #cmd = None
720 720 if cmd is None:
721 721 return self.submitCommand(Command('get_result', i))
722 722 else:
723 723 return defer.succeed(cmd)
724 724
725 725 def reset(self):
726 726 self.clear_queue()
727 727 self.history = {} # reset the cache - I am not sure we should do this
728 728 return self.submitCommand(Command('reset'))
729 729
730 730 def kill(self):
731 731 self.clear_queue()
732 732 return self.submitCommand(Command('kill'))
733 733
734 734 @queue
735 735 def keys(self):
736 736 pass
737 737
738 738 #---------------------------------------------------------------------------
739 739 # IEngineSerialized methods
740 740 #---------------------------------------------------------------------------
741 741
742 742 @queue
743 743 def push_serialized(self, namespace):
744 744 pass
745 745
746 746 @queue
747 747 def pull_serialized(self, keys):
748 748 pass
749 749
750 750 #---------------------------------------------------------------------------
751 751 # IEngineProperties methods
752 752 #---------------------------------------------------------------------------
753 753
754 754 @queue
755 755 def set_properties(self, namespace):
756 756 pass
757 757
758 758 @queue
759 759 def get_properties(self, keys=None):
760 760 pass
761 761
762 762 @queue
763 763 def del_properties(self, keys):
764 764 pass
765 765
766 766 @queue
767 767 def has_properties(self, keys):
768 768 pass
769 769
770 770 @queue
771 771 def clear_properties(self):
772 772 pass
773 773
774 774 #---------------------------------------------------------------------------
775 775 # IQueuedEngine methods
776 776 #---------------------------------------------------------------------------
777 777
778 778 def clear_queue(self, msg=''):
779 779 """Clear the queue, but doesn't cancel the currently running commmand."""
780 780
781 781 for cmd in self.queued:
782 782 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
783 783 self.queued = []
784 784 return defer.succeed(None)
785 785
786 786 def queue_status(self):
787 787 if self.currentCommand is not None:
788 788 if self.currentCommand.finished:
789 789 pending = repr(None)
790 790 else:
791 791 pending = repr(self.currentCommand)
792 792 else:
793 793 pending = repr(None)
794 794 dikt = {'queue':map(repr,self.queued), 'pending':pending}
795 795 return defer.succeed(dikt)
796 796
797 797 def register_failure_observer(self, obs):
798 798 self.failureObservers.append(obs)
799 799
800 800 def unregister_failure_observer(self, obs):
801 801 self.failureObservers.remove(obs)
802 802
803 803
804 804 # Now register QueuedEngine as an adapter class that makes an IEngineBase into
805 805 # an IEngineQueued.
806 806 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
807 807
808 808
809 809 class Command(object):
810 810 """A command object that encapslates queued commands.
811 811
812 812 This class basically keeps track of a command that has been queued
813 813 in a QueuedEngine. It manages the deferreds and holds the method to be called
814 814 and the arguments to that method.
815 815 """
816 816
817 817
818 818 def __init__(self, remoteMethod, *args, **kwargs):
819 819 """Build a new Command object."""
820 820
821 821 self.remoteMethod = remoteMethod
822 822 self.args = args
823 823 self.kwargs = kwargs
824 824 self.finished = False
825 825
826 826 def setDeferred(self, d):
827 827 """Sets the deferred attribute of the Command."""
828 828
829 829 self.deferred = d
830 830
831 831 def __repr__(self):
832 832 if not self.args:
833 833 args = ''
834 834 else:
835 835 args = repr(self.args)[1:-1].rstrip(', ') # cut off parens and any trailing comma
836 836 for k,v in self.kwargs.iteritems():
837 837 if args:
838 838 args += ', '
839 839 args += '%s=%r' %(k,v)
840 840 return "%s(%s)" %(self.remoteMethod, args)
841 841
842 842 def handleResult(self, result):
843 843 """When the result is ready, relay it to self.deferred."""
844 844
845 845 self.deferred.callback(result)
846 846
847 847 def handleError(self, reason):
848 848 """When an error has occured, relay it to self.deferred."""
849 849
850 850 self.deferred.errback(reason)
851 851
852 852 class ThreadedEngineService(EngineService):
853 853 """An EngineService subclass that defers execute commands to a separate
854 854 thread.
855 855
856 856 ThreadedEngineService uses twisted.internet.threads.deferToThread to
857 857 defer execute requests to a separate thread. GUI frontends may want to
858 858 use ThreadedEngineService as the engine in an
859 859 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
860 860 long-running code execution from blocking the GUI thread.
861 861 """
862 862
863 863 zi.implements(IEngineBase)
864 864
865 865 def __init__(self, shellClass=Interpreter, mpi=None):
866 866 EngineService.__init__(self, shellClass, mpi)
867 867
868 868 def wrapped_execute(self, msg, lines):
869 869 """Wrap self.shell.execute to add extra information to tracebacks"""
870 870
871 871 try:
872 872 result = self.shell.execute(lines)
873 873 except Exception,e:
874 874 # This gives the following:
875 875 # et=exception class
876 876 # ev=exception class instance
877 877 # tb=traceback object
878 878 et,ev,tb = sys.exc_info()
879 879 # This call adds attributes to the exception value
880 880 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
881 881 # Add another attribute
882 882
883 883 # Create a new exception with the new attributes
884 884 e = et(ev._ipython_traceback_text)
885 885 e._ipython_engine_info = msg
886 886
887 887 # Re-raise
888 888 raise e
889 889
890 890 return result
891 891
892 892
893 893 def execute(self, lines):
894 894 # Only import this if we are going to use this class
895 895 from twisted.internet import threads
896 896
897 897 msg = {'engineid':self.id,
898 898 'method':'execute',
899 899 'args':[lines]}
900 900
901 901 d = threads.deferToThread(self.wrapped_execute, msg, lines)
902 902 d.addCallback(self.addIDToResult)
903 903 return d
@@ -1,757 +1,757 @@ IPython/kernel/multienginefc.py
1 1 # encoding: utf-8
2 2
3 3 """
4 4 Expose the multiengine controller over the Foolscap network protocol.
5 5 """
6 6
7 7 __docformat__ = "restructuredtext en"
8 8
9 9 #-------------------------------------------------------------------------------
10 10 # Copyright (C) 2008 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-------------------------------------------------------------------------------
15 15
16 16 #-------------------------------------------------------------------------------
17 17 # Imports
18 18 #-------------------------------------------------------------------------------
19 19
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22
23 23 from zope.interface import Interface, implements
24 24 from twisted.internet import defer
25 25 from twisted.python import components, failure, log
26 26
27 27 from foolscap import Referenceable
28 28
29 29 from IPython.kernel import error
30 30 from IPython.kernel.util import printer
31 31 from IPython.kernel import map as Map
32 32 from IPython.kernel.parallelfunction import ParallelFunction
33 33 from IPython.kernel.mapper import (
34 34 MultiEngineMapper,
35 35 IMultiEngineMapperFactory,
36 36 IMapper
37 37 )
38 38 from IPython.kernel.twistedutil import gatherBoth
39 39 from IPython.kernel.multiengine import (MultiEngine,
40 40 IMultiEngine,
41 41 IFullSynchronousMultiEngine,
42 42 ISynchronousMultiEngine)
43 43 from IPython.kernel.multiengineclient import wrapResultList
44 44 from IPython.kernel.pendingdeferred import PendingDeferredManager
45 45 from IPython.kernel.pickleutil import (can, canDict,
46 46 canSequence, uncan, uncanDict, uncanSequence)
47 47
48 48 from IPython.kernel.clientinterfaces import (
49 49 IFCClientInterfaceProvider,
50 50 IBlockingClientAdaptor
51 51 )
52 52
53 53 # Needed to access the true globals from __main__.__dict__
54 54 import __main__
55 55
56 56 #-------------------------------------------------------------------------------
57 57 # The Controller side of things
58 58 #-------------------------------------------------------------------------------
59 59
60 60 def packageResult(wrappedMethod):
61 61
62 62 def wrappedPackageResult(self, *args, **kwargs):
63 63 d = wrappedMethod(self, *args, **kwargs)
64 64 d.addCallback(self.packageSuccess)
65 65 d.addErrback(self.packageFailure)
66 66 return d
67 67 return wrappedPackageResult
68 68
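# Example: `packageResult` makes every wrapped remote_* method return a pickle
# of either the result or the cleaned Failure, so a client always unpickles
# before branching on the outcome. A hypothetical sketch of the receiving side:
#
#     serial = ...        # bytes returned by a remote_* call
#     obj = pickle.loads(serial)
#     if isinstance(obj, failure.Failure):
#         obj.raiseException()
#     # otherwise obj is the actual result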
69 69
70 70 class IFCSynchronousMultiEngine(Interface):
71 71 """Foolscap interface to `ISynchronousMultiEngine`.
72 72
73 73 The methods in this interface are similar to those of
74 74 `ISynchronousMultiEngine`, but their arguments and return values are pickled
75 75 if they are not already simple Python types that can be sent over the wire.
76 76
77 77 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
78 78 documentation about the methods.
79 79
80 80 Most methods in this interface act like the `ISynchronousMultiEngine`
81 81 versions and can be called in blocking or non-blocking mode.
82 82 """
83 83 pass
84 84
85 85
86 86 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
87 87 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
88 88 """
89 89
90 90 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
91 91
92 92 addSlash = True
93 93
94 94 def __init__(self, multiengine):
95 95 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
96 96 # it. This allows this class to do two adaptation steps.
97 97 self.smultiengine = ISynchronousMultiEngine(multiengine)
98 98 self._deferredIDCallbacks = {}
99 99
100 100 #---------------------------------------------------------------------------
101 101 # Non interface methods
102 102 #---------------------------------------------------------------------------
103 103
104 104 def packageFailure(self, f):
105 105 f.cleanFailure()
106 106 return self.packageSuccess(f)
107 107
108 108 def packageSuccess(self, obj):
109 109 serial = pickle.dumps(obj, 2)
110 110 return serial
111 111
112 112 #---------------------------------------------------------------------------
113 113 # Things related to PendingDeferredManager
114 114 #---------------------------------------------------------------------------
115 115
116 116 @packageResult
117 117 def remote_get_pending_deferred(self, deferredID, block):
118 118 d = self.smultiengine.get_pending_deferred(deferredID, block)
119 119 try:
120 120 callback = self._deferredIDCallbacks.pop(deferredID)
121 121 except KeyError:
122 122 callback = None
123 123 if callback is not None:
124 124 d.addCallback(callback[0], *callback[1], **callback[2])
125 125 return d
126 126
127 127 @packageResult
128 128 def remote_clear_pending_deferreds(self):
129 129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
130 130
131 131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
132 132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
133 133 return did
134
134
135 135 #---------------------------------------------------------------------------
136 136 # IEngineMultiplexer related methods
137 137 #---------------------------------------------------------------------------
138 138
139 139 @packageResult
140 140 def remote_execute(self, lines, targets, block):
141 141 return self.smultiengine.execute(lines, targets=targets, block=block)
142 142
143 143 @packageResult
144 144 def remote_push(self, binaryNS, targets, block):
145 145 try:
146 146 namespace = pickle.loads(binaryNS)
147 147 except:
148 148 d = defer.fail(failure.Failure())
149 149 else:
150 150 d = self.smultiengine.push(namespace, targets=targets, block=block)
151 151 return d
152 152
153 153 @packageResult
154 154 def remote_pull(self, keys, targets, block):
155 155 d = self.smultiengine.pull(keys, targets=targets, block=block)
156 156 return d
157 157
158 158 @packageResult
159 159 def remote_push_function(self, binaryNS, targets, block):
160 160 try:
161 161 namespace = pickle.loads(binaryNS)
162 162 except:
163 163 d = defer.fail(failure.Failure())
164 164 else:
165 165 namespace = uncanDict(namespace)
166 166 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
167 167 return d
168 168
169 169 def _canMultipleKeys(self, result):
170 170 return [canSequence(r) for r in result]
171 171
172 172 @packageResult
173 173 def remote_pull_function(self, keys, targets, block):
174 174 def can_functions(r, keys):
175 175 if len(keys)==1 or isinstance(keys, str):
176 176 result = canSequence(r)
177 177 elif len(keys)>1:
178 178 result = [canSequence(s) for s in r]
179 179 return result
180 180 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
181 181 if block:
182 182 d.addCallback(can_functions, keys)
183 183 else:
184 184 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
185 185 return d
186 186
187 187 @packageResult
188 188 def remote_push_serialized(self, binaryNS, targets, block):
189 189 try:
190 190 namespace = pickle.loads(binaryNS)
191 191 except:
192 192 d = defer.fail(failure.Failure())
193 193 else:
194 194 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
195 195 return d
196 196
197 197 @packageResult
198 198 def remote_pull_serialized(self, keys, targets, block):
199 199 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
200 200 return d
201 201
202 202 @packageResult
203 203 def remote_get_result(self, i, targets, block):
204 204 if i == 'None':
205 205 i = None
206 206 return self.smultiengine.get_result(i, targets=targets, block=block)
207 207
208 208 @packageResult
209 209 def remote_reset(self, targets, block):
210 210 return self.smultiengine.reset(targets=targets, block=block)
211 211
212 212 @packageResult
213 213 def remote_keys(self, targets, block):
214 214 return self.smultiengine.keys(targets=targets, block=block)
215 215
216 216 @packageResult
217 217 def remote_kill(self, controller, targets, block):
218 218 return self.smultiengine.kill(controller, targets=targets, block=block)
219 219
220 220 @packageResult
221 221 def remote_clear_queue(self, targets, block):
222 222 return self.smultiengine.clear_queue(targets=targets, block=block)
223 223
224 224 @packageResult
225 225 def remote_queue_status(self, targets, block):
226 226 return self.smultiengine.queue_status(targets=targets, block=block)
227 227
228 228 @packageResult
229 229 def remote_set_properties(self, binaryNS, targets, block):
230 230 try:
231 231 ns = pickle.loads(binaryNS)
232 232 except:
233 233 d = defer.fail(failure.Failure())
234 234 else:
235 235 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
236 236 return d
237 237
238 238 @packageResult
239 239 def remote_get_properties(self, keys, targets, block):
240 240 if keys=='None':
241 241 keys=None
242 242 return self.smultiengine.get_properties(keys, targets=targets, block=block)
243 243
244 244 @packageResult
245 245 def remote_has_properties(self, keys, targets, block):
246 246 return self.smultiengine.has_properties(keys, targets=targets, block=block)
247 247
248 248 @packageResult
249 249 def remote_del_properties(self, keys, targets, block):
250 250 return self.smultiengine.del_properties(keys, targets=targets, block=block)
251 251
252 252 @packageResult
253 253 def remote_clear_properties(self, targets, block):
254 254 return self.smultiengine.clear_properties(targets=targets, block=block)
255 255
256 256 #---------------------------------------------------------------------------
257 257 # IMultiEngine related methods
258 258 #---------------------------------------------------------------------------
259 259
260 260 def remote_get_ids(self):
261 261 """Get the ids of the registered engines.
262 262
263 263 This method always blocks.
264 264 """
265 265 return self.smultiengine.get_ids()
266 266
267 267 #---------------------------------------------------------------------------
268 268 # IFCClientInterfaceProvider related methods
269 269 #---------------------------------------------------------------------------
270 270
271 271 def remote_get_client_name(self):
272 272 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
273 273
274 274
275 275 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
276 276 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
277 277 # two phase adaptation.
278 278 components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
279 279 IMultiEngine, IFCSynchronousMultiEngine)
280 280
281 281
282 282 #-------------------------------------------------------------------------------
283 283 # The Client side of things
284 284 #-------------------------------------------------------------------------------
285 285
286 286
287 287 class FCFullSynchronousMultiEngineClient(object):
288 288
289 289 implements(
290 290 IFullSynchronousMultiEngine,
291 291 IBlockingClientAdaptor,
292 292 IMultiEngineMapperFactory,
293 293 IMapper
294 294 )
295 295
296 296 def __init__(self, remote_reference):
297 297 self.remote_reference = remote_reference
298 298 self._deferredIDCallbacks = {}
299 299 # This class manages some pending deferreds through this instance. This
300 300 # is required for methods like gather/scatter as it enables us to
301 301 # create our own pending deferreds for composite operations.
302 302 self.pdm = PendingDeferredManager()
303 303
304 304 #---------------------------------------------------------------------------
305 305 # Non interface methods
306 306 #---------------------------------------------------------------------------
307 307
308 308 def unpackage(self, r):
309 309 return pickle.loads(r)
310 310
311 311 #---------------------------------------------------------------------------
312 312 # Things related to PendingDeferredManager
313 313 #---------------------------------------------------------------------------
314 314
315 315 def get_pending_deferred(self, deferredID, block=True):
316 316
317 317 # Because we are managing some pending deferreds locally (through
318 318 # self.pdm) and some remotely (on the controller), we first try the
319 319 # local one and then the remote one.
320 320 if self.pdm.quick_has_id(deferredID):
321 321 d = self.pdm.get_pending_deferred(deferredID, block)
322 322 return d
323 323 else:
324 324 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
325 325 d.addCallback(self.unpackage)
326 326 try:
327 327 callback = self._deferredIDCallbacks.pop(deferredID)
328 328 except KeyError:
329 329 callback = None
330 330 if callback is not None:
331 331 d.addCallback(callback[0], *callback[1], **callback[2])
332 332 return d
333 333
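# Example: a non-blocking call hands back a deferred_id; the actual result is
# fetched later through get_pending_deferred. A hypothetical sketch:
#
#     d = client.execute('a = 1', block=False)
#     d.addCallback(lambda did: client.get_pending_deferred(did, block=True))
#     d.addCallback(printer)   # fires with the real execute result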
334 334 def clear_pending_deferreds(self):
335 335
336 336 # This clears both the local (self.pdm) and remote pending deferreds
337 337 self.pdm.clear_pending_deferreds()
338 338 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
339 339 d2.addCallback(self.unpackage)
340 340 return d2
341 341
342 342 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
343 343 self._deferredIDCallbacks[did] = (callback, args, kwargs)
344 344 return did
345 345
346 346 #---------------------------------------------------------------------------
347 347 # IEngineMultiplexer related methods
348 348 #---------------------------------------------------------------------------
349
349
350 350 def execute(self, lines, targets='all', block=True):
351 351 d = self.remote_reference.callRemote('execute', lines, targets, block)
352 352 d.addCallback(self.unpackage)
353 353 return d
354 354
355 355 def push(self, namespace, targets='all', block=True):
356 356 serial = pickle.dumps(namespace, 2)
357 357 d = self.remote_reference.callRemote('push', serial, targets, block)
358 358 d.addCallback(self.unpackage)
359 359 return d
360 360
361 361 def pull(self, keys, targets='all', block=True):
362 362 d = self.remote_reference.callRemote('pull', keys, targets, block)
363 363 d.addCallback(self.unpackage)
364 364 return d
365 365
366 366 def push_function(self, namespace, targets='all', block=True):
367 367 cannedNamespace = canDict(namespace)
368 368 serial = pickle.dumps(cannedNamespace, 2)
369 369 d = self.remote_reference.callRemote('push_function', serial, targets, block)
370 370 d.addCallback(self.unpackage)
371 371 return d
372 372
373 373 def pull_function(self, keys, targets='all', block=True):
374 374 def uncan_functions(r, keys):
375 375 if len(keys)==1 or isinstance(keys, str):
376 376 return uncanSequence(r)
377 377 elif len(keys)>1:
378 378 return [uncanSequence(s) for s in r]
379 379 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
380 380 if block:
381 381 d.addCallback(self.unpackage)
382 382 d.addCallback(uncan_functions, keys)
383 383 else:
384 384 d.addCallback(self.unpackage)
385 385 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
386 386 return d
387 387
388 388 def push_serialized(self, namespace, targets='all', block=True):
389 389 cannedNamespace = canDict(namespace)
390 390 serial = pickle.dumps(cannedNamespace, 2)
391 391 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
392 392 d.addCallback(self.unpackage)
393 393 return d
394 394
395 395 def pull_serialized(self, keys, targets='all', block=True):
396 396 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
397 397 d.addCallback(self.unpackage)
398 398 return d
399 399
400 400 def get_result(self, i=None, targets='all', block=True):
401 401 if i is None: # This is because None cannot be marshalled by xml-rpc
402 402 i = 'None'
403 403 d = self.remote_reference.callRemote('get_result', i, targets, block)
404 404 d.addCallback(self.unpackage)
405 405 return d
406 406
407 407 def reset(self, targets='all', block=True):
408 408 d = self.remote_reference.callRemote('reset', targets, block)
409 409 d.addCallback(self.unpackage)
410 410 return d
411 411
412 412 def keys(self, targets='all', block=True):
413 413 d = self.remote_reference.callRemote('keys', targets, block)
414 414 d.addCallback(self.unpackage)
415 415 return d
416 416
417 417 def kill(self, controller=False, targets='all', block=True):
418 418 d = self.remote_reference.callRemote('kill', controller, targets, block)
419 419 d.addCallback(self.unpackage)
420 420 return d
421 421
422 422 def clear_queue(self, targets='all', block=True):
423 423 d = self.remote_reference.callRemote('clear_queue', targets, block)
424 424 d.addCallback(self.unpackage)
425 425 return d
426 426
427 427 def queue_status(self, targets='all', block=True):
428 428 d = self.remote_reference.callRemote('queue_status', targets, block)
429 429 d.addCallback(self.unpackage)
430 430 return d
431 431
432 432 def set_properties(self, properties, targets='all', block=True):
433 433 serial = pickle.dumps(properties, 2)
434 434 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
435 435 d.addCallback(self.unpackage)
436 436 return d
437 437
438 438 def get_properties(self, keys=None, targets='all', block=True):
439 439 if keys is None:
440 440 keys='None'
441 441 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
442 442 d.addCallback(self.unpackage)
443 443 return d
444 444
445 445 def has_properties(self, keys, targets='all', block=True):
446 446 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
447 447 d.addCallback(self.unpackage)
448 448 return d
449 449
450 450 def del_properties(self, keys, targets='all', block=True):
451 451 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
452 452 d.addCallback(self.unpackage)
453 453 return d
454 454
455 455 def clear_properties(self, targets='all', block=True):
456 456 d = self.remote_reference.callRemote('clear_properties', targets, block)
457 457 d.addCallback(self.unpackage)
458 458 return d
459 459
460 460 #---------------------------------------------------------------------------
461 461 # IMultiEngine related methods
462 462 #---------------------------------------------------------------------------
463 463
464 464 def get_ids(self):
465 465 d = self.remote_reference.callRemote('get_ids')
466 466 return d
467 467
468 468 #---------------------------------------------------------------------------
469 469 # ISynchronousMultiEngineCoordinator related methods
470 470 #---------------------------------------------------------------------------
471 471
472 472 def _process_targets(self, targets):
473 473 def create_targets(ids):
474 474 if isinstance(targets, int):
475 475 engines = [targets]
476 476 elif targets=='all':
477 477 engines = ids
478 478 elif isinstance(targets, (list, tuple)):
479 479 engines = targets
480 480 for t in engines:
481 481 if t not in ids:
482 482 raise error.InvalidEngineID("engine with id %r does not exist"%t)
483 483 return engines
484 484
485 485 d = self.get_ids()
486 486 d.addCallback(create_targets)
487 487 return d
488 488
489 489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
490 490
491 491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
492 492 # This enables us to collect a bunch of deferred ids and make a secondary
493 493 # deferred id that corresponds to the entire group. This logic is extremely
494 494 # difficult to get right though.
495 495 def do_scatter(engines):
496 496 nEngines = len(engines)
497 497 mapClass = Map.dists[dist]
498 498 mapObject = mapClass()
499 499 d_list = []
500 500 # Loop through and push to each engine in non-blocking mode.
501 501 # This returns a set of deferreds to deferred_ids
502 502 for index, engineid in enumerate(engines):
503 503 partition = mapObject.getPartition(seq, index, nEngines)
504 504 if flatten and len(partition) == 1:
505 505 d = self.push({key: partition[0]}, targets=engineid, block=False)
506 506 else:
507 507 d = self.push({key: partition}, targets=engineid, block=False)
508 508 d_list.append(d)
509 509 # Collect the deferred to deferred_ids
510 510 d = gatherBoth(d_list,
511 511 fireOnOneErrback=0,
512 512 consumeErrors=1,
513 513 logErrors=0)
514 514 # Now d has a list of deferred_ids or Failures coming
515 515 d.addCallback(error.collect_exceptions, 'scatter')
516 516 def process_did_list(did_list):
517 517 """Turn a list of deferred_ids into a final result or failure."""
518 518 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
519 519 final_d = gatherBoth(new_d_list,
520 520 fireOnOneErrback=0,
521 521 consumeErrors=1,
522 522 logErrors=0)
523 523 final_d.addCallback(error.collect_exceptions, 'scatter')
524 524 final_d.addCallback(lambda lop: [i[0] for i in lop])
525 525 return final_d
526 526 # Now, depending on block, we need to handle the list deferred_ids
527 527 # coming down the pipe differently.
528 528 if block:
529 529 # If we are blocking register a callback that will transform the
530 530 # list of deferred_ids into the final result.
531 531 d.addCallback(process_did_list)
532 532 return d
533 533 else:
534 534 # Here we are going to use a _local_ PendingDeferredManager.
535 535 deferred_id = self.pdm.get_deferred_id()
536 536 # This is the deferred we will return to the user that will fire
537 537 # with the local deferred_id AFTER we have received the list of
538 538 # primary deferred_ids
539 539 d_to_return = defer.Deferred()
540 540 def do_it(did_list):
541 541 """Produce a deferred to the final result, but first fire the
542 542 deferred we will return to the user that has the local
543 543 deferred id."""
544 544 d_to_return.callback(deferred_id)
545 545 return process_did_list(did_list)
546 546 d.addCallback(do_it)
547 547 # Now save the deferred to the final result
548 548 self.pdm.save_pending_deferred(d, deferred_id)
549 549 return d_to_return
550 550
551 551 d = self._process_targets(targets)
552 552 d.addCallback(do_scatter)
553 553 return d
554 554
555 555 def gather(self, key, dist='b', targets='all', block=True):
556 556
557 557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
558 558 # This enables us to collect a bunch of deferred ids and make a secondary
559 559 # deferred id that corresponds to the entire group. This logic is extremely
560 560 # difficult to get right though.
561 561 def do_gather(engines):
562 562 nEngines = len(engines)
563 563 mapClass = Map.dists[dist]
564 564 mapObject = mapClass()
565 565 d_list = []
566 566 # Loop through and push to each engine in non-blocking mode.
567 567 # This returns a set of deferreds to deferred_ids
568 568 for index, engineid in enumerate(engines):
569 569 d = self.pull(key, targets=engineid, block=False)
570 570 d_list.append(d)
571 571 # Collect the deferred to deferred_ids
572 572 d = gatherBoth(d_list,
573 573 fireOnOneErrback=0,
574 574 consumeErrors=1,
575 575 logErrors=0)
576 576 # Now d has a list of deferred_ids or Failures coming
577 577 d.addCallback(error.collect_exceptions, 'gather')
578 578 def process_did_list(did_list):
579 579 """Turn a list of deferred_ids into a final result or failure."""
580 580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
581 581 final_d = gatherBoth(new_d_list,
582 582 fireOnOneErrback=0,
583 583 consumeErrors=1,
584 584 logErrors=0)
585 585 final_d.addCallback(error.collect_exceptions, 'gather')
586 586 final_d.addCallback(lambda lop: [i[0] for i in lop])
587 587 final_d.addCallback(mapObject.joinPartitions)
588 588 return final_d
589 589 # Now, depending on block, we need to handle the list deferred_ids
590 590 # coming down the pipe differently.
591 591 if block:
592 592 # If we are blocking register a callback that will transform the
593 593 # list of deferred_ids into the final result.
594 594 d.addCallback(process_did_list)
595 595 return d
596 596 else:
597 597 # Here we are going to use a _local_ PendingDeferredManager.
598 598 deferred_id = self.pdm.get_deferred_id()
599 599 # This is the deferred we will return to the user that will fire
600 600 # with the local deferred_id AFTER we have received the list of
601 601 # primary deferred_ids
602 602 d_to_return = defer.Deferred()
603 603 def do_it(did_list):
604 604 """Produce a deferred to the final result, but first fire the
605 605 deferred we will return to the user that has the local
606 606 deferred id."""
607 607 d_to_return.callback(deferred_id)
608 608 return process_did_list(did_list)
609 609 d.addCallback(do_it)
610 610 # Now save the deferred to the final result
611 611 self.pdm.save_pending_deferred(d, deferred_id)
612 612 return d_to_return
613 613
614 614 d = self._process_targets(targets)
615 615 d.addCallback(do_gather)
616 616 return d
617 617
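# Example: scatter partitions a sequence across the engines and gather
# reassembles it, bracketing a data-parallel computation. A hypothetical
# blocking sketch, with `client` an FCFullSynchronousMultiEngineClient:
#
#     d = client.scatter('x', range(100))           # push partitions
#     d.addCallback(lambda r: client.execute('y = [i**2 for i in x]'))
#     d.addCallback(lambda r: client.gather('y'))   # pull and join partitions
#     d.addCallback(printer)                        # the joined list of squares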
618 618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
619 619 """
620 620 A parallelized version of Python's builtin map.
621 621
622 622 This has a slightly different syntax than the builtin `map`.
623 623 This is needed because we need to have keyword arguments and thus
624 624 can't use *args to capture all the sequences. Instead, they must
625 625 be passed in a list or tuple.
626 626
627 627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628 628
629 629 Most users will want to use parallel functions or the `mapper`
630 630 and `map` methods for an API that follows that of the builtin
631 631 `map`.
632 632 """
633 633 if not isinstance(sequences, (list, tuple)):
634 634 raise TypeError('sequences must be a list or tuple')
635 635 max_len = max(len(s) for s in sequences)
636 636 for s in sequences:
637 637 if len(s)!=max_len:
638 638 raise ValueError('all sequences must have equal length')
639 639 if isinstance(func, FunctionType):
640 640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
641 641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
642 642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
643 643 elif isinstance(func, str):
644 644 d = defer.succeed(None)
645 645 sourceToRun = \
646 646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
647 647 else:
648 648 raise TypeError("func must be a function or str")
649 649
650 650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
651 651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
652 652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
653 653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
654 654 return d
655 655
656 656 def map(self, func, *sequences):
657 657 """
658 658 A parallel version of Python's builtin `map` function.
659 659
660 660 This method applies a function to sequences of arguments. It
661 661 follows the same syntax as the builtin `map`.
662 662
663 663 This method creates a mapper object by calling `self.mapper` with
664 664 no arguments and then uses that mapper to do the mapping. See
665 665 the documentation of `mapper` for more details.
666 666 """
667 667 return self.mapper().map(func, *sequences)
668 668
669 669 def mapper(self, dist='b', targets='all', block=True):
670 670 """
671 671 Create a mapper object that has a `map` method.
672 672
673 673 This method returns an object that implements the `IMapper`
674 674 interface. This method is a factory that is used to control how
675 675 the map happens.
676 676
677 677 :Parameters:
678 678 dist : str
679 679 What decomposition to use, 'b' is the only one supported
680 680 currently
681 681 targets : str, int, sequence of ints
682 682 Which engines to use for the map
683 683 block : boolean
684 684 Should calls to `map` block or not
685 685 """
686 686 return MultiEngineMapper(self, dist, targets, block)
687 687
688 688 def parallel(self, dist='b', targets='all', block=True):
689 689 """
690 690 A decorator that turns a function into a parallel function.
691 691
692 692 This can be used as:
693 693
694 694 @parallel()
695 695 def f(x, y):
696 696 ...
697 697
698 698 f(range(10), range(10))
699 699
700 700 This causes f(0,0), f(1,1), ... to be called in parallel.
701 701
702 702 :Parameters:
703 703 dist : str
704 704 What decomposition to use, 'b' is the only one supported
705 705 currently
706 706 targets : str, int, sequence of ints
707 707 Which engines to use for the map
708 708 block : boolean
709 709 Should calls to `map` block or not
710 710 """
711 711 mapper = self.mapper(dist, targets, block)
712 712 pf = ParallelFunction(mapper)
713 713 return pf
714 714
715 715 #---------------------------------------------------------------------------
716 716 # ISynchronousMultiEngineExtras related methods
717 717 #---------------------------------------------------------------------------
718 718
719 719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
720 720 if not multitargets:
721 721 result = pushResult[0]
722 722 elif lenKeys > 1:
723 723 result = zip(*pushResult)
724 724 elif lenKeys == 1:
725 725 result = list(pushResult)
726 726 return result
727 727
728 728 def zip_pull(self, keys, targets='all', block=True):
729 729 multitargets = not isinstance(targets, int) and len(targets) > 1
730 730 lenKeys = len(keys)
731 731 d = self.pull(keys, targets=targets, block=block)
732 732 if block:
733 733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
734 734 else:
735 735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
736 736 return d
737 737
738 738 def run(self, fname, targets='all', block=True):
739 739 fileobj = open(fname,'r')
740 740 source = fileobj.read()
741 741 fileobj.close()
742 742 # if the compilation blows, we get a local error right away
743 743 try:
744 744 code = compile(source,fname,'exec')
745 745 except:
746 746 return defer.fail(failure.Failure())
747 747 # Now run the code
748 748 d = self.execute(source, targets=targets, block=block)
749 749 return d
750 750
751 751 #---------------------------------------------------------------------------
752 752 # IBlockingClientAdaptor related methods
753 753 #---------------------------------------------------------------------------
754 754
755 755 def adapt_to_blocking_client(self):
756 756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
757 757 return IFullBlockingMultiEngineClient(self)
@@ -1,521 +1,723 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 import tempfile
21 22 pjoin = os.path.join
22 23
23 24 from twisted.internet import reactor, defer
24 25 from twisted.internet.protocol import ProcessProtocol
25 26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 27 from twisted.internet.utils import getProcessOutput
27 28 from twisted.python import failure, log
28 29
29 30 from IPython.external import argparse
30 31 from IPython.external import Itpl
31 32 from IPython.genutils import get_ipython_dir, num_cpus
32 33 from IPython.kernel.fcutil import have_crypto
33 34 from IPython.kernel.error import SecurityError
34 35 from IPython.kernel.fcutil import have_crypto
35 36 from IPython.kernel.twistedutil import gatherBoth
36 37 from IPython.kernel.util import printer
37 38
38 39
39 40 #-----------------------------------------------------------------------------
40 41 # General process handling code
41 42 #-----------------------------------------------------------------------------
42 43
43 44 def find_exe(cmd):
44 45 try:
45 46 import win32api
46 47 except ImportError:
47 48 raise ImportError('you need to have pywin32 installed for this to work')
48 49 else:
49 50 try:
50 51 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
51 52 except:
52 53 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
53 54 return path
54 55
55 56 class ProcessStateError(Exception):
56 57 pass
57 58
58 59 class UnknownStatus(Exception):
59 60 pass
60 61
61 62 class LauncherProcessProtocol(ProcessProtocol):
62 63 """
63 64 A ProcessProtocol to go with the ProcessLauncher.
64 65 """
65 66 def __init__(self, process_launcher):
66 67 self.process_launcher = process_launcher
67 68
68 69 def connectionMade(self):
69 70 self.process_launcher.fire_start_deferred(self.transport.pid)
70 71
71 72 def processEnded(self, status):
72 73 value = status.value
73 74 if isinstance(value, ProcessDone):
74 75 self.process_launcher.fire_stop_deferred(0)
75 76 elif isinstance(value, ProcessTerminated):
76 77 self.process_launcher.fire_stop_deferred(
77 78 {'exit_code':value.exitCode,
78 79 'signal':value.signal,
79 80 'status':value.status
80 81 }
81 82 )
82 83 else:
83 84 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84
85
85 86 def outReceived(self, data):
86 87 log.msg(data)
87
88
88 89 def errReceived(self, data):
89 90 log.err(data)
90 91
91 92 class ProcessLauncher(object):
92 93 """
93 94 Start and stop an external process in an asynchronous manner.
94 95
95 96 Currently this uses deferreds to notify other parties of process state
96 97 changes. This is an awkward design and should be moved to using
97 98 a formal NotificationCenter.
98 99 """
99 100 def __init__(self, cmd_and_args):
100 101 self.cmd = cmd_and_args[0]
101 102 self.args = cmd_and_args
102 103 self._reset()
103 104
104 105 def _reset(self):
105 106 self.process_protocol = None
106 107 self.pid = None
107 108 self.start_deferred = None
108 109 self.stop_deferreds = []
109 110 self.state = 'before' # before, running, or after
110 111
111 112 @property
112 113 def running(self):
113 114 if self.state == 'running':
114 115 return True
115 116 else:
116 117 return False
117 118
118 119 def fire_start_deferred(self, pid):
119 120 self.pid = pid
120 121 self.state = 'running'
121 122 log.msg('Process %r has started with pid=%i' % (self.args, pid))
122 123 self.start_deferred.callback(pid)
123 124
124 125 def start(self):
125 126 if self.state == 'before':
126 127 self.process_protocol = LauncherProcessProtocol(self)
127 128 self.start_deferred = defer.Deferred()
128 129 self.process_transport = reactor.spawnProcess(
129 130 self.process_protocol,
130 131 self.cmd,
131 132 self.args,
132 133 env=os.environ
133 134 )
134 135 return self.start_deferred
135 136 else:
136 137 s = 'the process has already been started and has state: %r' % \
137 138 self.state
138 139 return defer.fail(ProcessStateError(s))
139 140
140 141 def get_stop_deferred(self):
141 142 if self.state == 'running' or self.state == 'before':
142 143 d = defer.Deferred()
143 144 self.stop_deferreds.append(d)
144 145 return d
145 146 else:
146 147 s = 'this process is already complete'
147 148 return defer.fail(ProcessStateError(s))
148 149
149 150 def fire_stop_deferred(self, exit_code):
150 151 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
151 152 self.state = 'after'
152 153 for d in self.stop_deferreds:
153 154 d.callback(exit_code)
154 155
155 156 def signal(self, sig):
156 157 """
157 158 Send a signal to the process.
158 159
159 160 The argument sig can be ('KILL','INT', etc.) or any signal number.
160 161 """
161 162 if self.state == 'running':
162 163 self.process_transport.signalProcess(sig)
163 164
164 165 # def __del__(self):
165 166 # self.signal('KILL')
166 167
167 168 def interrupt_then_kill(self, delay=1.0):
168 169 self.signal('INT')
169 170 reactor.callLater(delay, self.signal, 'KILL')
170 171
171 172
172 173 #-----------------------------------------------------------------------------
173 174 # Code for launching controller and engines
174 175 #-----------------------------------------------------------------------------
175 176
176 177
177 178 class ControllerLauncher(ProcessLauncher):
178 179
179 180 def __init__(self, extra_args=None):
180 181 if sys.platform == 'win32':
181 182 # This logic is needed because the ipcontroller script doesn't
182 183 # always get installed in the same way or in the same location.
183 184 from IPython.kernel.scripts import ipcontroller
184 185 script_location = ipcontroller.__file__.replace('.pyc', '.py')
185 186 # The -u option here turns on unbuffered output, which is required
186 187 # on Win32 to prevent weird conflicts and problems with Twisted
187 188 args = [find_exe('python'), '-u', script_location]
188 189 else:
189 190 args = ['ipcontroller']
190 191 self.extra_args = extra_args
191 192 if extra_args is not None:
192 193 args.extend(extra_args)
193 194
194 195 ProcessLauncher.__init__(self, args)
195 196
196 197
197 198 class EngineLauncher(ProcessLauncher):
198 199
199 200 def __init__(self, extra_args=None):
200 201 if sys.platform == 'win32':
201 202 # This logic is needed because the ipengine script doesn't
202 203 # always get installed in the same way or in the same location.
203 204 from IPython.kernel.scripts import ipengine
204 205 script_location = ipengine.__file__.replace('.pyc', '.py')
205 206 # The -u option here turns on unbuffered output, which is required
206 207 # on Win32 to prevent weird conflicts and problems with Twisted
207 208 args = [find_exe('python'), '-u', script_location]
208 209 else:
209 210 args = ['ipengine']
210 211 self.extra_args = extra_args
211 212 if extra_args is not None:
212 213 args.extend(extra_args)
213 214
214 215 ProcessLauncher.__init__(self, args)
215 216
216 217
217 218 class LocalEngineSet(object):
218 219
219 220 def __init__(self, extra_args=None):
220 221 self.extra_args = extra_args
221 222 self.launchers = []
222 223
223 224 def start(self, n):
224 225 dlist = []
225 226 for i in range(n):
226 227 el = EngineLauncher(extra_args=self.extra_args)
227 228 d = el.start()
228 229 self.launchers.append(el)
229 230 dlist.append(d)
230 231 dfinal = gatherBoth(dlist, consumeErrors=True)
231 232 dfinal.addCallback(self._handle_start)
232 233 return dfinal
233 234
234 235 def _handle_start(self, r):
235 236 log.msg('Engines started with pids: %r' % r)
236 237 return r
237 238
238 239 def _handle_stop(self, r):
239 240 log.msg('Engines received signal: %r' % r)
240 241 return r
241 242
242 243 def signal(self, sig):
243 244 dlist = []
244 245 for el in self.launchers:
245 246 d = el.get_stop_deferred()
246 247 dlist.append(d)
247 248 el.signal(sig)
248 249 dfinal = gatherBoth(dlist, consumeErrors=True)
249 250 dfinal.addCallback(self._handle_stop)
250 251 return dfinal
251 252
252 253 def interrupt_then_kill(self, delay=1.0):
253 254 dlist = []
254 255 for el in self.launchers:
255 256 d = el.get_stop_deferred()
256 257 dlist.append(d)
257 258 el.interrupt_then_kill(delay)
258 259 dfinal = gatherBoth(dlist, consumeErrors=True)
259 260 dfinal.addCallback(self._handle_stop)
260 261 return dfinal
261 262
262 263
263 264 class BatchEngineSet(object):
264 265
265 266 # Subclasses must fill these in. See PBSEngineSet
266 267 submit_command = ''
267 268 delete_command = ''
268 269 job_id_regexp = ''
269 270
270 271 def __init__(self, template_file, **kwargs):
271 272 self.template_file = template_file
272 273 self.context = {}
273 274 self.context.update(kwargs)
274 275 self.batch_file = self.template_file+'-run'
275
276
276 277 def parse_job_id(self, output):
277 278 m = re.match(self.job_id_regexp, output)
278 279 if m is not None:
279 280 job_id = m.group()
280 281 else:
281 282 raise Exception("job id couldn't be determined: %s" % output)
282 283 self.job_id = job_id
283 284 log.msg('Job started with job id: %r' % job_id)
284 285 return job_id
285 286
286 287 def write_batch_script(self, n):
287 288 self.context['n'] = n
288 289 template = open(self.template_file, 'r').read()
289 290 log.msg('Using template for batch script: %s' % self.template_file)
290 291 script_as_string = Itpl.itplns(template, self.context)
291 292 log.msg('Writing instantiated batch script: %s' % self.batch_file)
292 293 f = open(self.batch_file,'w')
293 294 f.write(script_as_string)
294 295 f.close()
295 296
296 297 def handle_error(self, f):
297 298 f.printTraceback()
298 299 f.raiseException()
299 300
300 301 def start(self, n):
301 302 self.write_batch_script(n)
302 303 d = getProcessOutput(self.submit_command,
303 304 [self.batch_file],env=os.environ)
304 305 d.addCallback(self.parse_job_id)
305 306 d.addErrback(self.handle_error)
306 307 return d
307 308
308 309 def kill(self):
309 310 d = getProcessOutput(self.delete_command,
310 311 [self.job_id],env=os.environ)
311 312 return d
312 313
313 314 class PBSEngineSet(BatchEngineSet):
314 315
315 316 submit_command = 'qsub'
316 317 delete_command = 'qdel'
317 318 job_id_regexp = r'\d+'
318 319
319 320 def __init__(self, template_file, **kwargs):
320 321 BatchEngineSet.__init__(self, template_file, **kwargs)
321 322
322 323
324 sshx_template="""#!/bin/sh
325 "$@" &> /dev/null &
326 echo $!
327 """
328
329 engine_killer_template="""#!/bin/sh
330 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
331 """
332
333 class SSHEngineSet(object):
334 sshx_template=sshx_template
335 engine_killer_template=engine_killer_template
336
337 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
338 """Start a controller on localhost and engines using ssh.
339
340 The engine_hosts argument is a dict with hostnames as keys and
341 the number of engines (int) as values. sshx is the name of a local
342 file that will be used to run remote commands. This file is used
343 to set up the environment properly.
344 """
345
346 self.temp_dir = tempfile.gettempdir()
347 if sshx is not None:
348 self.sshx = sshx
349 else:
350 # Write the sshx.sh file locally from our template.
351 self.sshx = os.path.join(
352 self.temp_dir,
353 '%s-main-sshx.sh' % os.environ['USER']
354 )
355 f = open(self.sshx, 'w')
356 f.writelines(self.sshx_template)
357 f.close()
358 self.engine_command = ipengine
359 self.engine_hosts = engine_hosts
360 # Write the engine killer script file locally from our template.
361 self.engine_killer = os.path.join(
362 self.temp_dir,
363 '%s-local-engine_killer.sh' % os.environ['USER']
364 )
365 f = open(self.engine_killer, 'w')
366 f.writelines(self.engine_killer_template)
367 f.close()
368
369 def start(self, send_furl=False):
370 dlist = []
371 for host in self.engine_hosts.keys():
372 count = self.engine_hosts[host]
373 d = self._start(host, count, send_furl)
374 dlist.append(d)
375 return gatherBoth(dlist, consumeErrors=True)
376
377 def _start(self, hostname, count=1, send_furl=False):
378 if send_furl:
379 d = self._scp_furl(hostname)
380 else:
381 d = defer.succeed(None)
382 d.addCallback(lambda r: self._scp_sshx(hostname))
383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 return d
385
386 def _scp_furl(self, hostname):
387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
388 cmd_list = scp_cmd.split()
389 cmd_list[1] = os.path.expanduser(cmd_list[1])
390 log.msg('Copying furl file: %s' % scp_cmd)
391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
392 return d
393
394 def _scp_sshx(self, hostname):
395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 self.sshx, hostname,
397 self.temp_dir, os.environ['USER']
398 )
400 log.msg("Copying sshx: %s" % scp_cmd)
401 sshx_scp = scp_cmd.split()
402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
403 return d
404
405 def _ssh_engine(self, hostname, count):
406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 hostname, self.temp_dir,
408 os.environ['USER'], self.engine_command
409 )
410 cmds = exec_engine.split()
411 dlist = []
412 log.msg("about to start engines...")
413 for i in range(count):
414 log.msg('Starting engines: %s' % exec_engine)
415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 dlist.append(d)
417 return gatherBoth(dlist, consumeErrors=True)
418
419 def kill(self):
420 dlist = []
421 for host in self.engine_hosts.keys():
422 d = self._killall(host)
423 dlist.append(d)
424 return gatherBoth(dlist, consumeErrors=True)
425
426 def _killall(self, hostname):
427 d = self._scp_engine_killer(hostname)
428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 # d.addErrback(self._exec_err)
430 return d
431
432 def _scp_engine_killer(self, hostname):
433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 self.engine_killer,
435 hostname,
436 self.temp_dir,
437 os.environ['USER']
438 )
439 cmds = scp_cmd.split()
440 log.msg('Copying engine_killer: %s' % scp_cmd)
441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
442 return d
443
444 def _ssh_kill(self, hostname):
445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 hostname,
447 self.temp_dir,
448 os.environ['USER']
449 )
450 log.msg('Killing engine: %s' % kill_cmd)
451 kill_cmd = kill_cmd.split()
452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 return d
454
455 def _exec_err(self, r):
456 log.msg(r)
457
323 458 #-----------------------------------------------------------------------------
324 459 # Main functions for the different types of clusters
325 460 #-----------------------------------------------------------------------------
326 461
327 462 # TODO:
328 463 # The logic in these functions should be moved into classes like LocalCluster,
329 464 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
330 465 # The main functions should then just parse the command line arguments, create
331 466 # the appropriate class and call a 'start' method.
332 467
333 468 def check_security(args, cont_args):
334 469 if (not args.x or not args.y) and not have_crypto:
335 470 log.err("""
336 471 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
337 472 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
338 473 reactor.stop()
339 474 return False
340 475 if args.x:
341 476 cont_args.append('-x')
342 477 if args.y:
343 478 cont_args.append('-y')
344 479 return True
345 480
481
346 482 def main_local(args):
347 483 cont_args = []
348 484 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
349 485
350 486 # Check security settings before proceeding
351 487 if not check_security(args, cont_args):
352 488 return
353 489
354 490 cl = ControllerLauncher(extra_args=cont_args)
355 491 dstart = cl.start()
356 492 def start_engines(cont_pid):
357 493 engine_args = []
358 494 engine_args.append('--logfile=%s' % \
359 495 pjoin(args.logdir,'ipengine%s-' % cont_pid))
360 496 eset = LocalEngineSet(extra_args=engine_args)
361 497 def shutdown(signum, frame):
362 498 log.msg('Stopping local cluster')
363 499 # We are still playing with the times here, but these seem
364 500 # to be reliable in allowing everything to exit cleanly.
365 501 eset.interrupt_then_kill(0.5)
366 502 cl.interrupt_then_kill(0.5)
367 503 reactor.callLater(1.0, reactor.stop)
368 504 signal.signal(signal.SIGINT,shutdown)
369 505 d = eset.start(args.n)
370 506 return d
371 507 def delay_start(cont_pid):
372 508 # This is needed because the controller doesn't start listening
373 509 # right when it starts and the controller needs to write
374 510 # furl files for the engine to pick up
375 511 reactor.callLater(1.0, start_engines, cont_pid)
376 512 dstart.addCallback(delay_start)
377 513 dstart.addErrback(lambda f: f.raiseException())
378 514
515
379 516 def main_mpirun(args):
380 517 cont_args = []
381 518 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
382 519
383 520 # Check security settings before proceeding
384 521 if not check_security(args, cont_args):
385 522 return
386 523
387 524 cl = ControllerLauncher(extra_args=cont_args)
388 525 dstart = cl.start()
389 526 def start_engines(cont_pid):
390 527 raw_args = ['mpirun']
391 528 raw_args.extend(['-n',str(args.n)])
392 529 raw_args.append('ipengine')
393 530 raw_args.append('-l')
394 531 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
395 532 if args.mpi:
396 533 raw_args.append('--mpi=%s' % args.mpi)
397 534 eset = ProcessLauncher(raw_args)
398 535 def shutdown(signum, frame):
399 536 log.msg('Stopping local cluster')
400 537 # We are still playing with the times here, but these seem
401 538 # to be reliable in allowing everything to exit cleanly.
402 539 eset.interrupt_then_kill(1.0)
403 540 cl.interrupt_then_kill(1.0)
404 541 reactor.callLater(2.0, reactor.stop)
405 542 signal.signal(signal.SIGINT,shutdown)
406 543 d = eset.start()
407 544 return d
408 545 def delay_start(cont_pid):
409 546 # This is needed because the controller doesn't start listening
410 547 # right when it starts and the controller needs to write
411 548 # furl files for the engine to pick up
412 549 reactor.callLater(1.0, start_engines, cont_pid)
413 550 dstart.addCallback(delay_start)
414 551 dstart.addErrback(lambda f: f.raiseException())
415 552
553
416 554 def main_pbs(args):
417 555 cont_args = []
418 556 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
419 557
420 558 # Check security settings before proceeding
421 559 if not check_security(args, cont_args):
422 560 return
423 561
424 562 cl = ControllerLauncher(extra_args=cont_args)
425 563 dstart = cl.start()
426 564 def start_engines(r):
427 565 pbs_set = PBSEngineSet(args.pbsscript)
428 566 def shutdown(signum, frame):
429 567 log.msg('Stopping pbs cluster')
430 568 d = pbs_set.kill()
431 569 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
432 570 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
433 571 signal.signal(signal.SIGINT,shutdown)
434 572 d = pbs_set.start(args.n)
435 573 return d
436 574 dstart.addCallback(start_engines)
437 575 dstart.addErrback(lambda f: f.raiseException())
438 576
439 577
578 def main_ssh(args):
579 """Start a controller on localhost and engines using ssh.
580
581 Your clusterfile should look like::
582
583 send_furl = False # True, if you want
584 engines = {
585 'engine_host1' : engine_count,
586 'engine_host2' : engine_count2
587 }
588 """
589 clusterfile = {}
590 execfile(args.clusterfile, clusterfile)
591 if not clusterfile.has_key('send_furl'):
592 clusterfile['send_furl'] = False
593
594 cont_args = []
595 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
596
597 # Check security settings before proceeding
598 if not check_security(args, cont_args):
599 return
600
601 cl = ControllerLauncher(extra_args=cont_args)
602 dstart = cl.start()
603 def start_engines(cont_pid):
604 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
605 def shutdown(signum, frame):
606 d = ssh_set.kill()
607 # d.addErrback(log.err)
608 cl.interrupt_then_kill(1.0)
609 reactor.callLater(2.0, reactor.stop)
610 signal.signal(signal.SIGINT,shutdown)
611 d = ssh_set.start(clusterfile['send_furl'])
612 return d
613
614 def delay_start(cont_pid):
615 reactor.callLater(1.0, start_engines, cont_pid)
616
617 dstart.addCallback(delay_start)
618 dstart.addErrback(lambda f: f.raiseException())
619
620
440 621 def get_args():
441 622 base_parser = argparse.ArgumentParser(add_help=False)
442 623 base_parser.add_argument(
443 624 '-x',
444 625 action='store_true',
445 626 dest='x',
446 627 help='turn off client security'
447 628 )
448 629 base_parser.add_argument(
449 630 '-y',
450 631 action='store_true',
451 632 dest='y',
452 633 help='turn off engine security'
453 634 )
454 635 base_parser.add_argument(
455 636 "--logdir",
456 637 type=str,
457 638 dest="logdir",
458 639 help="directory to put log files (default=$IPYTHONDIR/log)",
459 640 default=pjoin(get_ipython_dir(),'log')
460 641 )
461 642 base_parser.add_argument(
462 643 "-n",
463 644 "--num",
464 645 type=int,
465 646 dest="n",
466 647 default=2,
467 648 help="the number of engines to start"
468 649 )
469 650
470 651 parser = argparse.ArgumentParser(
471 652 description='IPython cluster startup. This starts a controller and\
472 653 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
473 654 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
474 655 )
475 656 subparsers = parser.add_subparsers(
476 657 help='available cluster types. For help, do "ipcluster TYPE --help"')
477 658
478 659 parser_local = subparsers.add_parser(
479 660 'local',
480 661 help='run a local cluster',
481 662 parents=[base_parser]
482 663 )
483 664 parser_local.set_defaults(func=main_local)
484 665
485 666 parser_mpirun = subparsers.add_parser(
486 667 'mpirun',
487 668 help='run a cluster using mpirun',
488 669 parents=[base_parser]
489 670 )
490 671 parser_mpirun.add_argument(
491 672 "--mpi",
492 673 type=str,
493 674 dest="mpi", # Don't put a default here to allow no MPI support
494 675 help="how to call MPI_Init (default=mpi4py)"
495 676 )
496 677 parser_mpirun.set_defaults(func=main_mpirun)
497 678
498 679 parser_pbs = subparsers.add_parser(
499 680 'pbs',
500 681 help='run a pbs cluster',
501 682 parents=[base_parser]
502 683 )
503 684 parser_pbs.add_argument(
504 685 '--pbs-script',
505 686 type=str,
506 687 dest='pbsscript',
507 688 help='PBS script template',
508 689 default='pbs.template'
509 690 )
510 691 parser_pbs.set_defaults(func=main_pbs)
692
693 parser_ssh = subparsers.add_parser(
694 'ssh',
695 help='run a cluster using ssh, should have ssh-keys set up',
696 parents=[base_parser]
697 )
698 parser_ssh.add_argument(
699 '--clusterfile',
700 type=str,
701 dest='clusterfile',
702 help='python file describing the cluster',
703 default='clusterfile.py',
704 )
705 parser_ssh.add_argument(
706 '--sshx',
707 type=str,
708 dest='sshx',
709 help='sshx launcher helper'
710 )
711 parser_ssh.set_defaults(func=main_ssh)
712
511 713 args = parser.parse_args()
512 714 return args
513 715
514 716 def main():
515 717 args = get_args()
516 718 reactor.callWhenRunning(args.func, args)
517 719 log.startLogging(sys.stdout)
518 720 reactor.run()
519 721
520 722 if __name__ == '__main__':
521 723 main()
@@ -1,393 +1,398 b''
1 1 .. _changes:
2 2
3 3 ==========
4 4 What's new
5 5 ==========
6 6
7 7 .. contents::
8 8 ..
9 9 1 Release 0.9.1
10 10 2 Release 0.9
11 11 2.1 New features
12 12 2.2 Bug fixes
13 13 2.3 Backwards incompatible changes
14 14 2.4 Changes merged in from IPython1
15 15 2.4.1 New features
16 16 2.4.2 Bug fixes
17 17 2.4.3 Backwards incompatible changes
18 18 3 Release 0.8.4
19 19 4 Release 0.8.3
20 20 5 Release 0.8.2
21 21 6 Older releases
22 22 ..
23 23
24 24 Release dev
25 25 ===========
26 26
27 27 New features
28 28 ------------
29 29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 34 to Matt Foster for this patch.
32 35
33 36 * Fully refactored :command:`ipcluster` command line program for starting
34 37 IPython clusters. This new version is a complete rewrite and 1) is fully
35 38 cross platform (we now use Twisted's process management), 2) has much
36 39 improved performance, 3) uses subcommands for different types of clusters,
37 40 4) uses argparse for parsing command line options, 5) has better support
38 41 for starting clusters using :command:`mpirun`, 6) has experimental support
39 42 for starting engines using PBS. However, this new version of ipcluster
40 43 should be considered a technology preview. We plan on changing the API
41 44 in significant ways before it is final.
42 45
43 46 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
44 47
45 48 * Full description of the security model added to the docs.
46 49
47 50 * cd completer: show bookmarks if no other completions are available.
48 51
49 52 * sh profile: easy way to give 'title' to prompt: assign to variable
50 53 '_prompt_title'. It looks like this::
51 54
52 55 [~]|1> _prompt_title = 'sudo!'
53 56 sudo![~]|2>
54 57
55 58 * %edit: If you do '%edit pasted_block', pasted_block
56 59 variable gets updated with new data (so repeated
57 60 editing makes sense)
58 61
59 62 Bug fixes
60 63 ---------
61 64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 67 * The ipengine and ipcontroller scripts now handle missing furl files
63 68 more gracefully by giving better error messages.
64 69
65 70 * %rehashx: Aliases no longer contain dots. python3.0 binary
66 71 will create alias python30. Fixes:
67 72 #259716 "commands with dots in them don't work"
68 73
69 74 * %cpaste: %cpaste -r repeats the last pasted block.
70 75 The block is assigned to pasted_block even if code
71 76 raises exception.
72 77
73 78 Backwards incompatible changes
74 79 ------------------------------
75 80
76 81 * The controller now has a ``-r`` flag that needs to be used if you want to
77 82 reuse existing furl files. Otherwise they are deleted (the default).
78 83
79 84 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
80 85 (done to decouple it from ipython release cycle)
81 86
82 87
83 88
84 89 Release 0.9.1
85 90 =============
86 91
87 92 This release was quickly made to restore compatibility with Python 2.4, which
88 93 version 0.9 accidentally broke. No new features were introduced, other than
89 94 some additional testing support for internal use.
90 95
91 96
92 97 Release 0.9
93 98 ===========
94 99
95 100 New features
96 101 ------------
97 102
98 103 * All furl files and security certificates are now put in a read-only
99 104 directory named ~/.ipython/security.
100 105
101 106 * A single function :func:`get_ipython_dir`, in :mod:`IPython.genutils` that
102 107 determines the user's IPython directory in a robust manner.
103 108
104 109 * Laurent's WX application has been given a top-level script called
105 110 ipython-wx, and it has received numerous fixes. We expect this code to be
106 111 architecturally better integrated with Gael's WX 'ipython widget' over the
107 112 next few releases.
108 113
109 114 * The Editor synchronization work by Vivian De Smedt has been merged in. This
110 115 code adds a number of new editor hooks to synchronize with editors under
111 116 Windows.
112 117
113 118 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
114 119 This work was sponsored by Enthought, and while it's still very new, it is
115 120 based on a more cleanly organized architecture of the various IPython
116 121 components. We will continue to develop this over the next few releases as a
117 122 model for GUI components that use IPython.
118 123
119 124 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
120 125 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
121 126 different internal organizations, but the whole team is working on finding
122 127 what the right abstraction points are for a unified codebase.
123 128
124 129 * As part of the frontend work, Barry Wark also implemented an experimental
125 130 event notification system that various ipython components can use. In the
126 131 next release the implications and use patterns of this system regarding the
127 132 various GUI options will be worked out.
128 133
129 134 * IPython finally has a full test system, that can test docstrings with
130 135 IPython-specific functionality. There are still a few pieces missing for it
131 136 to be widely accessible to all users (so they can run the test suite at any
132 137 time and report problems), but it now works for the developers. We are
133 138 working hard on continuing to improve it, as this was probably IPython's
134 139 major Achilles heel (the lack of proper test coverage made it effectively
135 140 impossible to do large-scale refactoring). The full test suite can now
136 141 be run using the :command:`iptest` command line program.
137 142
138 143 * The notion of a task has been completely reworked. An `ITask` interface has
139 144 been created. This interface defines the methods that tasks need to
140 145 implement. These methods are now responsible for things like submitting
141 146 tasks and processing results. There are two basic task types:
142 147 :class:`IPython.kernel.task.StringTask` (this is the old `Task` object, but
143 148 renamed) and the new :class:`IPython.kernel.task.MapTask`, which is based on
144 149 a function.
145 150
146 151 * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to
147 152 standardize the idea of a `map` method. This interface has a single `map`
148 153 method that has the same syntax as the built-in `map`. We have also defined
149 154 a `mapper` factory interface that creates objects that implement
150 155 :class:`IPython.kernel.mapper.IMapper` for different controllers. Both the
151 156 multiengine and task controller now have mapping capabilities.
152 157
153 158 * The parallel function capabilities have been reworked. The major changes are
154 159 that i) there is now an `@parallel` magic that creates parallel functions,
155 160 ii) the syntax for multiple variables follows that of `map`, iii) both the
156 161 multiengine and task controller now have a parallel function implementation.
157 162
158 163 * All of the parallel computing capabilities from `ipython1-dev` have been
159 164 merged into IPython proper. This resulted in the following new subpackages:
160 165 :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
161 166 :mod:`IPython.tools` and :mod:`IPython.testing`.
162 167
163 168 * As part of merging in the `ipython1-dev` stuff, the `setup.py` script and
164 169 friends have been completely refactored. Now we are checking for
165 170 dependencies using the approach that matplotlib uses.
166 171
167 172 * The documentation has been completely reorganized to accept the
168 173 documentation from `ipython1-dev`.
169 174
170 175 * We have switched to using Foolscap for all of our network protocols in
171 176 :mod:`IPython.kernel`. This gives us secure connections that are both
172 177 encrypted and authenticated.
173 178
174 179 * We have a brand new `COPYING.txt` files that describes the IPython license
175 180 and copyright. The biggest change is that we are putting "The IPython
176 181 Development Team" as the copyright holder. We give more details about
177 182 exactly what this means in this file. All developers should read this and use
178 183 the new banner in all IPython source code files.
179 184
180 185 * sh profile: ./foo runs foo as system command, no need to do !./foo anymore
181 186
182 187 * String lists now support ``sort(field, nums = True)`` method (to easily sort
183 188 system command output). Try it with ``a = !ls -l ; a.sort(1, nums=1)``.
184 189
185 190 * '%cpaste foo' now assigns the pasted block as string list, instead of string
186 191
187 192 * The ipcluster script now runs by default with no security. This is done
188 193 because the main usage of the script is for starting things on localhost.
189 194 Eventually when ipcluster is able to start things on other hosts, we will put
190 195 security back.
191 196
192 197 * 'cd --foo' searches directory history for string foo, and jumps to that dir.
193 198 Last part of dir name is checked first. If no matches for that are found,
194 199 look at the whole path.
195 200
196 201
197 202 Bug fixes
198 203 ---------
199 204
200 205 * The Windows installer has been fixed. Now all IPython scripts have ``.bat``
201 206 versions created. Also, the Start Menu shortcuts have been updated.
202 207
203 208 * The colors escapes in the multiengine client are now turned off on win32 as
204 209 they don't print correctly.
205 210
206 211 * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing
207 212 mpi_import_statement incorrectly, which was leading the engine to crash when
208 213 mpi was enabled.
209 214
210 215 * A few subpackages had missing ``__init__.py`` files.
211 216
212 217 * The documentation is only created if Sphinx is found. Previously, the
213 218 ``setup.py`` script would fail if it was missing.
214 219
215 220 * Greedy ``cd`` completion has been disabled again (it was enabled in 0.8.4) as
216 221 it caused problems on certain platforms.
217 222
218 223
219 224 Backwards incompatible changes
220 225 ------------------------------
221 226
222 227 * The ``clusterfile`` options of the :command:`ipcluster` command has been
223 228 removed as it was not working and it will be replaced soon by something much
224 229 more robust.
225 230
226 231 * The :mod:`IPython.kernel` configuration now properly finds the user's
227 232 IPython directory.
228 233
229 234 * In ipapi, the :func:`make_user_ns` function has been replaced with
230 235 :func:`make_user_namespaces`, to support dict subclasses in namespace
231 236 creation.
232 237
233 238 * :class:`IPython.kernel.client.Task` has been renamed
234 239 :class:`IPython.kernel.client.StringTask` to make way for new task types.
235 240
236 241 * The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
237 242 and `map`.
238 243
239 244 * Renamed the values that the renamed `dist` keyword argument can have from
240 245 `'basic'` to `'b'`.
241 246
242 247 * IPython has a larger set of dependencies if you want all of its capabilities.
243 248 See the `setup.py` script for details.
244 249
245 250 * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
246 251 :class:`IPython.kernel.client.TaskClient` no longer take the (ip,port) tuple.
247 252 Instead they take the filename of a file that contains the FURL for that
248 253 client. If the FURL file is in your IPYTHONDIR, it will be found automatically
249 254 and the constructor can be left empty.
250 255
251 256 * The asynchronous clients in :mod:`IPython.kernel.asyncclient` are now created
252 257 using the factory functions :func:`get_multiengine_client` and
253 258 :func:`get_task_client`. These return a `Deferred` to the actual client.
254 259
255 260 * The command line options to `ipcontroller` and `ipengine` have changed to
256 261 reflect the new Foolscap network protocol and the FURL files. Please see the
257 262 help for these scripts for details.
258 263
259 264 * The configuration files for the kernel have changed because of the Foolscap
260 265 stuff. If you were using custom config files before, you should delete them
261 266 and regenerate new ones.
262 267
263 268 Changes merged in from IPython1
264 269 -------------------------------
265 270
266 271 New features
267 272 ............
268 273
269 274 * Much improved ``setup.py`` and ``setupegg.py`` scripts. Because Twisted and
270 275 zope.interface are now easy installable, we can declare them as dependencies
271 276 in our setupegg.py script.
272 277
273 278 * IPython is now compatible with Twisted 2.5.0 and 8.x.
274 279
275 280 * Added a new example of how to use :mod:`ipython1.kernel.asynclient`.
276 281
277 282 * Initial draft of a process daemon in :mod:`ipython1.daemon`. This has not
278 283 been merged into IPython and is still in `ipython1-dev`.
279 284
280 285 * The ``TaskController`` now has methods for getting the queue status.
281 286
282 287 * The ``TaskResult`` objects now have information about how long the task
283 288 took to run.
284 289
285 290 * We are attaching additional attributes to exceptions ``(_ipython_*)`` that
286 291 we use to carry additional info around.
287 292
288 293 * New top-level module :mod:`asyncclient` that has asynchronous versions (that
289 294 return deferreds) of the client classes. This is designed for users who want
290 295 to run their own Twisted reactor.
291 296
292 297 * All the clients in :mod:`client` are now based on Twisted. This is done by
293 298 running the Twisted reactor in a separate thread and using the
294 299 :func:`blockingCallFromThread` function that is in recent versions of Twisted.
295 300
296 301 * Functions can now be pushed/pulled to/from engines using
297 302 :meth:`MultiEngineClient.push_function` and
298 303 :meth:`MultiEngineClient.pull_function`.
299 304
300 305 * Gather/scatter are now implemented in the client to reduce the work load
301 306 of the controller and improve performance.
302 307
303 308 * Complete rewrite of the IPython documentation. All of the documentation
304 309 from the IPython website has been moved into docs/source as restructured
305 310 text documents. PDF and HTML documentation are being generated using
306 311 Sphinx.
307 312
308 313 * New developer oriented documentation: development guidelines and roadmap.
309 314
310 315 * Traditional ``ChangeLog`` has been changed to a more useful ``changes.txt``
311 316 file that is organized by release and is meant to provide something more
312 317 relevant for users.
313 318
314 319 Bug fixes
315 320 .........
316 321
317 322 * Created a proper ``MANIFEST.in`` file to create source distributions.
318 323
319 324 * Fixed a bug in the ``MultiEngine`` interface. Previously, multi-engine
320 325 actions were being collected with a :class:`DeferredList` with
321 326 ``fireOnOneErrback=1``. This meant that methods were returning
322 327 before all engines had given their results. This was causing extremely odd
323 328 bugs in certain cases. To fix this problem, we have 1) set
324 329 ``fireOnOneErrback=0`` to make sure all results (or exceptions) are in
325 330 before returning and 2) introduced a :exc:`CompositeError` exception
326 331 that wraps all of the engine exceptions. This is a huge change as it means
327 332 that users will have to catch :exc:`CompositeError` rather than the actual
328 333 exception.
329 334
330 335 Backwards incompatible changes
331 336 ..............................
332 337
333 338 * All names have been renamed to conform to the lowercase_with_underscore
334 339 convention. This will require users to change references to all names like
335 340 ``queueStatus`` to ``queue_status``.
336 341
337 342 * Previously, methods like :meth:`MultiEngineClient.push` and
338 343 :meth:`MultiEngineClient.pull` used ``*args`` and ``**kwargs``. This was
339 344 becoming a problem as we weren't able to introduce new keyword arguments into
340 345 the API. Now these methods simply take a dict or sequence. This has also
341 346 allowed us to get rid of the ``*All`` methods like :meth:`pushAll` and
342 347 :meth:`pullAll`. These things are now handled with the ``targets`` keyword
343 348 argument that defaults to ``'all'``.
344 349
345 350 * The :attr:`MultiEngineClient.magicTargets` has been renamed to
346 351 :attr:`MultiEngineClient.targets`.
347 352
348 353 * All methods in the MultiEngine interface now accept the optional keyword
349 354 argument ``block``.
350 355
351 356 * Renamed :class:`RemoteController` to :class:`MultiEngineClient` and
352 357 :class:`TaskController` to :class:`TaskClient`.
353 358
354 359 * Renamed the top-level module from :mod:`api` to :mod:`client`.
355 360
356 361 * Most methods in the multiengine interface now raise a :exc:`CompositeError`
357 362 exception that wraps the user's exceptions, rather than just raising the raw
358 363 user's exception.
359 364
360 365 * Changed the ``setupNS`` and ``resultNames`` in the ``Task`` class to ``push``
361 366 and ``pull``.
362 367
363 368
364 369 Release 0.8.4
365 370 =============
366 371
367 372 This was a quick release to fix an unfortunate bug that slipped into the 0.8.3
368 373 release. The ``--twisted`` option was disabled, as it turned out to be broken
369 374 across several platforms.
370 375
371 376
372 377 Release 0.8.3
373 378 =============
374 379
375 380 * pydb is now disabled by default (due to %run -d problems). You can enable
376 381 it by passing -pydb command line argument to IPython. Note that setting
377 382 it in config file won't work.
378 383
379 384
380 385 Release 0.8.2
381 386 =============
382 387
383 388 * %pushd/%popd behave differently; now "pushd /foo" pushes CURRENT directory
384 389 and jumps to /foo. The current behaviour is closer to the documented
385 390 behaviour, and should not trip anyone up.
386 391
387 392
388 393 Older releases
389 394 ==============
390 395
391 396 Changes in earlier releases of IPython are described in the older file
392 397 ``ChangeLog``. Please refer to this document for details.
393 398
@@ -1,251 +1,324 b''
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19 19
20 20 General considerations
21 21 ======================
22 22
23 23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24 24
25 25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26 26
27 27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 28 ``host0``.
29 29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 32 :command:`ipengine`. This command has to be told where the FURL file
33 33 (:file:`ipcontroller-engine.furl`) is located.
34 34
35 35 At this point, the controller and engines will be connected. By default, the
36 36 FURL files created by the controller are put into the
37 37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 38 the controller, step 2 can be skipped as the engines will automatically look
39 39 at that location.
40 40
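Put together, a minimal manual session for one controller on ``host0`` and
one engine on ``host1`` might look like the following sketch (hostnames are
placeholders; adapt the commands to your setup):

.. sourcecode:: bash

    # On host0: start the controller, which writes the FURL files.
    $ ipcontroller

    # From host0: copy the engine FURL file to host1 (step 2 above).
    $ scp ~/.ipython/security/ipcontroller-engine.furl host1:.ipython/security/

    # On host1: start an engine; it picks up the FURL file automatically.
    $ ipengine
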
41 41 The final step required to actually use the running controller from a
42 42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
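
Connecting a client might then look like this sketch (the explicit path in
the second call is a made-up example location):

.. sourcecode:: python

    from IPython.kernel import client

    # FURL file found automatically in ~/.ipython/security:
    mec = client.MultiEngineClient()

    # Or pass the full path to the FURL file explicitly:
    mec = client.MultiEngineClient('/path/to/ipcontroller-mec.furl')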
45 45
46 46 Using :command:`ipcluster`
47 47 ==========================
48 48
49 49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50 50
51 51 1. When the controller and engines are all run on localhost. This is useful
52 52 for testing or running on a multicore computer.
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations.
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
57 remote nodes using :command:`ssh`.
56 58
57 59 .. note::
58 60
59 61 It is also possible for advanced users to add support to
60 62 :command:`ipcluster` for starting controllers and engines using other
61 63 methods (like Sun's Grid Engine for example).
62 64
63 65 .. note::
64 66
65 67 Currently :command:`ipcluster` requires that the
66 68 :file:`~/.ipython/security` directory live on a shared filesystem that is
67 69 seen by both the controller and engines. If you don't have a shared file
68 70 system you will need to use :command:`ipcontroller` and
69 :command:`ipengine` directly.
71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 using the :command:`ssh` method to start the cluster.
70 73
71 74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
72 75 and :command:`ipengine` to perform the steps described above.
73 76
74 77 Using :command:`ipcluster` in local mode
75 78 ----------------------------------------
76 79
77 80 To start one controller and 4 engines on localhost, just do::
78 81
79 82 $ ipcluster local -n 4
80 83
81 84 To see other command line options for the local mode, do::
82 85
83 86 $ ipcluster local -h
84 87
85 88 Using :command:`ipcluster` in mpirun mode
86 89 -----------------------------------------
87 90
88 91 The mpirun mode is useful if you:
89 92
90 93 1. Have MPI installed.
91 94 2. Your systems are configured to use the :command:`mpirun` command to start
92 95 processes.
93 96
94 97 If these are satisfied, you can start an IPython cluster using::
95 98
96 99 $ ipcluster mpirun -n 4
97 100
98 101 This does the following:
99 102
100 103 1. Starts the IPython controller on current host.
101 104 2. Uses :command:`mpirun` to start 4 engines.
102 105
103 106 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
104 107
105 108 $ ipcluster mpirun -n 4 --mpi=mpi4py
106 109
107 110 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
108 111
109 112 Additional command line options for this mode can be found by doing::
110 113
111 114 $ ipcluster mpirun -h
112 115
113 116 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
114 117
115 118
116 119 Using :command:`ipcluster` in PBS mode
117 120 --------------------------------------
118 121
119 122 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
120 123
121 124 .. sourcecode:: bash
122 125
123 126 #PBS -N ipython
124 127 #PBS -j oe
125 128 #PBS -l walltime=00:10:00
126 129 #PBS -l nodes=${n/4}:ppn=4
127 130 #PBS -q parallel
128 131
129 132 cd $$PBS_O_WORKDIR
130 133 export PATH=$$HOME/usr/local/bin
131 134 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
132 135 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
133 136
134 137 There are a few important points about this template:
135 138
136 139 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
137 140 template engine (a short sketch follows this list).
138 141
139 142 2. Instead of putting in the actual number of engines, use the notation
140 143 ``${n}`` to indicate the number of engines to be started. You can also use
141 144 expressions like ``${n/4}`` in the template to indicate the number of
142 145 nodes.
143 146
144 147 3. Because ``$`` is a special character used by the template engine, you must
145 148 escape any ``$`` by using ``$$``. This is important when referring to
146 149 environment variables in the template.
147 150
148 151 4. Any options to :command:`ipengine` should be given in the batch script
149 152 template.
150 153
151 154 5. Depending on the configuration of your system, you may have to set
152 155 environment variables in the script template.
153 156
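As a quick illustration of point 1, the rendering that :command:`ipcluster`
performs is roughly the following (a sketch that uses the same :mod:`Itpl`
call as the ipcluster source; the two-line template here is made up):

.. sourcecode:: python

    from IPython.external import Itpl

    template = "#PBS -l nodes=${n/4}:ppn=4\nmpiexec -n ${n} ipengine"
    print Itpl.itplns(template, {'n': 8})
    # prints:
    # #PBS -l nodes=2:ppn=4
    # mpiexec -n 8 ipengine
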
154 157 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
155 158
156 159 $ ipcluster pbs -n 128 --pbs-script=pbs.template
157 160
158 161 Additional command line options for this mode can be found by doing::
159 162
160 163 $ ipcluster pbs -h
161 164
165 Using :command:`ipcluster` in SSH mode
166 --------------------------------------
167
168 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 nodes and starts :command:`ipcontroller` on localhost.
170
171 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172
173 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
174
175 .. sourcecode:: python
176
177 send_furl = True
178 engines = { 'host1.example.com' : 2,
179 'host2.example.com' : 5,
180 'host3.example.com' : 1,
181 'host4.example.com' : 8 }
182
183 Since this is a regular Python file, the usual Python syntax applies. Things to note:
184
185 * The `engines` dict, where the keys are the hosts we want to run engines on
186 and the values are the number of engines to run on each host.
187 * send_furl can be either `True` or `False`; if `True`, the FURL file needed
188 by :command:`ipengine` is copied to each host.
189
190 The ``--clusterfile`` command line option lets you specify the file to use for
191 the cluster definition. Once you have your cluster file and can
192 :command:`ssh` into the remote hosts without a password, you are ready to
193 start your cluster like so:
194
195 .. sourcecode:: bash
196
197 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198
199
200 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201
202 * sshx.sh
203 * engine_killer.sh
204
205 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a file in a local temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is :file:`/tmp`.
206
207 The default sshx.sh is the following:
208
209 .. sourcecode:: bash
210
211 #!/bin/sh
212 "$@" &> /dev/null &
213 echo $!
214
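For reference, the default engine_killer.sh written out by
:command:`ipcluster` (and used when the SSH cluster is stopped) is:

.. sourcecode:: bash

    #!/bin/sh
    ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
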
This script simply runs the given command in the background, discards its
output and echoes the new process id. If you want to use a custom
:file:`sshx.sh` script you need to use the ``--sshx`` option and specify the
file to use. Using a custom :file:`sshx.sh` file could be helpful when you
need to set up the environment on the remote host before executing
:command:`ipengine`.

For a detailed options list:

.. sourcecode:: bash

    $ ipcluster ssh -h

Current limitations of the SSH mode of :command:`ipcluster` are:

* Untested on Windows. It would require a working :command:`ssh` on Windows.
  Also, we are using shell scripts to set up and execute commands on remote
  hosts.
* :command:`ipcontroller` is started on localhost, with no option to start it
  on a remote node.

Using the :command:`ipcontroller` and :command:`ipengine` commands
==================================================================

It is also possible to use the :command:`ipcontroller` and
:command:`ipengine` commands to start your controller and engines. This
approach gives you full control over all aspects of the startup process.

Starting the controller and engine on your local machine
--------------------------------------------------------

To use :command:`ipcontroller` and :command:`ipengine` to start things on
your local machine, do the following.

First start the controller::

    $ ipcontroller

Next, start however many instances of the engine you want by (repeatedly)
giving the command::

    $ ipengine

The engines should start and automatically connect to the controller using
the FURL files in :file:`~/.ipython/security`. You are now ready to use the
controller and engines from IPython.

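A quick way to confirm that the engines registered is to connect from an
IPython session; a minimal sketch, assuming the :mod:`IPython.kernel` client
API of this IPython series and the default FURL locations:

.. sourcecode:: python

    from IPython.kernel import client

    # Ask the controller which engine ids have registered.
    mec = client.MultiEngineClient()
    print(mec.get_ids())  # e.g. [0, 1, 2, 3]
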
.. warning::

    The order of the above operations is very important. You *must*
    start the controller before the engines, since the engines connect
    to the controller as they get started.

.. note::

    On some platforms (OS X), to put the controller and engine into the
    background you may need to give these commands in the form
    ``(ipcontroller &)`` and ``(ipengine &)`` (with the parentheses) for
    them to work properly.

Starting the controller and engines on different hosts
------------------------------------------------------

When the controller and engines are running on different hosts, things are
slightly more complicated, but the underlying ideas are the same:

1. Start the controller on a host using :command:`ipcontroller`.
2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on
   the controller's host to the host where the engines will run (see the
   sketch after this list).
3. Use :command:`ipengine` on the engines' hosts to start the engines.

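Step 2 can be done with any file-copying tool; a hypothetical one-liner
using :command:`scp` (the user and host names here are examples):

.. sourcecode:: python

    import os, subprocess

    # Run this on the controller's machine after ipcontroller has started.
    furl = os.path.expanduser('~/.ipython/security/ipcontroller-engine.furl')
    subprocess.call(['scp', furl,
                     'user@engine-host.example.com:.ipython/security/'])
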
The only thing you have to be careful of is to tell :command:`ipengine` where
the :file:`ipcontroller-engine.furl` file is located. There are two ways you
can do this:

* Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
  directory on the engine's host, where it will be found automatically.
* Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
  flag.

The ``--furl-file`` flag works like this::

    $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl

.. note::

    If the controller's and engine's hosts all have a shared file system
    (:file:`~/.ipython/security` is the same on all of them), then things
    will just work!

Make FURL files persistent
--------------------------

At first glance it may seem that managing the FURL files is a bit annoying.
Going back to the house and key analogy, copying the FURL around each time
you start the controller is like having to make a new key every time you
want to unlock the door and enter your house. As with your house, you want
to be able to create the key (or FURL file) once, and then simply use it at
any point in the future.

This is possible. The only thing you have to do is decide what ports the
controller will listen on for the engines and clients. This is done as
follows::

    $ ipcontroller -r --client-port=10101 --engine-port=10102

Then, just copy the FURL files over the first time and you are set. You can
start and stop the controller and engines as many times as you want in the
future, just make sure to tell the controller to use the *same* ports.

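For example, a tiny convenience wrapper (hypothetical, not part of IPython)
that always restarts the controller with the same ports, so that FURL files
copied out once keep working:

.. sourcecode:: python

    import subprocess

    # Reusing fixed ports keeps previously distributed FURL files valid.
    CLIENT_PORT, ENGINE_PORT = 10101, 10102

    subprocess.call(['ipcontroller', '-r',
                     '--client-port=%d' % CLIENT_PORT,
                     '--engine-port=%d' % ENGINE_PORT])
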
.. note::

    You may ask the question: what ports does the controller listen on if
    you don't tell it to use specific ones? The default is to use high
    random port numbers. We do this for two reasons: i) to increase
    security through obscurity and ii) to allow multiple controllers on a
    given host to start and automatically use different ports.

Log files
---------

All of the components of IPython have log files associated with them.
These log files can be extremely useful in debugging problems with
IPython and can be found in the directory :file:`~/.ipython/log`. Sending
the log files to us will often help us to debug any problems.

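When something goes wrong, the most recently modified files in that
directory are usually the interesting ones; a small sketch for listing them
(the path is the one given above):

.. sourcecode:: python

    import glob, os

    logs = glob.glob(os.path.expanduser('~/.ipython/log/*'))
    for name in sorted(logs, key=os.path.getmtime, reverse=True)[:5]:
        print(name)
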
.. [PBS] Portable Batch System. http://www.openpbs.org/
.. [SSH] SSH-Agent. http://en.wikipedia.org/wiki/Ssh-agent