Merging from upstream
Fernando Perez
r1918:37bba0ee merge
@@ -1,896 +1,951 @@
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3 3
4 4 """General Classes for IMultiEngine clients."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22 import linecache
from inspect import findsource  # needed by findsource_file below; missing from the original imports
23 23
24 24 from twisted.internet import reactor
25 25 from twisted.python import components, log
26 26 from twisted.python.failure import Failure
27 27 from zope.interface import Interface, implements, Attribute
28 28
29 29 from IPython.ColorANSI import TermColors
30 30
31 31 from IPython.kernel.twistedutil import blockingCallFromThread
32 32 from IPython.kernel import error
33 33 from IPython.kernel.parallelfunction import ParallelFunction
34 34 from IPython.kernel.mapper import (
35 35 MultiEngineMapper,
36 36 IMultiEngineMapperFactory,
37 37 IMapper
38 38 )
39 39 from IPython.kernel import map as Map
40 40 from IPython.kernel import multiengine as me
41 41 from IPython.kernel.multiengine import (IFullMultiEngine,
42 42 IFullSynchronousMultiEngine)
43 43
44 44
45 45 #-------------------------------------------------------------------------------
46 46 # Pending Result things
47 47 #-------------------------------------------------------------------------------
48 48
49 49 class IPendingResult(Interface):
50 50 """A representation of a result that is pending.
51 51
52 52 This class is similar to Twisted's `Deferred` object, but is designed to be
53 53 used in a synchronous context.
54 54 """
55 55
56 56 result_id=Attribute("ID of the deferred on the other side")
57 57 client=Attribute("A client that I came from")
58 58 r=Attribute("A property that calls get_result(block=True) and returns its value")
59 59
60 60 def get_result(default=None, block=True):
61 61 """
62 62 Get a result that is pending.
63 63
64 64 :Parameters:
65 65 default
66 66 The value to return if the result is not ready.
67 67 block : boolean
68 68 Whether to block waiting for the result.
69 69
70 70 :Returns: The actual result or the default value.
71 71 """
72 72
73 73 def add_callback(f, *args, **kwargs):
74 74 """
75 75 Add a callback that is called with the result.
76 76
77 77 If the original result is foo, adding a callback will cause
78 78 f(foo, *args, **kwargs) to be returned instead. If multiple
79 79 callbacks are registered, they are chained together: the result of
80 80 one is passed to the next and so on.
81 81
82 82 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 83 any exception raised will not be caught and handled. Users must
84 84 catch these by hand when calling `get_result`.
85 85 """
86 86
87 87
88 88 class PendingResult(object):
89 89 """A representation of a result that is not yet ready.
90 90
91 91 A user should not create a `PendingResult` instance by hand.
92 92
93 93 Methods
94 94 =======
95 95
96 96 * `get_result`
97 97 * `add_callback`
98 98
99 99 Properties
100 100 ==========
101 101 * `r`
102 102 """
103 103
104 104 def __init__(self, client, result_id):
105 105 """Create a PendingResult with a result_id and a client instance.
106 106
107 107 The client should implement `get_pending_deferred(result_id, block)`.
108 108 """
109 109 self.client = client
110 110 self.result_id = result_id
111 111 self.called = False
112 112 self.raised = False
113 113 self.callbacks = []
114 114
115 115 def get_result(self, default=None, block=True):
116 116 """Get a result that is pending.
117 117
118 118 This method will connect to an IMultiEngine adapted controller
119 119 and see if the result is ready. If the action triggered an exception,
120 120 it is re-raised and recorded. The result/exception is cached once it is
121 121 retrieved, so calling `get_result` again returns the cached result or
122 122 re-raises the exception. The .r attribute is a property that calls
123 123 `get_result` with block=True.
124 124
125 125 :Parameters:
126 126 default
127 127 The value to return if the result is not ready.
128 128 block : boolean
129 129 Whether to block waiting for the result.
130 130
131 131 :Returns: The actual result or the default value.
132 132 """
133 133
134 134 if self.called:
135 135 if self.raised:
136 136 raise self.result[0], self.result[1], self.result[2]
137 137 else:
138 138 return self.result
139 139 try:
140 140 result = self.client.get_pending_deferred(self.result_id, block)
141 141 except error.ResultNotCompleted:
142 142 return default
143 143 except:
144 144 # Reraise other errors, but first record them so they can be re-raised
145 145 # later if .r or get_result is called again.
146 146 self.result = sys.exc_info()
147 147 self.called = True
148 148 self.raised = True
149 149 raise
150 150 else:
151 151 for cb in self.callbacks:
152 152 result = cb[0](result, *cb[1], **cb[2])
153 153 self.result = result
154 154 self.called = True
155 155 return result
156 156
157 157 def add_callback(self, f, *args, **kwargs):
158 158 """Add a callback that is called with the result.
159 159
160 160 If the original result is result, adding a callback will cause
161 161 f(result, *args, **kwargs) to be returned instead. If multiple
162 162 callbacks are registered, they are chained together: the result of
163 163 one is passed to the next and so on.
164 164
165 165 Unlike Twisted's Deferred object, there is no errback chain. Thus
166 166 any exception raised will not be caught and handled. Users must
167 167 catch these by hand when calling `get_result`.
168 168 """
169 169 assert callable(f)
170 170 self.callbacks.append((f, args, kwargs))
171 171
172 172 def __cmp__(self, other):
173 173 if self.result_id < other.result_id:
174 174 return -1
175 175 else:
176 176 return 1
177 177
178 178 def _get_r(self):
179 179 return self.get_result(block=True)
180 180
181 181 r = property(_get_r)
182 182 """This property is a shortcut to a `get_result(block=True)`."""
183 183
184 184
185 185 #-------------------------------------------------------------------------------
186 186 # Pretty printing wrappers for certain lists
187 187 #-------------------------------------------------------------------------------
188 188
189 189 class ResultList(list):
190 190 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
191 191
192 192 def __repr__(self):
193 193 output = []
194 194 # These colored prompts were not working on Windows
195 195 if sys.platform == 'win32':
196 196 blue = normal = red = green = ''
197 197 else:
198 198 blue = TermColors.Blue
199 199 normal = TermColors.Normal
200 200 red = TermColors.Red
201 201 green = TermColors.Green
202 202 output.append("<Results List>\n")
203 203 for cmd in self:
204 204 if isinstance(cmd, Failure):
205 205 output.append(cmd)
206 206 else:
207 207 target = cmd.get('id',None)
208 208 cmd_num = cmd.get('number',None)
209 209 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
210 210 cmd_stdout = cmd.get('stdout', None)
211 211 cmd_stderr = cmd.get('stderr', None)
212 212 output.append("%s[%i]%s In [%i]:%s %s\n" % \
213 213 (green, target,
214 214 blue, cmd_num, normal, cmd_stdin))
215 215 if cmd_stdout:
216 216 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
217 217 (green, target,
218 218 red, cmd_num, normal, cmd_stdout))
219 219 if cmd_stderr:
220 220 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
221 221 (green, target,
222 222 red, cmd_num, normal, cmd_stderr))
223 223 return ''.join(output)
224 224
225 225
226 226 def wrapResultList(result):
227 227 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
228 228 if len(result) == 0:
229 229 result = [result]
230 230 return ResultList(result)
231 231
232 232
233 233 class QueueStatusList(list):
234 234 """A subclass of list that pretty prints the output of `queue_status`."""
235 235
236 236 def __repr__(self):
237 237 output = []
238 238 output.append("<Queue Status List>\n")
239 239 for e in self:
240 240 output.append("Engine: %s\n" % repr(e[0]))
241 241 output.append(" Pending: %s\n" % repr(e[1]['pending']))
242 242 for q in e[1]['queue']:
243 243 output.append(" Command: %s\n" % repr(q))
244 244 return ''.join(output)
245 245
246 246
247 247 #-------------------------------------------------------------------------------
248 248 # InteractiveMultiEngineClient
249 249 #-------------------------------------------------------------------------------
250 250
251 251 class InteractiveMultiEngineClient(object):
252 252 """A mixin class that add a few methods to a multiengine client.
253 253
254 254 The methods in this mixin class are designed for interactive usage.
255 255 """
256 256
257 257 def activate(self):
258 258 """Make this `MultiEngineClient` active for parallel magic commands.
259 259
260 260 IPython has a magic command syntax to work with `MultiEngineClient` objects.
261 261 In a given IPython session there is a single active one. While
262 262 many `MultiEngineClient` objects can be created and used by the user,
263 263 only one is active at a time. The active `MultiEngineClient` is used whenever
264 264 the magic commands %px and %autopx are used.
265 265
266 266 The activate() method is called on a given `MultiEngineClient` to make it
267 267 active. Once this has been done, the magic commands can be used.
268 268 """
269 269
270 270 try:
271 271 __IPYTHON__.activeController = self
272 272 except NameError:
273 273 print "The IPython Controller magics only work within IPython."
274 274
275 275 def __setitem__(self, key, value):
276 276 """Add a dictionary interface for pushing/pulling.
277 277
278 278 This functions as a shorthand for `push`.
279 279
280 280 :Parameters:
281 281 key : str
282 282 What to call the remote object.
283 283 value : object
284 284 The local Python object to push.
285 285 """
286 286 targets, block = self._findTargetsAndBlock()
287 287 return self.push({key:value}, targets=targets, block=block)
288 288
289 289 def __getitem__(self, key):
290 290 """Add a dictionary interface for pushing/pulling.
291 291
292 292 This functions as a shorthand to `pull`.
293 293
294 294 :Parameters:
295 295 - `key`: A string representing the key.
296 296 """
297 297 if isinstance(key, str):
298 298 targets, block = self._findTargetsAndBlock()
299 299 return self.pull(key, targets=targets, block=block)
300 300 else:
301 301 raise TypeError("__getitem__ only takes strs")
302 302
303 303 def __len__(self):
304 304 """Return the number of available engines."""
305 305 return len(self.get_ids())
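# Editor's sketch (assumed connected client `mec`): the dictionary
# interface is sugar over push/pull on the currently selected targets.
#
#     mec['a'] = 10    # shorthand for mec.push({'a': 10}, ...)
#     mec['a']         # shorthand for mec.pull('a', ...)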
306 306
307 307 #---------------------------------------------------------------------------
308 308 # Make this a context manager for with
309 309 #---------------------------------------------------------------------------
310 310
311 311 def findsource_file(self,f):
312 312 linecache.checkcache()
313 313 s = findsource(f.f_code)
314 314 lnum = f.f_lineno
315 315 wsource = s[0][f.f_lineno:]
316 316 return strip_whitespace(wsource)
317 317
318 318 def findsource_ipython(self,f):
319 319 from IPython import ipapi
320 320 self.ip = ipapi.get()
321 321 wsource = [l+'\n' for l in
322 322 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
323 323 return strip_whitespace(wsource)
324 324
325 325 def __enter__(self):
326 326 f = sys._getframe(1)
327 327 local_ns = f.f_locals
328 328 global_ns = f.f_globals
329 329 if f.f_code.co_filename == '<ipython console>':
330 330 s = self.findsource_ipython(f)
331 331 else:
332 332 s = self.findsource_file(f)
333 333
334 334 self._with_context_result = self.execute(s)
335 335
336 336 def __exit__ (self, etype, value, tb):
337 337 if etype is not None and issubclass(etype, error.StopLocalExecution):
338 338 return True
339 339
340 340
341 341 def remote():
342 342 m = 'Special exception to stop local execution of parallel code.'
343 343 raise error.StopLocalExecution(m)
344 344
345 345 def strip_whitespace(source):
346 346 # Expand tabs to avoid any confusion.
347 347 wsource = [l.expandtabs(4) for l in source]
348 348 # Detect the indentation level
349 349 done = False
350 350 for line in wsource:
351 351 if line.isspace():
352 352 continue
353 353 for col,char in enumerate(line):
354 354 if char != ' ':
355 355 done = True
356 356 break
357 357 if done:
358 358 break
359 359 # Now we know how much leading space there is in the code. Next, we
360 360 # extract up to the first line that has less indentation.
361 361 # WARNING: we skip comments that may be misindented, but we do NOT yet
362 362 # detect triple quoted strings that may have flush left text.
363 363 for lno,line in enumerate(wsource):
364 364 lead = line[:col]
365 365 if lead.isspace():
366 366 continue
367 367 else:
368 368 if not lead.lstrip().startswith('#'):
369 369 break
370 370 # The real 'with' source is up to lno
371 371 src_lines = [l[col:] for l in wsource[:lno+1]]
372 372
373 373 # Finally, check that the source's first non-comment line begins with the
374 374 # special call 'remote()'
375 375 for nline,line in enumerate(src_lines):
376 376 if line.isspace() or line.startswith('#'):
377 377 continue
378 378 if 'remote()' in line:
379 379 break
380 380 else:
381 381 raise ValueError('remote() call missing at the start of code')
382 382 src = ''.join(src_lines[nline+1:])
383 383 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
384 384 return src
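# Editor's sketch (not part of this changeset) of the `with` support the
# helpers above enable. It assumes an active client `mec` and a Python
# version with the `with` statement: __enter__ ships the block's source to
# the engines, while remote() raises StopLocalExecution so the body is
# skipped locally (swallowed by __exit__).
def _example_with_block(mec):
    with mec:
        remote()     # must be the first statement; stops local execution
        x = 2 + 2    # runs on the engines, not locally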
385 385
386 386
387 387 #-------------------------------------------------------------------------------
388 388 # The top-level MultiEngine client adaptor
389 389 #-------------------------------------------------------------------------------
390 390
391 391
392 392 class IFullBlockingMultiEngineClient(Interface):
393 393 pass
394 394
395 395
396 396 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
397 397 """
398 398 A blocking client to the `IMultiEngine` controller interface.
399 399
400 400 This class allows users to use a set of engines for a parallel
401 401 computation through the `IMultiEngine` interface. In this interface,
402 402 each engine has a specific id (an int) that is used to refer to the
403 403 engine, run code on it, etc.
404 404 """
405 405
406 406 implements(
407 407 IFullBlockingMultiEngineClient,
408 408 IMultiEngineMapperFactory,
409 409 IMapper
410 410 )
411 411
412 412 def __init__(self, smultiengine):
413 413 self.smultiengine = smultiengine
414 414 self.block = True
415 415 self.targets = 'all'
416 416
417 417 def _findBlock(self, block=None):
418 418 if block is None:
419 419 return self.block
420 420 else:
421 421 if block in (True, False):
422 422 return block
423 423 else:
424 424 raise ValueError("block must be True or False")
425 425
426 426 def _findTargets(self, targets=None):
427 427 if targets is None:
428 428 return self.targets
429 429 else:
430 430 if not isinstance(targets, (str,list,tuple,int)):
431 431 raise ValueError("targets must be a str, list, tuple or int")
432 432 return targets
433 433
434 434 def _findTargetsAndBlock(self, targets=None, block=None):
435 435 return self._findTargets(targets), self._findBlock(block)
436 436
437 437 def _blockFromThread(self, function, *args, **kwargs):
438 438 block = kwargs.get('block', None)
439 439 if block is None:
440 440 raise error.MissingBlockArgument("'block' keyword argument is missing")
441 441 result = blockingCallFromThread(function, *args, **kwargs)
442 442 if not block:
443 443 result = PendingResult(self, result)
444 444 return result
445 445
446 446 def get_pending_deferred(self, deferredID, block):
447 447 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
448 448
449 449 def barrier(self, pendingResults):
450 450 """Synchronize a set of `PendingResults`.
451 451
452 452 This method is a synchronization primitive that waits for a set of
453 453 `PendingResult` objects to complete. More specifically, barrier does
454 454 the following.
455 455
456 456 * The `PendingResult`s are sorted by result_id.
457 457 * The `get_result` method is called for each `PendingResult` sequentially
458 458 with block=True.
459 459 * If a `PendingResult` gets a result that is an exception, it is
460 460 trapped and can be re-raised later by calling `get_result` again.
461 461 * The `PendingResult`s are flushed from the controller.
462 462
463 463 After barrier has been called on a set of `PendingResult` objects, their
464 464 results can be retrieved by calling `get_result` again or accessing the
465 465 `r` attribute of each instance.
466 466 """
467 467
468 468 # Convert to list for sorting and check class type
469 469 prList = list(pendingResults)
470 470 for pr in prList:
471 471 if not isinstance(pr, PendingResult):
472 472 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
473 473
474 474 # Sort the PendingResults so they are in order
475 475 prList.sort()
476 476 # Block on each PendingResult object
477 477 for pr in prList:
478 478 try:
479 479 result = pr.get_result(block=True)
480 480 except Exception:
481 481 pass
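# Editor's sketch (assumed connected client `mec`): submit several
# non-blocking calls, then wait on the whole set at once.
#
#     prs = [mec.execute('import time; time.sleep(1)', block=False)
#            for _ in range(3)]
#     mec.barrier(prs)
#     results = [pr.r for pr in prs]   # cached; exceptions re-raise here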
482 482
483 483 def flush(self):
484 484 """
485 485 Clear all pending deferreds/results from the controller.
486 486
487 487 For each `PendingResult` that is created by this client, the controller
488 488 holds on to the result for that `PendingResult`. This can be a problem
489 489 if there are a large number of `PendingResult` objects that are created.
490 490
491 491 Once the result of a `PendingResult` has been retrieved, the result
492 492 is removed from the controller, but if a user never retrieves it
493 493 (simply ignoring the `PendingResult`), the result is kept forever on the
494 494 controller. This method allows the user to clear out all un-retrieved
495 495 results on the controller.
496 496 """
497 497 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
498 498 return r
499 499
500 500 clear_pending_results = flush
501 501
502 502 #---------------------------------------------------------------------------
503 503 # IEngineMultiplexer related methods
504 504 #---------------------------------------------------------------------------
505 505
506 506 def execute(self, lines, targets=None, block=None):
507 507 """
508 508 Execute code on a set of engines.
509 509
510 510 :Parameters:
511 511 lines : str
512 512 The Python code to execute as a string
513 513 targets : id or list of ids
514 514 The engine to use for the execution
515 515 block : boolean
516 516 If True, this method will return the actual result. If False,
517 517 a `PendingResult` is returned which can be used to get the result
518 518 at a later time.
519 519 """
520 520 targets, block = self._findTargetsAndBlock(targets, block)
521 521 result = blockingCallFromThread(self.smultiengine.execute, lines,
522 522 targets=targets, block=block)
523 523 if block:
524 524 result = ResultList(result)
525 525 else:
526 526 result = PendingResult(self, result)
527 527 result.add_callback(wrapResultList)
528 528 return result
529 529
530 530 def push(self, namespace, targets=None, block=None):
531 531 """
532 532 Push a dictionary of keys and values to the engines' namespaces.
533 533
534 534 Each engine has a persistent namespace. This method is used to push
535 535 Python objects into that namespace.
536 536
537 537 The objects in the namespace must be pickleable.
538 538
539 539 :Parameters:
540 540 namespace : dict
541 541 A dict that contains Python objects to be injected into
542 542 the engine persistent namespace.
543 543 targets : id or list of ids
544 544 The engine to use for the execution
545 545 block : boolean
546 546 If True, this method will return the actual result. If False,
547 547 a `PendingResult` is returned which can be used to get the result
548 548 at a later time.
549 549 """
550 550 targets, block = self._findTargetsAndBlock(targets, block)
551 551 return self._blockFromThread(self.smultiengine.push, namespace,
552 552 targets=targets, block=block)
553 553
554 554 def pull(self, keys, targets=None, block=None):
555 555 """
556 556 Pull Python objects by key out of engines namespaces.
557 557
558 558 :Parameters:
559 559 keys : str or list of str
560 560 The names of the variables to be pulled
561 561 targets : id or list of ids
562 562 The engine to use for the execution
563 563 block : boolean
564 564 If True, this method will return the actual result. If False,
565 565 a `PendingResult` is returned which can be used to get the result
566 566 at a later time.
567 567 """
568 568 targets, block = self._findTargetsAndBlock(targets, block)
569 569 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
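# Editor's sketch (assumed connected client `mec`): explicit push/pull with
# targets and block spelled out; objects crossing the wire must be
# picklable.
#
#     mec.push(dict(a=1.0, b=[1, 2, 3]), targets=[0, 1], block=True)
#     mec.pull(['a', 'b'], targets=0, block=True)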
570 570
571 571 def push_function(self, namespace, targets=None, block=None):
572 572 """
573 573 Push a Python function to an engine.
574 574
575 575 This method is used to push a Python function to an engine. The
576 576 function can then be used in code on the engines. Closures are not supported.
577 577
578 578 :Parameters:
579 579 namespace : dict
580 580 A dict whose values are the functions to be pushed. The keys give
581 581 the names that the functions will appear as in the engines'
582 582 namespace.
583 583 targets : id or list of ids
584 584 The engine to use for the execution
585 585 block : boolean
586 586 If True, this method will return the actual result. If False,
587 587 a `PendingResult` is returned which can be used to get the result
588 588 at a later time.
589 589 """
590 590 targets, block = self._findTargetsAndBlock(targets, block)
591 591 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
592 592
593 593 def pull_function(self, keys, targets=None, block=None):
594 594 """
595 595 Pull a Python function from an engine.
596 596
597 597 This method is used to pull a Python function from an engine.
598 598 Closures are not supported.
599 599
600 600 :Parameters:
601 601 keys : str or list of str
602 602 The names of the functions to be pulled
603 603 targets : id or list of ids
604 604 The engine to use for the execution
605 605 block : boolean
606 606 If True, this method will return the actual result. If False,
607 607 a `PendingResult` is returned which can be used to get the result
608 608 at a later time.
609 609 """
610 610 targets, block = self._findTargetsAndBlock(targets, block)
611 611 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
612 612
613 613 def push_serialized(self, namespace, targets=None, block=None):
614 614 targets, block = self._findTargetsAndBlock(targets, block)
615 615 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
616 616
617 617 def pull_serialized(self, keys, targets=None, block=None):
618 618 targets, block = self._findTargetsAndBlock(targets, block)
619 619 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
620 620
621 621 def get_result(self, i=None, targets=None, block=None):
622 622 """
623 623 Get a previous result.
624 624
625 625 When code is executed in an engine, a dict is created and returned. This
626 626 method retrieves that dict for previous commands.
627 627
628 628 :Parameters:
629 629 i : int
630 630 The number of the result to get
631 631 targets : id or list of ids
632 632 The engine to use for the execution
633 633 block : boolean
634 634 If True, this method will return the actual result. If False,
635 635 a `PendingResult` is returned which can be used to get the result
636 636 at a later time.
637 637 """
638 638 targets, block = self._findTargetsAndBlock(targets, block)
639 639 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
640 640 if block:
641 641 result = ResultList(result)
642 642 else:
643 643 result = PendingResult(self, result)
644 644 result.add_callback(wrapResultList)
645 645 return result
646 646
647 647 def reset(self, targets=None, block=None):
648 648 """
649 649 Reset an engine.
650 650
651 651 This method clears out the namespace of an engine.
652 652
653 653 :Parameters:
654 654 targets : id or list of ids
655 655 The engine to use for the execution
656 656 block : boolean
657 657 If True, this method will return the actual result. If False,
658 658 a `PendingResult` is returned which can be used to get the result
659 659 at a later time.
660 660 """
661 661 targets, block = self._findTargetsAndBlock(targets, block)
662 662 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
663 663
664 664 def keys(self, targets=None, block=None):
665 665 """
666 666 Get a list of all the variables in an engine's namespace.
667 667
668 668 :Parameters:
669 669 targets : id or list of ids
670 670 The engine to use for the execution
671 671 block : boolean
672 672 If True, this method will return the actual result. If False,
673 673 a `PendingResult` is returned which can be used to get the result
674 674 at a later time.
675 675 """
676 676 targets, block = self._findTargetsAndBlock(targets, block)
677 677 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
678 678
679 679 def kill(self, controller=False, targets=None, block=None):
680 680 """
681 681 Kill the engines and controller.
682 682
683 683 This method is used to stop the engines and controller by calling
684 684 `reactor.stop`.
685 685
686 686 :Parameters:
687 687 controller : boolean
688 688 If True, kill the engines and controller. If False, just the
689 689 engines
690 690 targets : id or list of ids
691 691 The engine to use for the execution
692 692 block : boolean
693 693 If True, this method will return the actual result. If False,
694 694 a `PendingResult` is returned which can be used to get the result
695 695 at a later time.
696 696 """
697 697 targets, block = self._findTargetsAndBlock(targets, block)
698 698 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
699 699
700 700 def clear_queue(self, targets=None, block=None):
701 701 """
702 702 Clear out the controller's queue for an engine.
703 703
704 704 The controller maintains a queue for each engine. This method clears it out.
705 705
706 706 :Parameters:
707 707 targets : id or list of ids
708 708 The engine to use for the execution
709 709 block : boolean
710 710 If True, this method will return the actual result. If False,
711 711 a `PendingResult` is returned which can be used to get the result
712 712 at a later time.
713 713 """
714 714 targets, block = self._findTargetsAndBlock(targets, block)
715 715 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
716 716
717 717 def queue_status(self, targets=None, block=None):
718 718 """
719 719 Get the status of an engine's queue.
720 720
721 721 :Parameters:
722 722 targets : id or list of ids
723 723 The engine to use for the execution
724 724 block : boolean
725 725 If True, this method will return the actual result. If False,
726 726 a `PendingResult` is returned which can be used to get the result
727 727 at a later time.
728 728 """
729 729 targets, block = self._findTargetsAndBlock(targets, block)
730 730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731 731
732 732 def set_properties(self, properties, targets=None, block=None):
733 733 targets, block = self._findTargetsAndBlock(targets, block)
734 734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735 735
736 736 def get_properties(self, keys=None, targets=None, block=None):
737 737 targets, block = self._findTargetsAndBlock(targets, block)
738 738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739 739
740 740 def has_properties(self, keys, targets=None, block=None):
741 741 targets, block = self._findTargetsAndBlock(targets, block)
742 742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743 743
744 744 def del_properties(self, keys, targets=None, block=None):
745 745 targets, block = self._findTargetsAndBlock(targets, block)
746 746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747 747
748 748 def clear_properties(self, targets=None, block=None):
749 749 targets, block = self._findTargetsAndBlock(targets, block)
750 750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751 751
752 752 #---------------------------------------------------------------------------
753 753 # IMultiEngine related methods
754 754 #---------------------------------------------------------------------------
755 755
756 756 def get_ids(self):
757 757 """
758 758 Returns the ids of currently registered engines.
759 759 """
760 760 result = blockingCallFromThread(self.smultiengine.get_ids)
761 761 return result
762 762
763 763 #---------------------------------------------------------------------------
764 764 # IMultiEngineCoordinator
765 765 #---------------------------------------------------------------------------
766 766
767 767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
768 768 """
769 769 Partition a Python sequence and send the partitions to a set of engines.
770 770 """
771 771 targets, block = self._findTargetsAndBlock(targets, block)
772 772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
773 773 dist, flatten, targets=targets, block=block)
774 774
775 775 def gather(self, key, dist='b', targets=None, block=None):
776 776 """
777 777 Gather a partitioned sequence on a set of engines as a single local seq.
778 778 """
779 779 targets, block = self._findTargetsAndBlock(targets, block)
780 780 return self._blockFromThread(self.smultiengine.gather, key, dist,
781 781 targets=targets, block=block)
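# Editor's sketch (assumed connected client `mec`): scatter a sequence
# across the engines, transform the pieces remotely, and gather them back
# in their original order.
#
#     mec.scatter('seq', range(16), block=True)
#     mec.execute('seq = [x**2 for x in seq]', block=True)
#     mec.gather('seq', block=True)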
782 782
783 783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
784 784 """
785 785 A parallelized version of Python's builtin map.
786 786
787 787 This has a slightly different syntax than the builtin `map`.
788 788 This is needed because we need to have keyword arguments and thus
789 789 can't use *args to capture all the sequences. Instead, they must
790 790 be passed in a list or tuple.
791 791
792 792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793 793
794 794 Most users will want to use parallel functions or the `mapper`
795 795 and `map` methods for an API that follows that of the builtin
796 796 `map`.
797 797 """
798 798 targets, block = self._findTargetsAndBlock(targets, block)
799 799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 800 dist, targets=targets, block=block)
801 801
802 802 def map(self, func, *sequences):
803 803 """
804 804 A parallel version of Python's builtin `map` function.
805 805
806 806 This method applies a function to sequences of arguments. It
807 807 follows the same syntax as the builtin `map`.
808 808
809 809 This method creates a mapper object by calling `self.mapper` with
810 810 no arguments and then uses that mapper to do the mapping. See
811 811 the documentation of `mapper` for more details.
812 812 """
813 813 return self.mapper().map(func, *sequences)
814 814
815 815 def mapper(self, dist='b', targets='all', block=None):
816 816 """
817 817 Create a mapper object that has a `map` method.
818 818
819 819 This method returns an object that implements the `IMapper`
820 820 interface. This method is a factory that is used to control how
821 821 the map happens.
822 822
823 823 :Parameters:
824 824 dist : str
825 825 What decomposition to use, 'b' is the only one supported
826 826 currently
827 827 targets : str, int, sequence of ints
828 828 Which engines to use for the map
829 829 block : boolean
830 830 Should calls to `map` block or not
831 831 """
832 832 return MultiEngineMapper(self, dist, targets, block)
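# Editor's sketch (assumed connected client `mec`): `map` mirrors the
# builtin's call form, while `mapper` exposes the dist/targets/block knobs
# behind it.
#
#     squares = mec.map(lambda x: x*x, range(10))
#     m = mec.mapper(dist='b', targets='all', block=True)
#     sums = m.map(lambda x, y: x + y, range(5), range(5))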
833 833
834 834 def parallel(self, dist='b', targets=None, block=None):
835 835 """
836 836 A decorator that turns a function into a parallel function.
837 837
838 838 This can be used as:
839 839
840 840 @parallel()
841 841 def f(x, y):
842 842 ...
843 843
844 844 f(range(10), range(10))
845 845
846 846 This causes f(0,0), f(1,1), ... to be called in parallel.
847 847
848 848 :Parameters:
849 849 dist : str
850 850 What decomposition to use, 'b' is the only one supported
851 851 currently
852 852 targets : str, int, sequence of ints
853 853 Which engines to use for the map
854 854 block : boolean
855 855 Should calls to `map` block or not
856 856 """
857 857 targets, block = self._findTargetsAndBlock(targets, block)
858 858 mapper = self.mapper(dist, targets, block)
859 859 pf = ParallelFunction(mapper)
860 860 return pf
861 861
862 862 #---------------------------------------------------------------------------
863 863 # IMultiEngineExtras
864 864 #---------------------------------------------------------------------------
865 865
866 866 def zip_pull(self, keys, targets=None, block=None):
867 867 targets, block = self._findTargetsAndBlock(targets, block)
868 868 return self._blockFromThread(self.smultiengine.zip_pull, keys,
869 869 targets=targets, block=block)
870 870
871 871 def run(self, filename, targets=None, block=None):
872 872 """
873 873 Run Python code from a file on the engines.
874 874
875 875 :Parameters:
876 876 filename : str
877 877 The name of the local file to run
878 878 targets : id or list of ids
879 879 The engine to use for the execution
880 880 block : boolean
881 881 If True, this method will return the actual result. If False,
882 882 a `PendingResult` is returned which can be used to get the result
883 883 at a later time.
884 884 """
885 885 targets, block = self._findTargetsAndBlock(targets, block)
886 886 return self._blockFromThread(self.smultiengine.run, filename,
887 887 targets=targets, block=block)
888
889 def benchmark(self, push_size=10000):
890 """
891 Run performance benchmarks for the current IPython cluster.
892
893 This method tests both the latency of sending commands and data to the
894 engines as well as the throughput of sending large objects to the
895 engines using push. The latency is measured by having one or more
896 engines execute the command 'pass'. The throughput is measured by
897 sending a NumPy array of size `push_size` to one or more engines.
898
899 These benchmarks will vary widely on different hardware and networks
900 and thus can be used to get an idea of the performance characteristics
901 of a particular configuration of an IPython controller and engines.
902
903 This function is not testable within our current testing framework.
904 """
905 import timeit, __builtin__
906 __builtin__._mec_self = self
907 benchmarks = {}
908 repeat = 3
909 count = 10
910
911 timer = timeit.Timer('_mec_self.execute("pass",0)')
912 result = 1000*min(timer.repeat(repeat,count))/count
913 benchmarks['single_engine_latency'] = (result,'msec')
914
915 timer = timeit.Timer('_mec_self.execute("pass")')
916 result = 1000*min(timer.repeat(repeat,count))/count
917 benchmarks['all_engine_latency'] = (result,'msec')
888 918
919 try:
920 import numpy as np
921 except ImportError:
922 pass
923 else:
924 timer = timeit.Timer(
925 "_mec_self.push(d)",
926 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
927 )
928 result = min(timer.repeat(repeat,count))/count
929 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
930
931 try:
932 import numpy as np
933 except ImportError:
934 pass
935 else:
936 timer = timeit.Timer(
937 "_mec_self.push(d,0)",
938 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
939 )
940 result = min(timer.repeat(repeat,count))/count
941 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
942
943 return benchmarks
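# Editor's sketch of calling the benchmark (the client import path is an
# assumption for this vintage of IPython):
#
#     from IPython.kernel import client
#     mec = client.MultiEngineClient()
#     for name, (value, unit) in mec.benchmark(push_size=100000).items():
#         print name, value, unit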
889 944
890 945
891 946 components.registerAdapter(FullBlockingMultiEngineClient,
892 947 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
893 948
894 949
895 950
896 951
@@ -1,723 +1,783 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import get_ipython_dir, num_cpus
33 33 from IPython.kernel.fcutil import have_crypto
34 34 from IPython.kernel.error import SecurityError
36 36 from IPython.kernel.twistedutil import gatherBoth
37 37 from IPython.kernel.util import printer
38 38
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # General process handling code
42 42 #-----------------------------------------------------------------------------
43 43
44 44 def find_exe(cmd):
45 45 try:
46 46 import win32api
47 47 except ImportError:
48 48 raise ImportError('you need to have pywin32 installed for this to work')
49 49 else:
50 50 try:
51 51 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
52 52 except:
53 53 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
54 54 return path
55 55
56 56 class ProcessStateError(Exception):
57 57 pass
58 58
59 59 class UnknownStatus(Exception):
60 60 pass
61 61
62 62 class LauncherProcessProtocol(ProcessProtocol):
63 63 """
64 64 A ProcessProtocol to go with the ProcessLauncher.
65 65 """
66 66 def __init__(self, process_launcher):
67 67 self.process_launcher = process_launcher
68 68
69 69 def connectionMade(self):
70 70 self.process_launcher.fire_start_deferred(self.transport.pid)
71 71
72 72 def processEnded(self, status):
73 73 value = status.value
74 74 if isinstance(value, ProcessDone):
75 75 self.process_launcher.fire_stop_deferred(0)
76 76 elif isinstance(value, ProcessTerminated):
77 77 self.process_launcher.fire_stop_deferred(
78 78 {'exit_code':value.exitCode,
79 79 'signal':value.signal,
80 80 'status':value.status
81 81 }
82 82 )
83 83 else:
84 84 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
85 85
86 86 def outReceived(self, data):
87 87 log.msg(data)
88 88
89 89 def errReceived(self, data):
90 90 log.err(data)
91 91
92 92 class ProcessLauncher(object):
93 93 """
94 94 Start and stop an external process in an asynchronous manner.
95 95
96 96 Currently this uses deferreds to notify other parties of process state
97 97 changes. This is an awkward design and should be moved to using
98 98 a formal NotificationCenter.
99 99 """
100 100 def __init__(self, cmd_and_args):
101 101 self.cmd = cmd_and_args[0]
102 102 self.args = cmd_and_args
103 103 self._reset()
104 104
105 105 def _reset(self):
106 106 self.process_protocol = None
107 107 self.pid = None
108 108 self.start_deferred = None
109 109 self.stop_deferreds = []
110 110 self.state = 'before' # before, running, or after
111 111
112 112 @property
113 113 def running(self):
114 114 if self.state == 'running':
115 115 return True
116 116 else:
117 117 return False
118 118
119 119 def fire_start_deferred(self, pid):
120 120 self.pid = pid
121 121 self.state = 'running'
122 122 log.msg('Process %r has started with pid=%i' % (self.args, pid))
123 123 self.start_deferred.callback(pid)
124 124
125 125 def start(self):
126 126 if self.state == 'before':
127 127 self.process_protocol = LauncherProcessProtocol(self)
128 128 self.start_deferred = defer.Deferred()
129 129 self.process_transport = reactor.spawnProcess(
130 130 self.process_protocol,
131 131 self.cmd,
132 132 self.args,
133 133 env=os.environ
134 134 )
135 135 return self.start_deferred
136 136 else:
137 137 s = 'the process has already been started and has state: %r' % \
138 138 self.state
139 139 return defer.fail(ProcessStateError(s))
140 140
141 141 def get_stop_deferred(self):
142 142 if self.state == 'running' or self.state == 'before':
143 143 d = defer.Deferred()
144 144 self.stop_deferreds.append(d)
145 145 return d
146 146 else:
147 147 s = 'this process is already complete'
148 148 return defer.fail(ProcessStateError(s))
149 149
150 150 def fire_stop_deferred(self, exit_code):
151 151 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
152 152 self.state = 'after'
153 153 for d in self.stop_deferreds:
154 154 d.callback(exit_code)
155 155
156 156 def signal(self, sig):
157 157 """
158 158 Send a signal to the process.
159 159
160 160 The argument sig can be ('KILL','INT', etc.) or any signal number.
161 161 """
162 162 if self.state == 'running':
163 163 self.process_transport.signalProcess(sig)
164 164
165 165 # def __del__(self):
166 166 # self.signal('KILL')
167 167
168 168 def interrupt_then_kill(self, delay=1.0):
169 169 self.signal('INT')
170 170 reactor.callLater(delay, self.signal, 'KILL')
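# Editor's sketch (not part of this changeset): driving a ProcessLauncher
# under the Twisted reactor; the command line is only an example.
#
#     pl = ProcessLauncher(['ipengine', '--help'])
#     pl.start().addCallback(lambda pid: log.msg('started pid=%i' % pid))
#     pl.get_stop_deferred().addCallback(lambda code: reactor.stop())
#     reactor.run()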
171 171
172 172
173 173 #-----------------------------------------------------------------------------
174 174 # Code for launching controller and engines
175 175 #-----------------------------------------------------------------------------
176 176
177 177
178 178 class ControllerLauncher(ProcessLauncher):
179 179
180 180 def __init__(self, extra_args=None):
181 181 if sys.platform == 'win32':
182 182 # This logic is needed because the ipcontroller script doesn't
183 183 # always get installed in the same way or in the same location.
184 184 from IPython.kernel.scripts import ipcontroller
185 185 script_location = ipcontroller.__file__.replace('.pyc', '.py')
186 186 # The -u option here turns on unbuffered output, which is required
187 187 # on Win32 to prevent wierd conflict and problems with Twisted
188 188 args = [find_exe('python'), '-u', script_location]
189 189 else:
190 190 args = ['ipcontroller']
191 191 self.extra_args = extra_args
192 192 if extra_args is not None:
193 193 args.extend(extra_args)
194 194
195 195 ProcessLauncher.__init__(self, args)
196 196
197 197
198 198 class EngineLauncher(ProcessLauncher):
199 199
200 200 def __init__(self, extra_args=None):
201 201 if sys.platform == 'win32':
202 202 # This logic is needed because the ipengine script doesn't
203 203 # always get installed in the same way or in the same location.
204 204 from IPython.kernel.scripts import ipengine
205 205 script_location = ipengine.__file__.replace('.pyc', '.py')
206 206 # The -u option here turns on unbuffered output, which is required
207 207 # on Win32 to prevent weird conflicts and problems with Twisted
208 208 args = [find_exe('python'), '-u', script_location]
209 209 else:
210 210 args = ['ipengine']
211 211 self.extra_args = extra_args
212 212 if extra_args is not None:
213 213 args.extend(extra_args)
214 214
215 215 ProcessLauncher.__init__(self, args)
216 216
217 217
218 218 class LocalEngineSet(object):
219 219
220 220 def __init__(self, extra_args=None):
221 221 self.extra_args = extra_args
222 222 self.launchers = []
223 223
224 224 def start(self, n):
225 225 dlist = []
226 226 for i in range(n):
227 227 el = EngineLauncher(extra_args=self.extra_args)
228 228 d = el.start()
229 229 self.launchers.append(el)
230 230 dlist.append(d)
231 231 dfinal = gatherBoth(dlist, consumeErrors=True)
232 232 dfinal.addCallback(self._handle_start)
233 233 return dfinal
234 234
235 235 def _handle_start(self, r):
236 236 log.msg('Engines started with pids: %r' % r)
237 237 return r
238 238
239 239 def _handle_stop(self, r):
240 240 log.msg('Engines received signal: %r' % r)
241 241 return r
242 242
243 243 def signal(self, sig):
244 244 dlist = []
245 245 for el in self.launchers:
246 246 d = el.get_stop_deferred()
247 247 dlist.append(d)
248 248 el.signal(sig)
249 249 dfinal = gatherBoth(dlist, consumeErrors=True)
250 250 dfinal.addCallback(self._handle_stop)
251 251 return dfinal
252 252
253 253 def interrupt_then_kill(self, delay=1.0):
254 254 dlist = []
255 255 for el in self.launchers:
256 256 d = el.get_stop_deferred()
257 257 dlist.append(d)
258 258 el.interrupt_then_kill(delay)
259 259 dfinal = gatherBoth(dlist, consumeErrors=True)
260 260 dfinal.addCallback(self._handle_stop)
261 261 return dfinal
262 262
263 263
264 264 class BatchEngineSet(object):
265 265
266 266 # Subclasses must fill these in. See PBSEngineSet
267 267 submit_command = ''
268 268 delete_command = ''
269 269 job_id_regexp = ''
270 270
271 271 def __init__(self, template_file, **kwargs):
272 272 self.template_file = template_file
273 273 self.context = {}
274 274 self.context.update(kwargs)
275 275 self.batch_file = self.template_file+'-run'
276 276
277 277 def parse_job_id(self, output):
278 278 m = re.match(self.job_id_regexp, output)
279 279 if m is not None:
280 280 job_id = m.group()
281 281 else:
282 282 raise Exception("job id couldn't be determined: %s" % output)
283 283 self.job_id = job_id
284 284 log.msg('Job started with job id: %r' % job_id)
285 285 return job_id
286 286
287 287 def write_batch_script(self, n):
288 288 self.context['n'] = n
289 289 template = open(self.template_file, 'r').read()
290 290 log.msg('Using template for batch script: %s' % self.template_file)
291 291 script_as_string = Itpl.itplns(template, self.context)
292 292 log.msg('Writing instantiated batch script: %s' % self.batch_file)
293 293 f = open(self.batch_file,'w')
294 294 f.write(script_as_string)
295 295 f.close()
296 296
297 297 def handle_error(self, f):
298 298 f.printTraceback()
299 299 f.raiseException()
300 300
301 301 def start(self, n):
302 302 self.write_batch_script(n)
303 303 d = getProcessOutput(self.submit_command,
304 304 [self.batch_file],env=os.environ)
305 305 d.addCallback(self.parse_job_id)
306 306 d.addErrback(self.handle_error)
307 307 return d
308 308
309 309 def kill(self):
310 310 d = getProcessOutput(self.delete_command,
311 311 [self.job_id],env=os.environ)
312 312 return d
313 313
314 314 class PBSEngineSet(BatchEngineSet):
315 315
316 316 submit_command = 'qsub'
317 317 delete_command = 'qdel'
318 318 job_id_regexp = r'\d+'
319 319
320 320 def __init__(self, template_file, **kwargs):
321 321 BatchEngineSet.__init__(self, template_file, **kwargs)
322 322
323 323
324 324 sshx_template="""#!/bin/sh
325 325 "$@" &> /dev/null &
326 326 echo $!
327 327 """
328 328
329 329 engine_killer_template="""#!/bin/sh
330 330 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
331 331 """
332 332
333 333 class SSHEngineSet(object):
334 334 sshx_template=sshx_template
335 335 engine_killer_template=engine_killer_template
336 336
337 337 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
338 338 """Start a controller on localhost and engines using ssh.
339 339
340 340 The engine_hosts argument is a dict with hostnames as keys and
341 341 the number of engine (int) as values. sshx is the name of a local
342 342 file that will be used to run remote commands. This file is used
343 343 to setup the environment properly.
344 344 """
345 345
346 346 self.temp_dir = tempfile.gettempdir()
347 347 if sshx is not None:
348 348 self.sshx = sshx
349 349 else:
350 350 # Write the sshx.sh file locally from our template.
351 351 self.sshx = os.path.join(
352 352 self.temp_dir,
353 353 '%s-main-sshx.sh' % os.environ['USER']
354 354 )
355 355 f = open(self.sshx, 'w')
356 356 f.writelines(self.sshx_template)
357 357 f.close()
358 358 self.engine_command = ipengine
359 359 self.engine_hosts = engine_hosts
360 360 # Write the engine killer script file locally from our template.
361 361 self.engine_killer = os.path.join(
362 362 self.temp_dir,
363 363 '%s-local-engine_killer.sh' % os.environ['USER']
364 364 )
365 365 f = open(self.engine_killer, 'w')
366 366 f.writelines(self.engine_killer_template)
367 367 f.close()
368 368
369 369 def start(self, send_furl=False):
370 370 dlist = []
371 371 for host in self.engine_hosts.keys():
372 372 count = self.engine_hosts[host]
373 373 d = self._start(host, count, send_furl)
374 374 dlist.append(d)
375 375 return gatherBoth(dlist, consumeErrors=True)
376 376
377 377 def _start(self, hostname, count=1, send_furl=False):
378 378 if send_furl:
379 379 d = self._scp_furl(hostname)
380 380 else:
381 381 d = defer.succeed(None)
382 382 d.addCallback(lambda r: self._scp_sshx(hostname))
383 383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 384 return d
385 385
386 386 def _scp_furl(self, hostname):
387 387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
388 388 cmd_list = scp_cmd.split()
389 389 cmd_list[1] = os.path.expanduser(cmd_list[1])
390 390 log.msg('Copying furl file: %s' % scp_cmd)
391 391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
392 392 return d
393 393
394 394 def _scp_sshx(self, hostname):
395 395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 396 self.sshx, hostname,
397 397 self.temp_dir, os.environ['USER']
398 398 )
400 400 log.msg("Copying sshx: %s" % scp_cmd)
401 401 sshx_scp = scp_cmd.split()
402 402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
403 403 return d
404 404
405 405 def _ssh_engine(self, hostname, count):
406 406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 407 hostname, self.temp_dir,
408 408 os.environ['USER'], self.engine_command
409 409 )
410 410 cmds = exec_engine.split()
411 411 dlist = []
412 412 log.msg("about to start engines...")
413 413 for i in range(count):
414 414 log.msg('Starting engines: %s' % exec_engine)
415 415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 416 dlist.append(d)
417 417 return gatherBoth(dlist, consumeErrors=True)
418 418
419 419 def kill(self):
420 420 dlist = []
421 421 for host in self.engine_hosts.keys():
422 422 d = self._killall(host)
423 423 dlist.append(d)
424 424 return gatherBoth(dlist, consumeErrors=True)
425 425
426 426 def _killall(self, hostname):
427 427 d = self._scp_engine_killer(hostname)
428 428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 429 # d.addErrback(self._exec_err)
430 430 return d
431 431
432 432 def _scp_engine_killer(self, hostname):
433 433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 434 self.engine_killer,
435 435 hostname,
436 436 self.temp_dir,
437 437 os.environ['USER']
438 438 )
439 439 cmds = scp_cmd.split()
440 440 log.msg('Copying engine_killer: %s' % scp_cmd)
441 441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
442 442 return d
443 443
444 444 def _ssh_kill(self, hostname):
445 445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 446 hostname,
447 447 self.temp_dir,
448 448 os.environ['USER']
449 449 )
450 450 log.msg('Killing engine: %s' % kill_cmd)
451 451 kill_cmd = kill_cmd.split()
452 452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 453 return d
454 454
455 455 def _exec_err(self, r):
456 456 log.msg(r)
457 457
458 458 #-----------------------------------------------------------------------------
459 459 # Main functions for the different types of clusters
460 460 #-----------------------------------------------------------------------------
461 461
462 462 # TODO:
463 463 # The logic in this code should be moved into classes like LocalCluster,
464 464 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
465 465 # The main functions should then just parse the command line arguments, create
466 466 # the appropriate class and call a 'start' method.
467 467
468 468 def check_security(args, cont_args):
469 469 if (not args.x or not args.y) and not have_crypto:
470 470 log.err("""
471 471 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
472 472 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
473 473 reactor.stop()
474 474 return False
475 475 if args.x:
476 476 cont_args.append('-x')
477 477 if args.y:
478 478 cont_args.append('-y')
479 479 return True
480 480
481 def check_reuse(args, cont_args):
482 if args.r:
483 cont_args.append('-r')
484 if args.client_port == 0 or args.engine_port == 0:
485 log.err("""
486 To reuse FURL files, you must also set the client and engine ports using
487 the --client-port and --engine-port options.""")
488 reactor.stop()
489 return False
490 cont_args.append('--client-port=%i' % args.client_port)
491 cont_args.append('--engine-port=%i' % args.engine_port)
492 return True
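# Editor's sketch of the reuse workflow (the port numbers are arbitrary
# examples):
#
#     ipcluster local -xy -n 4 -r --client-port=10101 --engine-port=10102
#
# Without both ports set, check_reuse logs the error above and stops the
# reactor.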
481 493
482 494 def main_local(args):
483 495 cont_args = []
484 496 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
485
497
486 498 # Check security settings before proceeding
487 499 if not check_security(args, cont_args):
488 500 return
489
501
502 # See if we are reusing FURL files
503 if not check_reuse(args, cont_args):
504 return
505
490 506 cl = ControllerLauncher(extra_args=cont_args)
491 507 dstart = cl.start()
492 508 def start_engines(cont_pid):
493 509 engine_args = []
494 510 engine_args.append('--logfile=%s' % \
495 511 pjoin(args.logdir,'ipengine%s-' % cont_pid))
496 512 eset = LocalEngineSet(extra_args=engine_args)
497 513 def shutdown(signum, frame):
498 514 log.msg('Stopping local cluster')
499 515 # We are still playing with the times here, but these seem
500 516 # to be reliable in allowing everything to exit cleanly.
501 517 eset.interrupt_then_kill(0.5)
502 518 cl.interrupt_then_kill(0.5)
503 519 reactor.callLater(1.0, reactor.stop)
504 520 signal.signal(signal.SIGINT,shutdown)
505 521 d = eset.start(args.n)
506 522 return d
507 523 def delay_start(cont_pid):
508 524 # This is needed because the controller doesn't start listening
509 525 # right when it starts and the controller needs to write
510 526 # furl files for the engine to pick up
511 527 reactor.callLater(1.0, start_engines, cont_pid)
512 528 dstart.addCallback(delay_start)
513 529 dstart.addErrback(lambda f: f.raiseException())
514 530
515 531
516 def main_mpirun(args):
532 def main_mpi(args):
517 533 cont_args = []
518 534 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
519
535
520 536 # Check security settings before proceeding
521 537 if not check_security(args, cont_args):
522 538 return
523
539
540 # See if we are reusing FURL files
541 if not check_reuse(args, cont_args):
542 return
543
524 544 cl = ControllerLauncher(extra_args=cont_args)
525 545 dstart = cl.start()
526 546 def start_engines(cont_pid):
527 raw_args = ['mpirun']
547 raw_args = [args.cmd]
528 548 raw_args.extend(['-n',str(args.n)])
529 549 raw_args.append('ipengine')
530 550 raw_args.append('-l')
531 551 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
532 552 if args.mpi:
533 553 raw_args.append('--mpi=%s' % args.mpi)
534 554 eset = ProcessLauncher(raw_args)
535 555 def shutdown(signum, frame):
536 556 log.msg('Stopping local cluster')
537 557 # We are still playing with the times here, but these seem
538 558 # to be reliable in allowing everything to exit cleanly.
539 559 eset.interrupt_then_kill(1.0)
540 560 cl.interrupt_then_kill(1.0)
541 561 reactor.callLater(2.0, reactor.stop)
542 562 signal.signal(signal.SIGINT,shutdown)
543 563 d = eset.start()
544 564 return d
545 565 def delay_start(cont_pid):
546 566 # This is needed because the controller doesn't start listening
547 567 # right when it starts and the controller needs to write
548 568 # furl files for the engine to pick up
549 569 reactor.callLater(1.0, start_engines, cont_pid)
550 570 dstart.addCallback(delay_start)
551 571 dstart.addErrback(lambda f: f.raiseException())
552 572
553 573
554 574 def main_pbs(args):
555 575 cont_args = []
556 576 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
557
577
558 578 # Check security settings before proceeding
559 579 if not check_security(args, cont_args):
560 580 return
561
581
582 # See if we are reusing FURL files
583 if not check_reuse(args, cont_args):
584 return
585
562 586 cl = ControllerLauncher(extra_args=cont_args)
563 587 dstart = cl.start()
564 588 def start_engines(r):
565 589 pbs_set = PBSEngineSet(args.pbsscript)
566 590 def shutdown(signum, frame):
567 591 log.msg('Stopping pbs cluster')
568 592 d = pbs_set.kill()
569 593 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
570 594 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
571 595 signal.signal(signal.SIGINT,shutdown)
572 596 d = pbs_set.start(args.n)
573 597 return d
574 598 dstart.addCallback(start_engines)
575 599 dstart.addErrback(lambda f: f.raiseException())
576 600
577 601
578 602 def main_ssh(args):
579 603 """Start a controller on localhost and engines using ssh.
580 604
581 605 Your clusterfile should look like::
582 606
583 607 send_furl = False # True, if you want
584 608 engines = {
585 609 'engine_host1' : engine_count,
586 610 'engine_host2' : engine_count2
587 611 }
588 612 """
589 613 clusterfile = {}
590 614 execfile(args.clusterfile, clusterfile)
591 615 if not clusterfile.has_key('send_furl'):
592 616 clusterfile['send_furl'] = False
593 617
594 618 cont_args = []
595 619 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
596 620
597 621 # Check security settings before proceeding
598 622 if not check_security(args, cont_args):
599 623 return
600 624
625 # See if we are reusing FURL files
626 if not check_reuse(args, cont_args):
627 return
628
601 629 cl = ControllerLauncher(extra_args=cont_args)
602 630 dstart = cl.start()
603 631 def start_engines(cont_pid):
604 632 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
605 633 def shutdown(signum, frame):
606 634 d = ssh_set.kill()
607 # d.addErrback(log.err)
608 635 cl.interrupt_then_kill(1.0)
609 636 reactor.callLater(2.0, reactor.stop)
610 637 signal.signal(signal.SIGINT,shutdown)
611 638 d = ssh_set.start(clusterfile['send_furl'])
612 639 return d
613 640
614 641 def delay_start(cont_pid):
615 642 reactor.callLater(1.0, start_engines, cont_pid)
616 643
617 644 dstart.addCallback(delay_start)
618 645 dstart.addErrback(lambda f: f.raiseException())
619 646
620 647
621 648 def get_args():
622 649 base_parser = argparse.ArgumentParser(add_help=False)
623 650 base_parser.add_argument(
651 '-r',
652 action='store_true',
653 dest='r',
654 help='try to reuse FURL files. Use with --client-port and --engine-port'
655 )
656 base_parser.add_argument(
657 '--client-port',
658 type=int,
659 dest='client_port',
660 help='the port the controller will listen on for client connections',
661 default=0
662 )
663 base_parser.add_argument(
664 '--engine-port',
665 type=int,
666 dest='engine_port',
667 help='the port the controller will listen on for engine connections',
668 default=0
669 )
670 base_parser.add_argument(
624 671 '-x',
625 672 action='store_true',
626 673 dest='x',
627 674 help='turn off client security'
628 675 )
629 676 base_parser.add_argument(
630 677 '-y',
631 678 action='store_true',
632 679 dest='y',
633 680 help='turn off engine security'
634 681 )
635 682 base_parser.add_argument(
636 683 "--logdir",
637 684 type=str,
638 685 dest="logdir",
639 686 help="directory to put log files (default=$IPYTHONDIR/log)",
640 687 default=pjoin(get_ipython_dir(),'log')
641 688 )
642 689 base_parser.add_argument(
643 690 "-n",
644 691 "--num",
645 692 type=int,
646 693 dest="n",
647 694 default=2,
648 695 help="the number of engines to start"
649 696 )
650 697
651 698 parser = argparse.ArgumentParser(
652 699 description='IPython cluster startup. This starts a controller and\
653 700 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
654 701 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
655 702 )
656 703 subparsers = parser.add_subparsers(
657 704 help='available cluster types. For help, do "ipcluster TYPE --help"')
658 705
659 706 parser_local = subparsers.add_parser(
660 707 'local',
661 708 help='run a local cluster',
662 709 parents=[base_parser]
663 710 )
664 711 parser_local.set_defaults(func=main_local)
665 712
666 713 parser_mpirun = subparsers.add_parser(
667 714 'mpirun',
668 help='run a cluster using mpirun',
715 help='run a cluster using mpirun (mpiexec also works)',
669 716 parents=[base_parser]
670 717 )
671 718 parser_mpirun.add_argument(
672 719 "--mpi",
673 720 type=str,
674 721 dest="mpi", # Don't put a default here to allow no MPI support
675 722 help="how to call MPI_Init (default=mpi4py)"
676 723 )
677 parser_mpirun.set_defaults(func=main_mpirun)
724 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
725
726 parser_mpiexec = subparsers.add_parser(
727 'mpiexec',
728 help='run a cluster using mpiexec (mpirun also works)',
729 parents=[base_parser]
730 )
731 parser_mpiexec.add_argument(
732 "--mpi",
733 type=str,
734 dest="mpi", # Don't put a default here to allow no MPI support
735 help="how to call MPI_Init (default=mpi4py)"
736 )
737 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
678 738
679 739 parser_pbs = subparsers.add_parser(
680 740 'pbs',
681 741 help='run a pbs cluster',
682 742 parents=[base_parser]
683 743 )
684 744 parser_pbs.add_argument(
685 745 '--pbs-script',
686 746 type=str,
687 747 dest='pbsscript',
688 748 help='PBS script template',
689 749 default='pbs.template'
690 750 )
691 751 parser_pbs.set_defaults(func=main_pbs)
692 752
693 753 parser_ssh = subparsers.add_parser(
694 754 'ssh',
695 755 help='run a cluster using ssh, should have ssh-keys setup',
696 756 parents=[base_parser]
697 757 )
698 758 parser_ssh.add_argument(
699 759 '--clusterfile',
700 760 type=str,
701 761 dest='clusterfile',
702 762 help='python file describing the cluster',
703 763 default='clusterfile.py',
704 764 )
705 765 parser_ssh.add_argument(
706 766 '--sshx',
707 767 type=str,
708 768 dest='sshx',
709 769 help='sshx launcher helper'
710 770 )
711 771 parser_ssh.set_defaults(func=main_ssh)
712 772
713 773 args = parser.parse_args()
714 774 return args
715 775
716 776 def main():
717 777 args = get_args()
718 778 reactor.callWhenRunning(args.func, args)
719 779 log.startLogging(sys.stdout)
720 780 reactor.run()
721 781
722 782 if __name__ == '__main__':
723 783 main()
@@ -1,398 +1,401 b''
1 1 .. _changes:
2 2
3 3 ==========
4 4 What's new
5 5 ==========
6 6
7 7 .. contents::
8 8 ..
9 9 1 Release 0.9.1
10 10 2 Release 0.9
11 11 2.1 New features
12 12 2.2 Bug fixes
13 13 2.3 Backwards incompatible changes
14 14 2.4 Changes merged in from IPython1
15 15 2.4.1 New features
16 16 2.4.2 Bug fixes
17 17 2.4.3 Backwards incompatible changes
18 18 3 Release 0.8.4
19 19 4 Release 0.8.3
20 20 5 Release 0.8.2
21 21 6 Older releases
22 22 ..
23 23
24 24 Release dev
25 25 ===========
26 26
27 27 New features
28 28 ------------
29 29
30 30 * The new ipcluster now has a fully working ssh mode that should work on
31 31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32 32
33 33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
34 34 to Matt Foster for this patch.
35 35
36 36 * Fully refactored :command:`ipcluster` command line program for starting
37 37 IPython clusters. This new version is a complete rewrite and 1) is fully
38 38 cross platform (we now use Twisted's process management), 2) has much
39 39 improved performance, 3) uses subcommands for different types of clusters,
40 40 4) uses argparse for parsing command line options, 5) has better support
41 41 for starting clusters using :command:`mpirun`, 6) has experimental support
42 42 for starting engines using PBS. However, this new version of ipcluster
43 43 should be considered a technology preview. We plan on changing the API
44 44 in significant ways before it is final.
45 45
46 46 * The :mod:`argparse` module has been added to :mod:`IPython.external`.
47 47
48 48 * Full description of the security model added to the docs.
49 49
50 50 * cd completer: show bookmarks if no other completions are available.
51 51
52 52 * sh profile: easy way to give 'title' to prompt: assign to variable
53 53 '_prompt_title'. It looks like this::
54 54
55 55 [~]|1> _prompt_title = 'sudo!'
56 56 sudo![~]|2>
57 57
58 58 * %edit: If you do '%edit pasted_block', pasted_block
59 59 variable gets updated with new data (so repeated
60 60 editing makes sense)
61 61
62 62 Bug fixes
63 63 ---------
64 64
65 65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66 66
67 67 * The ipengine and ipcontroller scripts now handle missing furl files
68 68 more gracefully by giving better error messages.
69 69
70 70 * %rehashx: Aliases no longer contain dots. python3.0 binary
71 71 will create alias python30. Fixes:
72 72 #259716 "commands with dots in them don't work"
73 73
74 74 * %cpaste: %cpaste -r repeats the last pasted block.
75 75 The block is assigned to pasted_block even if code
76 76 raises exception.
77 77
78 * Bug #274067 'The code in get_home_dir is broken for py2exe' was
79 fixed.
80
78 81 Backwards incompatible changes
79 82 ------------------------------
80 83
81 84 * The controller now has a ``-r`` flag that needs to be used if you want to
82 85 reuse existing furl files. Otherwise they are deleted (the default).
83 86
84 87 * Remove ipy_leo.py. "easy_install ipython-extension" to get it.
85 88 (done to decouple it from ipython release cycle)
86 89
87 90
88 91
89 92 Release 0.9.1
90 93 =============
91 94
92 95 This release was quickly made to restore compatibility with Python 2.4, which
93 96 version 0.9 accidentally broke. No new features were introduced, other than
94 97 some additional testing support for internal use.
95 98
96 99
97 100 Release 0.9
98 101 ===========
99 102
100 103 New features
101 104 ------------
102 105
103 106 * All furl files and security certificates are now put in a read-only
104 107 directory named ~/.ipython/security.
105 108
106 109 * A single function, :func:`get_ipython_dir`, in :mod:`IPython.genutils`,
107 110 determines the user's IPython directory in a robust manner.
108 111
109 112 * Laurent's WX application has been given a top-level script called
110 113 ipython-wx, and it has received numerous fixes. We expect this code to be
111 114 architecturally better integrated with Gael's WX 'ipython widget' over the
112 115 next few releases.
113 116
114 117 * The Editor synchronization work by Vivian De Smedt has been merged in. This
115 118 code adds a number of new editor hooks to synchronize with editors under
116 119 Windows.
117 120
118 121 * A new, still experimental but highly functional, WX shell by Gael Varoquaux.
119 122 This work was sponsored by Enthought, and while it's still very new, it is
120 123 based on a more cleanly organized architecture of the various IPython
121 124 components. We will continue to develop this over the next few releases as a
122 125 model for GUI components that use IPython.
123 126
124 127 * Another GUI frontend, Cocoa based (Cocoa is the OSX native GUI framework),
125 128 authored by Barry Wark. Currently the WX and the Cocoa ones have slightly
126 129 different internal organizations, but the whole team is working on finding
127 130 what the right abstraction points are for a unified codebase.
128 131
129 132 * As part of the frontend work, Barry Wark also implemented an experimental
130 133 event notification system that various ipython components can use. In the
131 134 next release the implications and use patterns of this system regarding the
132 135 various GUI options will be worked out.
133 136
134 137 * IPython finally has a full test system, that can test docstrings with
135 138 IPython-specific functionality. There are still a few pieces missing for it
136 139 to be widely accessible to all users (so they can run the test suite at any
137 140 time and report problems), but it now works for the developers. We are
138 141 working hard on continuing to improve it, as this was probably IPython's
139 142 major Achilles heel (the lack of proper test coverage made it effectively
140 143 impossible to do large-scale refactoring). The full test suite can now
141 144 be run using the :command:`iptest` command line program.
142 145
143 146 * The notion of a task has been completely reworked. An `ITask` interface has
144 147 been created. This interface defines the methods that tasks need to
145 148 implement. These methods are now responsible for things like submitting
146 149 tasks and processing results. There are two basic task types:
147 150 :class:`IPython.kernel.task.StringTask` (this is the old `Task` object, but
148 151 renamed) and the new :class:`IPython.kernel.task.MapTask`, which is based on
149 152 a function.
150 153
151 154 * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to
152 155 standardize the idea of a `map` method. This interface has a single `map`
153 156 method that has the same syntax as the built-in `map`. We have also defined
154 157 a `mapper` factory interface that creates objects that implement
155 158 :class:`IPython.kernel.mapper.IMapper` for different controllers. Both the
156 159 multiengine and task controller now have mapping capabilities (see the sketch just after this list).
157 160
158 161 * The parallel function capabilities have been reworked. The major changes are
159 162 that i) there is now an `@parallel` magic that creates parallel functions,
160 163 ii) the syntax for multiple variables follows that of `map`, iii) both the
161 164 multiengine and task controller now have a parallel function implementation.
162 165
163 166 * All of the parallel computing capabilities from `ipython1-dev` have been
164 167 merged into IPython proper. This resulted in the following new subpackages:
165 168 :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
166 169 :mod:`IPython.tools` and :mod:`IPython.testing`.
167 170
168 171 * As part of merging in the `ipython1-dev` stuff, the `setup.py` script and
169 172 friends have been completely refactored. Now we are checking for
170 173 dependencies using the approach that matplotlib uses.
171 174
172 175 * The documentation has been completely reorganized to accept the
173 176 documentation from `ipython1-dev`.
174 177
175 178 * We have switched to using Foolscap for all of our network protocols in
176 179 :mod:`IPython.kernel`. This gives us secure connections that are both
177 180 encrypted and authenticated.
178 181
179 182 * We have a brand new `COPYING.txt` files that describes the IPython license
180 183 and copyright. The biggest change is that we are putting "The IPython
181 184 Development Team" as the copyright holder. We give more details about
182 185 exactly what this means in this file. All developer should read this and use
183 186 the new banner in all IPython source code files.
184 187
185 188 * sh profile: ./foo runs foo as system command, no need to do !./foo anymore
186 189
187 190 * String lists now support ``sort(field, nums = True)`` method (to easily sort
188 191 system command output). Try it with ``a = !ls -l ; a.sort(1, nums=1)``.
189 192
190 193 * '%cpaste foo' now assigns the pasted block as string list, instead of string
191 194
192 195 * The ipcluster script now runs by default with no security. This is done
193 196 because the main usage of the script is for starting things on localhost.
194 197 Eventually when ipcluster is able to start things on other hosts, we will put
195 198 security back.
196 199
197 200 * 'cd --foo' searches directory history for string foo, and jumps to that dir.
198 201 Last part of dir name is checked first. If no matches for that are found,
199 202 look at the whole path.
200 203
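As a quick sketch of the new mapping API described above (a minimal example;
it assumes a controller and engines are already running)::

    from IPython.kernel import client

    mec = client.MultiEngineClient()

    def square(x):
        return x*x

    # Same calling convention as the built-in map:
    result = mec.map(square, range(16))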
201 204
202 205 Bug fixes
203 206 ---------
204 207
205 208 * The Windows installer has been fixed. Now all IPython scripts have ``.bat``
206 209 versions created. Also, the Start Menu shortcuts have been updated.
207 210
208 211 * The colors escapes in the multiengine client are now turned off on win32 as
209 212 they don't print correctly.
210 213
211 214 * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing
212 215 mpi_import_statement incorrectly, which was leading the engine to crash when
213 216 mpi was enabled.
214 217
215 218 * A few subpackages had missing ``__init__.py`` files.
216 219
217 220 * The documentation is only created if Sphinx is found. Previously, the
218 221 ``setup.py`` script would fail if it was missing.
219 222
220 223 * Greedy ``cd`` completion has been disabled again (it was enabled in 0.8.4) as
221 224 it caused problems on certain platforms.
222 225
223 226
224 227 Backwards incompatible changes
225 228 ------------------------------
226 229
227 230 * The ``clusterfile`` options of the :command:`ipcluster` command has been
228 231 removed as it was not working and it will be replaced soon by something much
229 232 more robust.
230 233
231 234 * The :mod:`IPython.kernel` configuration now properly finds the user's
232 235 IPython directory.
233 236
234 237 * In ipapi, the :func:`make_user_ns` function has been replaced with
235 238 :func:`make_user_namespaces`, to support dict subclasses in namespace
236 239 creation.
237 240
238 241 * :class:`IPython.kernel.client.Task` has been renamed
239 242 :class:`IPython.kernel.client.StringTask` to make way for new task types.
240 243
241 244 * The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
242 245 and `map`.
243 246
244 247 * Renamed the values that the renamed `dist` keyword argument can take from
245 248 `'basic'` to `'b'`.
246 249
247 250 * IPython has a larger set of dependencies if you want all of its capabilities.
248 251 See the `setup.py` script for details.
249 252
250 253 * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and
251 254 :class:`IPython.kernel.client.TaskClient` no longer take the (ip,port) tuple.
252 255 Instead they take the filename of a file that contains the FURL for that
253 256 client. If the FURL file is in your IPYTHONDIR, it will be found automatically
254 257 and the constructor can be left empty (see the sketch just after this list).
255 258
256 259 * The asynchronous clients in :mod:`IPython.kernel.asyncclient` are now created
257 260 using the factory functions :func:`get_multiengine_client` and
258 261 :func:`get_task_client`. These return a `Deferred` to the actual client.
259 262
260 263 * The command line options to `ipcontroller` and `ipengine` have changed to
261 264 reflect the new Foolscap network protocol and the FURL files. Please see the
262 265 help for these scripts for details.
263 266
264 267 * The configuration files for the kernel have changed because of the Foolscap
265 268 stuff. If you were using custom config files before, you should delete them
266 269 and regenerate new ones.
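
As a brief sketch of the new constructor usage (the explicit path shown is
illustrative; use wherever your FURL files actually live)::

    from IPython.kernel import client

    # Explicit path to the FURL file written by the controller:
    mec = client.MultiEngineClient('/path/to/ipcontroller-mec.furl')

    # If the FURL files are in IPYTHONDIR (~/.ipython/security),
    # the constructor can be left empty:
    tc = client.TaskClient()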
267 270
268 271 Changes merged in from IPython1
269 272 -------------------------------
270 273
271 274 New features
272 275 ............
273 276
274 277 * Much improved ``setup.py`` and ``setupegg.py`` scripts. Because Twisted and
275 278 zope.interface are now easily installable, we can declare them as dependencies
276 279 in our setupegg.py script.
277 280
278 281 * IPython is now compatible with Twisted 2.5.0 and 8.x.
279 282
280 283 * Added a new example of how to use :mod:`ipython1.kernel.asynclient`.
281 284
282 285 * Initial draft of a process daemon in :mod:`ipython1.daemon`. This has not
283 286 been merged into IPython and is still in `ipython1-dev`.
284 287
285 288 * The ``TaskController`` now has methods for getting the queue status.
286 289
287 290 * The ``TaskResult`` objects now have information about how long the task
288 291 took to run.
289 292
290 293 * We are attaching additional attributes to exceptions ``(_ipython_*)`` that
291 294 we use to carry additional info around.
292 295
293 296 * New top-level module :mod:`asyncclient` that has asynchronous versions (that
294 297 return deferreds) of the client classes. This is designed for users who want
295 298 to run their own Twisted reactor.
296 299
297 300 * All the clients in :mod:`client` are now based on Twisted. This is done by
298 301 running the Twisted reactor in a separate thread and using the
299 302 :func:`blockingCallFromThread` function that is in recent versions of Twisted.
300 303
301 304 * Functions can now be pushed/pulled to/from engines using
302 305 :meth:`MultiEngineClient.push_function` and
303 306 :meth:`MultiEngineClient.pull_function`.
304 307
305 308 * Gather/scatter are now implemented in the client to reduce the work load
306 309 of the controller and improve performance.
307 310
308 311 * Complete rewrite of the IPython documentation. All of the documentation
309 312 from the IPython website has been moved into docs/source as restructured
310 313 text documents. PDF and HTML documentation are being generated using
311 314 Sphinx.
312 315
313 316 * New developer oriented documentation: development guidelines and roadmap.
314 317
315 318 * Traditional ``ChangeLog`` has been changed to a more useful ``changes.txt``
316 319 file that is organized by release and is meant to provide something more
317 320 relevant for users.
318 321
319 322 Bug fixes
320 323 .........
321 324
322 325 * Created a proper ``MANIFEST.in`` file to create source distributions.
323 326
324 327 * Fixed a bug in the ``MultiEngine`` interface. Previously, multi-engine
325 328 actions were being collected with a :class:`DeferredList` with
326 329 ``fireononeerrback=1``. This meant that methods were returning
327 330 before all engines had given their results. This was causing extremely odd
328 331 bugs in certain cases. To fix this problem, we have 1) set
329 332 ``fireononeerrback=0`` to make sure all results (or exceptions) are in
330 333 before returning and 2) introduced a :exc:`CompositeError` exception
331 334 that wraps all of the engine exceptions. This is a huge change as it means
332 335 that users will have to catch :exc:`CompositeError` rather than the actual
333 336 exception.
334 337
335 338 Backwards incompatible changes
336 339 ..............................
337 340
338 341 * All names have been renamed to conform to the lowercase_with_underscore
339 342 convention. This will require users to change references to all names like
340 343 ``queueStatus`` to ``queue_status``.
341 344
342 345 * Previously, methods like :meth:`MultiEngineClient.push` and
343 346 :meth:`MultiEngineClient.pull` used ``*args`` and ``**kwargs``. This was
344 347 becoming a problem as we weren't able to introduce new keyword arguments into
345 348 the API. Now these methods simply take a dict or sequence. This has also
346 349 allowed us to get rid of the ``*All`` methods like :meth:`pushAll` and
347 350 :meth:`pullAll`. These things are now handled with the ``targets`` keyword
348 351 argument that defaults to ``'all'``.
349 352
350 353 * The :attr:`MultiEngineClient.magicTargets` has been renamed to
351 354 :attr:`MultiEngineClient.targets`.
352 355
353 356 * All methods in the MultiEngine interface now accept the optional keyword
354 357 argument ``block``.
355 358
356 359 * Renamed :class:`RemoteController` to :class:`MultiEngineClient` and
357 360 :class:`TaskController` to :class:`TaskClient`.
358 361
359 362 * Renamed the top-level module from :mod:`api` to :mod:`client`.
360 363
361 364 * Most methods in the multiengine interface now raise a :exc:`CompositeError`
362 365 exception that wraps the user's exceptions, rather than just raising the raw
363 366 user's exception.
364 367
365 368 * Changed the ``setupNS`` and ``resultNames`` in the ``Task`` class to ``push``
366 369 and ``pull``.
367 370
368 371
369 372 Release 0.8.4
370 373 =============
371 374
372 375 This was a quick release to fix an unfortunate bug that slipped into the 0.8.3
373 376 release. The ``--twisted`` option was disabled, as it turned out to be broken
374 377 across several platforms.
375 378
376 379
377 380 Release 0.8.3
378 381 =============
379 382
380 383 * pydb is now disabled by default (due to %run -d problems). You can enable
381 384 it by passing -pydb command line argument to IPython. Note that setting
382 385 it in config file won't work.
383 386
384 387
385 388 Release 0.8.2
386 389 =============
387 390
388 391 * %pushd/%popd behave differently; now "pushd /foo" pushes CURRENT directory
389 392 and jumps to /foo. The current behaviour is closer to the documented
390 393 behaviour, and should not trip anyone up.
391 394
392 395
393 396 Older releases
394 397 ==============
395 398
396 399 Changes in earlier releases of IPython are described in the older file
397 400 ``ChangeLog``. Please refer to this document for details.
398 401
@@ -1,157 +1,157 b''
1 1 .. _parallelmpi:
2 2
3 3 =======================
4 4 Using MPI with IPython
5 5 =======================
6 6
7 7 Often, a parallel algorithm will require moving data between the engines. One way of accomplishing this is by doing a pull and then a push using the multiengine client. However, this will be slow, as all the data has to go through the controller to the client and then back through the controller to its final destination.
8 8
9 9 A much better way of moving data between engines is to use a message passing library, such as the Message Passing Interface (MPI) [MPI]_. IPython's parallel computing architecture has been designed from the ground up to integrate with MPI. This document describes how to use MPI with IPython.
10 10
11 11 Additional installation requirements
12 12 ====================================
13 13
14 14 If you want to use MPI with IPython, you will need to install:
15 15
16 16 * A standard MPI implementation such as OpenMPI [OpenMPI]_ or MPICH.
17 17 * The mpi4py [mpi4py]_ package.
18 18
19 19 .. note::
20 20
21 21 The mpi4py package is not a strict requirement. However, you need to
22 22 have *some* way of calling MPI from Python. You also need some way of
23 23 making sure that :func:`MPI_Init` is called when the IPython engines start
24 24 up. There are a number of ways of doing this and a good number of
25 25 associated subtleties. We highly recommend just using mpi4py as it
26 26 takes care of most of these problems. If you want to do something
27 27 different, let us know and we can help you get started.
28 28
29 29 Starting the engines with MPI enabled
30 30 =====================================
31 31
32 32 To use code that calls MPI, there are typically two things that MPI requires.
33 33
34 34 1. The process that wants to call MPI must be started using
35 :command:`mpirun` or a batch system (like PBS) that has MPI support.
35 :command:`mpiexec` or a batch system (like PBS) that has MPI support.
36 36 2. Once the process starts, it must call :func:`MPI_Init`.
37 37
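With mpi4py, the second requirement is handled for you: importing the
:mod:`MPI` module calls :func:`MPI_Init` automatically. A minimal sanity
check, assuming mpi4py is installed:

.. sourcecode:: python

    # Importing mpi4py's MPI module triggers MPI_Init behind the scenes.
    from mpi4py import MPI

    print MPI.Is_initialized()   # True once the import has run
    print MPI.COMM_WORLD.Get_rank(), MPI.COMM_WORLD.Get_size()
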
38 38 There are a couple of ways that you can start the IPython engines and get these things to happen.
39 39
40 Automatic starting using :command:`mpirun` and :command:`ipcluster`
40 Automatic starting using :command:`mpiexec` and :command:`ipcluster`
41 41 ----------------------------------------------------------------------
42 42
43 The easiest approach is to use the `mpirun` mode of :command:`ipcluster`, which will first start a controller and then a set of engines using :command:`mpirun`::
43 The easiest approach is to use the `mpiexec` mode of :command:`ipcluster`, which will first start a controller and then a set of engines using :command:`mpiexec`::
44 44
45 $ ipcluster mpirun -n 4
45 $ ipcluster mpiexec -n 4
46 46
47 47 This approach is best as interrupting :command:`ipcluster` will automatically
48 48 stop and clean up the controller and engines.
49 49
50 Manual starting using :command:`mpirun`
50 Manual starting using :command:`mpiexec`
51 51 ------------------------------------------
52 52
53 If you want to start the IPython engines using the :command:`mpirun`, just do::
53 If you want to start the IPython engines using :command:`mpiexec`, just do::
54 54
55 $ mpirun -n 4 ipengine --mpi=mpi4py
55 $ mpiexec -n 4 ipengine --mpi=mpi4py
56 56
57 57 This requires that you already have a controller running and that the FURL
58 58 files for the engines are in place. We also have built in support for
59 59 PyTrilinos [PyTrilinos]_, which can be used (assuming it is installed) by
60 60 starting the engines with::
61 61
62 mpirun -n 4 ipengine --mpi=pytrilinos
62 mpiexec -n 4 ipengine --mpi=pytrilinos
63 63
64 64 Automatic starting using PBS and :command:`ipcluster`
65 65 -----------------------------------------------------
66 66
67 67 The :command:`ipcluster` command also has built-in integration with PBS. For more information on this approach, see our documentation on :ref:`ipcluster <parallel_process>`.
68 68
69 69 Actually using MPI
70 70 ==================
71 71
72 72 Once the engines are running with MPI enabled, you are ready to go. You can now call any code that uses MPI in the IPython engines. And, all of this can be done interactively. Here we show a simple example that uses mpi4py [mpi4py]_.
73 73
74 74 First, let's define a simple function that uses MPI to calculate the sum of a distributed array. Save the following text in a file called :file:`psum.py`:
75 75
76 76 .. sourcecode:: python
77 77
78 78 from mpi4py import MPI
79 79 import numpy as np
80 80
81 81 def psum(a):
82 82     s = np.sum(a)
83 83     return MPI.COMM_WORLD.allreduce(s, op=MPI.SUM)  # lowercase allreduce handles Python scalars
84 84
85 85 Now, start an IPython cluster in the same directory as :file:`psum.py`::
86 86
87 $ ipcluster mpirun -n 4
87 $ ipcluster mpiexec -n 4
88 88
89 89 Finally, connect to the cluster and use this function interactively. In this case, we create a random array on each engine and sum up all the random arrays using our :func:`psum` function:
90 90
91 91 .. sourcecode:: ipython
92 92
93 93 In [1]: from IPython.kernel import client
94 94
95 95 In [2]: mec = client.MultiEngineClient()
96 96
97 97 In [3]: mec.activate()
98 98
99 99 In [4]: px import numpy as np
100 100 Parallel execution on engines: all
101 101 Out[4]:
102 102 <Results List>
103 103 [0] In [13]: import numpy as np
104 104 [1] In [13]: import numpy as np
105 105 [2] In [13]: import numpy as np
106 106 [3] In [13]: import numpy as np
107 107
108 108 In [6]: px a = np.random.rand(100)
109 109 Parallel execution on engines: all
110 110 Out[6]:
111 111 <Results List>
112 112 [0] In [15]: a = np.random.rand(100)
113 113 [1] In [15]: a = np.random.rand(100)
114 114 [2] In [15]: a = np.random.rand(100)
115 115 [3] In [15]: a = np.random.rand(100)
116 116
117 117 In [7]: px from psum import psum
118 118 Parallel execution on engines: all
119 119 Out[7]:
120 120 <Results List>
121 121 [0] In [16]: from psum import psum
122 122 [1] In [16]: from psum import psum
123 123 [2] In [16]: from psum import psum
124 124 [3] In [16]: from psum import psum
125 125
126 126 In [8]: px s = psum(a)
127 127 Parallel execution on engines: all
128 128 Out[8]:
129 129 <Results List>
130 130 [0] In [17]: s = psum(a)
131 131 [1] In [17]: s = psum(a)
132 132 [2] In [17]: s = psum(a)
133 133 [3] In [17]: s = psum(a)
134 134
135 135 In [9]: px print s
136 136 Parallel execution on engines: all
137 137 Out[9]:
138 138 <Results List>
139 139 [0] In [18]: print s
140 140 [0] Out[18]: 187.451545803
141 141
142 142 [1] In [18]: print s
143 143 [1] Out[18]: 187.451545803
144 144
145 145 [2] In [18]: print s
146 146 [2] Out[18]: 187.451545803
147 147
148 148 [3] In [18]: print s
149 149 [3] Out[18]: 187.451545803
150 150
151 151 Any Python code that makes calls to MPI can be used in this manner, including
152 152 compiled C, C++ and Fortran libraries that have been exposed to Python.
153 153
154 154 .. [MPI] Message Passing Interface. http://www-unix.mcs.anl.gov/mpi/
155 155 .. [mpi4py] MPI for Python. http://mpi4py.scipy.org/
156 156 .. [OpenMPI] Open MPI. http://www.open-mpi.org/
157 157 .. [PyTrilinos] PyTrilinos. http://trilinos.sandia.gov/packages/pytrilinos/
@@ -1,324 +1,336 b''
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19 19
20 20 General considerations
21 21 ======================
22 22
23 23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24 24
25 25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26 26
27 27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 28 ``host0``.
29 29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 32 :command:`ipengine`. This command has to be told where the FURL file
33 33 (:file:`ipcontroller-engine.furl`) is located.
34 34
35 35 At this point, the controller and engines will be connected. By default, the
36 36 FURL files created by the controller are put into the
37 37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 38 the controller, step 2 can be skipped as the engines will automatically look
39 39 at that location.
40 40
41 41 The final step required to actually use the running controller from a
42 42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
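
As a concrete sketch of these steps (the host names are illustrative):

.. sourcecode:: bash

    # On host0: start the controller; it writes its FURL files into
    # ~/.ipython/security.
    $ ipcontroller

    # Copy the engine FURL to each engine host (unnecessary if the
    # filesystem is shared):
    $ scp ~/.ipython/security/ipcontroller-engine.furl host1:.ipython/security/

    # On host1..hostn: start an engine.
    $ ipengine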
45 45
46 46 Using :command:`ipcluster`
47 47 ==========================
48 48
49 49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50 50
51 51 1. When the controller and engines are all run on localhost. This is useful
52 52 for testing or running on a multicore computer.
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations.
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 56 4. When the controller is started on localhost and the engines are started on
57 57 remote nodes using :command:`ssh`.
58 58
59 59 .. note::
60 60
61 61 It is also possible for advanced users to add support to
62 62 :command:`ipcluster` for starting controllers and engines using other
63 63 methods (like Sun's Grid Engine for example).
64 64
65 65 .. note::
66 66
67 67 Currently :command:`ipcluster` requires that the
68 68 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 69 seen by both the controller and engines. If you don't have a shared file
70 70 system you will need to use :command:`ipcontroller` and
71 71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 72 using the :command:`ssh` method to start the cluster.
73 73
74 74 Under the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 75 and :command:`ipengine` to perform the steps described above.
76 76
77 77 Using :command:`ipcluster` in local mode
78 78 ----------------------------------------
79 79
80 80 To start one controller and 4 engines on localhost, just do::
81 81
82 82 $ ipcluster local -n 4
83 83
84 84 To see other command line options for the local mode, do::
85 85
86 86 $ ipcluster local -h
87 87
88 Using :command:`ipcluster` in mpirun mode
89 -----------------------------------------
88 Using :command:`ipcluster` in mpiexec/mpirun mode
89 -------------------------------------------------
90 90
91 The mpirun mode is useful if you:
91 The mpiexec/mpirun mode is useful if you:
92 92
93 93 1. Have MPI installed.
94 2. Your systems are configured to use the :command:`mpirun` command to start
95 processes.
94 2. Have your systems configured to use the :command:`mpiexec` or
95 :command:`mpirun` commands to start MPI processes.
96
97 .. note::
98
99 The preferred command to use is :command:`mpiexec`. However, we also
100 support :command:`mpirun` for backwards compatibility. The underlying
101 logic used is exactly the same, the only difference being the name of the
102 command line program that is called.
96 103
97 104 If these are satisfied, you can start an IPython cluster using::
98 105
99 $ ipcluster mpirun -n 4
106 $ ipcluster mpiexec -n 4
100 107
101 108 This does the following:
102 109
103 110 1. Starts the IPython controller on current host.
104 2. Uses :command:`mpirun` to start 4 engines.
111 2. Uses :command:`mpiexec` to start 4 engines.
105 112
106 113 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
107 114
108 $ ipcluster mpirun -n 4 --mpi=mpi4py
115 $ ipcluster mpiexec -n 4 --mpi=mpi4py
109 116
110 117 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
111 118
112 119 Additional command line options for this mode can be found by doing::
113 120
114 $ ipcluster mpirun -h
121 $ ipcluster mpiexec -h
115 122
116 123 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
117 124
118 125
119 126 Using :command:`ipcluster` in PBS mode
120 127 --------------------------------------
121 128
122 129 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
123 130
124 131 .. sourcecode:: bash
125 132
126 133 #PBS -N ipython
127 134 #PBS -j oe
128 135 #PBS -l walltime=00:10:00
129 136 #PBS -l nodes=${n/4}:ppn=4
130 137 #PBS -q parallel
131 138
132 139 cd $$PBS_O_WORKDIR
133 140 export PATH=$$HOME/usr/local/bin
134 141 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
135 142 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
136 143
137 144 There are a few important points about this template:
138 145
139 146 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
140 147 template engine (a sketch of this rendering follows the list).
141 148
142 149 2. Instead of putting in the actual number of engines, use the notation
143 150 ``${n}`` to indicate the number of engines to be started. You can also uses
144 151 expressions like ``${n/4}`` in the template to indicate the number of
145 152 nodes.
146 153
147 154 3. Because ``$`` is a special character used by the template engine, you must
148 155 escape any ``$`` by using ``$$``. This is important when referring to
149 156 environment variables in the template.
150 157
151 158 4. Any options to :command:`ipengine` should be given in the batch script
152 159 template.
153 160
154 161 5. Depending on the configuration of your system, you may have to set
155 162 environment variables in the script template.
156 163
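As a rough sketch of what this rendering amounts to (the import location of
the ``itpl`` helper is an assumption here, not documented API; :mod:`Itpl`
evaluates ``${...}`` expressions in the calling namespace and collapses
``$$`` to a literal ``$``):

.. sourcecode:: python

    # Hypothetical illustration of the template rendering step.
    from IPython.Itpl import itpl

    n = 128  # number of engines requested on the command line
    script = itpl(open('pbs.template').read())
    # ${n} -> 128, ${n/4} -> 32, $$PBS_O_WORKDIR -> $PBS_O_WORKDIR
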
157 164 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
158 165
159 166 $ ipcluster pbs -n 128 --pbs-script=pbs.template
160 167
161 168 Additional command line options for this mode can be found by doing::
162 169
163 170 $ ipcluster pbs -h
164 171
165 172 Using :command:`ipcluster` in SSH mode
166 173 --------------------------------------
167 174
168 175 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 176 nodes, while :command:`ipcontroller` is started on localhost.
170 177
171 178 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172 179
173 180 To use this mode you need a Python file describing the cluster. Here is an example of such a "clusterfile":
174 181
175 182 .. sourcecode:: python
176 183
177 184 send_furl = True
178 185 engines = { 'host1.example.com' : 2,
179 186 'host2.example.com' : 5,
180 187 'host3.example.com' : 1,
181 188 'host4.example.com' : 8 }
182 189
183 190 Since this is a regular Python file, the usual Python syntax applies. Things to note:
184 191
185 192 * The `engines` dict, where each key is a host we want to run engines on and
186 193 each value is the number of engines to run on that host.
187 194 * `send_furl` can be either `True` or `False`; if `True`, the FURL needed by
188 195 :command:`ipengine` will be copied over to each host.
189 196
190 197 The ``--clusterfile`` command line option lets you specify the file to use for
191 198 the cluster definition. Once you have your cluster file and you can
192 199 :command:`ssh` into the remote hosts with out an password you are ready to
193 200 start your cluster like so:
194 201
195 202 .. sourcecode:: bash
196 203
197 204 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198 205
199 206
200 207 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201 208
202 209 * sshx.sh
203 210 * engine_killer.sh
204 211
205 212 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
206 213
207 214 The default sshx.sh is the following:
208 215
209 216 .. sourcecode:: bash
210 217
211 218 #!/bin/sh
212 219 "$@" &> /dev/null &
213 220 echo $!
214 221
215 222 If you want to use a custom sshx.sh script you need to use the ``--sshx``
216 223 option and specify the file to use. Using a custom sshx.sh file could be
217 224 helpful when you need to set up the environment on the remote host before
218 225 executing :command:`ipengine`.
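
For example, a custom sshx.sh might load the remote environment before
launching the engine (a hypothetical sketch; adjust the sourced file to your
setup):

.. sourcecode:: bash

    #!/bin/sh
    # Load the remote host's environment first, then launch the given
    # command (ipengine) in the background and report its pid.
    . $HOME/.profile
    "$@" &> /dev/null &
    echo $!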
219 226
220 227 For a detailed options list:
221 228
222 229 .. sourcecode:: bash
223 230
224 231 $ ipcluster ssh -h
225 232
226 233 Current limitations of the SSH mode of :command:`ipcluster` are:
227 234
228 235 * Untested on Windows. Would require a working :command:`ssh` on Windows.
229 236 Also, we are using shell scripts to set up and execute commands on remote
230 237 hosts.
231 238 * :command:`ipcontroller` is started on localhost, with no option to start it
232 239 on a remote node.
233 240
234 241 Using the :command:`ipcontroller` and :command:`ipengine` commands
235 242 ==================================================================
236 243
237 244 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
238 245
239 246 Starting the controller and engine on your local machine
240 247 --------------------------------------------------------
241 248
242 249 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
243 250 local machine, do the following.
244 251
245 252 First start the controller::
246 253
247 254 $ ipcontroller
248 255
249 256 Next, start however many instances of the engine you want using (repeatedly) the command::
250 257
251 258 $ ipengine
252 259
253 260 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
254 261
255 262 .. warning::
256 263
257 264 The order of the above operations is very important. You *must*
258 265 start the controller before the engines, since the engines connect
259 266 to the controller as they get started.
260 267
261 268 .. note::
262 269
263 270 On some platforms (OS X), to put the controller and engine into the
264 271 background you may need to give these commands in the form ``(ipcontroller
265 272 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
266 273 properly.
267 274
268 275 Starting the controller and engines on different hosts
269 276 ------------------------------------------------------
270 277
271 278 When the controller and engines are running on different hosts, things are
272 279 slightly more complicated, but the underlying ideas are the same:
273 280
274 281 1. Start the controller on a host using :command:`ipcontroller`.
275 282 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
276 283 3. Use :command:`ipengine` on the engine's hosts to start the engines.
277 284
278 285 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
279 286
280 287 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
281 288 directory on the engine's host, where it will be found automatically.
282 289 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
283 290 flag.
284 291
285 292 The ``--furl-file`` flag works like this::
286 293
287 294 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
288 295
289 296 .. note::
290 297
291 298 If the controller's and engine's hosts all have a shared file system
291 298 (:file:`~/.ipython/security` is the same on all of them), then things
293 300 will just work!
294 301
295 302 Make FURL files persistent
296 303 ---------------------------
297 304
298 305 At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
299 306
300 307 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
301 308
302 309 $ ipcontroller -r --client-port=10101 --engine-port=10102
303 310
311 These options also work with all of the various modes of
312 :command:`ipcluster`::
313
314 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
315
304 316 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future, just make sure to tell the controller to use the *same* ports.
305 317
306 318 .. note::
307 319
308 320 You may ask the question: what ports does the controller listen on if you
309 321 don't tell it to use specific ones? The default is to use high random port
310 322 numbers. We do this for two reasons: i) to increase security through
311 323 obscurity and ii) to allow multiple controllers on a given host to start and
312 324 automatically use different ports.
313 325
314 326 Log files
315 327 ---------
316 328
317 329 All of the components of IPython have log files associated with them.
318 330 These log files can be extremely useful in debugging problems with
319 331 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
320 332 the log files to us will often help us to debug any problems.
321 333
322 334
323 335 .. [PBS] Portable Batch System. http://www.openpbs.org/
324 336 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent