Fix typos in parallel doc
Adam Riggall -
@@ -1,697 +1,697 b''
1 1 .. _parallel_multiengine:
2 2
3 3 ==========================
4 4 IPython's Direct interface
5 5 ==========================
6 6
7 7 The direct, or multiengine, interface represents one possible way of working with a set of
8 8 IPython engines. The basic idea behind the multiengine interface is that the
9 9 capabilities of each engine are directly and explicitly exposed to the user.
10 10 Thus, in the multiengine interface, each engine is given an id that is used to
11 11 identify the engine and give it work to do. This interface is very intuitive
12 12 and is designed with interactive usage in mind, and is the best place for
13 13 new users of IPython to begin.
14 14
15 15 Starting the IPython controller and engines
16 16 ===========================================
17 17
18 18 To follow along with this tutorial, you will need to start the IPython
19 19 controller and four IPython engines. The simplest way of doing this is to use
20 20 the :command:`ipcluster` command::
21 21
22 22 $ ipcluster start -n 4
23 23
24 24 For more detailed information about starting the controller and engines, see
25 25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26 26
27 27 Creating a ``DirectView`` instance
28 28 ==================================
29 29
30 30 The first step is to import the :mod:`IPython.parallel`
31 31 module and then create a :class:`.Client` instance:
32 32
33 33 .. sourcecode:: ipython
34 34
35 35 In [1]: from IPython.parallel import Client
36 36
37 37 In [2]: rc = Client()
38 38
39 39 This form assumes that the default connection information (stored in
40 40 :file:`ipcontroller-client.json` found in :file:`IPYTHONDIR/profile_default/security`) is
41 41 accurate. If the controller was started on a remote machine, you must copy that connection
42 42 file to the client machine, or enter its contents as arguments to the Client constructor:
43 43
44 44 .. sourcecode:: ipython
45 45
46 46 # If you have copied the json connector file from the controller:
47 47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 48 # or to connect with a specific profile you have set up:
49 49 In [3]: rc = Client(profile='mpi')
50 50
51 51
52 52 To make sure there are engines connected to the controller, users can get a list
53 53 of engine ids:
54 54
55 55 .. sourcecode:: ipython
56 56
57 57 In [3]: rc.ids
58 58 Out[3]: [0, 1, 2, 3]
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
62 62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 63 constructed via list-access to the client:
64 64
65 65 .. sourcecode:: ipython
66 66
67 67 In [4]: dview = rc[:] # use all engines
68 68
69 69 .. seealso::
70 70
71 71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72 72
73 73
74 74 Quick and easy parallelism
75 75 ==========================
76 76
77 77 In many cases, you simply want to apply a Python function to a sequence of
78 78 objects, but *in parallel*. The client interface provides a simple way
79 79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 Python's builtin :func:`map` function allows a function to be applied to a
85 85 sequence element-by-element. This type of code is typically trivial to
86 86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 88 DirectView's :meth:`map` method:
89 89
90 90 .. sourcecode:: ipython
91 91
92 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93 93
94 94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95 95
96 96 In [67]: serial_result==parallel_result
97 97 Out[67]: True
98 98
99 99
100 100 .. note::
101 101
102 102 The :class:`DirectView`'s version of :meth:`map` does
103 103 not do dynamic load balancing. For a load balanced version, use a
104 104 :class:`LoadBalancedView`.
105 105
106 106 .. seealso::
107 107
108 108 :meth:`map` is implemented via :class:`ParallelFunction`.
109 109
110 110 Remote function decorators
111 111 --------------------------
112 112
113 113 Remote functions are just like normal functions, but when they are called,
114 114 they execute on one or more engines, rather than locally. IPython provides
115 115 two decorators:
116 116
117 117 .. sourcecode:: ipython
118 118
119 119 In [10]: @dview.remote(block=True)
120 120 ....: def getpid():
121 121 ....: import os
122 122 ....: return os.getpid()
123 123 ....:
124 124
125 125 In [11]: getpid()
126 126 Out[11]: [12345, 12346, 12347, 12348]
127 127
128 128 The ``@parallel`` decorator creates parallel functions that break up element-wise
129 129 operations and distribute them, reconstructing the result.
130 130
131 131 .. sourcecode:: ipython
132 132
133 133 In [12]: import numpy as np
134 134
135 135 In [13]: A = np.random.random((64,48))
136 136
137 137 In [14]: @dview.parallel(block=True)
138 138 ....: def pmul(A,B):
139 139 ....: return A*B
140 140
141 141 In [15]: C_local = A*A
142 142
143 143 In [16]: C_remote = pmul(A,A)
144 144
145 145 In [17]: (C_local == C_remote).all()
146 146 Out[17]: True
147 147
148 148 Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
149 149 element-wise operations that operate on a sequence or array. For ``map`` behavior,
150 150 parallel functions do have a map method.
151 151
152 152 ==================== ============================ =============================
153 153 call pfunc(seq) pfunc.map(seq)
154 154 ==================== ============================ =============================
155 155 # of tasks # of engines (1 per engine) # of engines (1 per engine)
156 156 # of remote calls # of engines (1 per engine) ``len(seq)``
157 157 argument to remote ``seq[i:j]`` (sub-sequence) ``seq[i]`` (single element)
158 158 ==================== ============================ =============================
159 159
160 160 A quick example to illustrate the difference in arguments for the two modes:
161 161
162 162 .. sourcecode:: ipython
163 163
164 164 In [16]: @dview.parallel(block=True)
165 165 ....: def echo(x):
166 166 ....: return str(x)
167 167 ....:
168 168
169 169 In [17]: echo(range(5))
170 170 Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']
171 171
172 172 In [18]: echo.map(range(5))
173 173 Out[18]: ['0', '1', '2', '3', '4']
174 174
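The splitting rule behind the ``echo(range(5))`` call above can be sketched in plain Python. This is only an illustration of contiguous chunking, consistent with the output shown; it is not IPython's actual implementation:

```python
def partition(seq, n):
    """Split seq into n contiguous chunks; the first len(seq) % n chunks
    receive one extra element, mirroring how a @parallel function splits
    its argument across engines."""
    q, r = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        size = q + (1 if i < r else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks

# Five elements over four engines: the first chunk gets the extra element,
# matching the echo(range(5)) output above.
print(partition(list(range(5)), 4))  # [[0, 1], [2], [3], [4]]
```
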
175 175
176 176 .. seealso::
177 177
178 178 See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
179 179 decorators for options.
180 180
181 181 Calling Python functions
182 182 ========================
183 183
184 184 The most basic type of operation that can be performed on the engines is to
185 185 execute Python code or call Python functions. Executing Python code can be
186 186 done in blocking or non-blocking mode (non-blocking is default) using the
187 187 :meth:`.View.execute` method, and calling functions can be done via the
188 188 :meth:`.View.apply` method.
189 189
190 190 apply
191 191 -----
192 192
193 193 The main method for doing remote execution (in fact, all methods that
194 194 communicate with the engines are built on top of it), is :meth:`View.apply`.
195 195
196 196 We strive to provide the cleanest interface we can, so `apply` has the following
197 197 signature:
198 198
199 199 .. sourcecode:: python
200 200
201 201 view.apply(f, *args, **kwargs)
202 202
203 203 There are various ways to call functions with IPython, and these flags are set as
204 204 attributes of the View. The ``DirectView`` has just two of these flags:
205 205
206 206 dv.block : bool
207 207 whether to wait for the result, or return an :class:`AsyncResult` object
208 208 immediately
209 209 dv.track : bool
210 210 whether to instruct pyzmq to track when zeromq is done sending the message.
211 211 This is primarily useful for non-copying sends of numpy arrays that you plan to
212 212 edit in-place. You need to know when it becomes safe to edit the buffer
213 213 without corrupting the message.
214 214 dv.targets : int, list of ints
215 215 which targets this view is associated with.
216 216
217 217
218 218 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
219 219
220 220 .. sourcecode:: ipython
221 221
222 222 In [4]: view = rc[1:3]
223 223 Out[4]: <DirectView [1, 2]>
224 224
225 225 In [5]: view.apply<tab>
226 226 view.apply view.apply_async view.apply_sync
227 227
228 228 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
229 229
230 230 Blocking execution
231 231 ------------------
232 232
233 233 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
234 234 these examples) submits the command to the controller, which places the
235 235 command in the engines' queues for execution. The :meth:`apply` call then
236 236 blocks until the engines are done executing the command:
237 237
238 238 .. sourcecode:: ipython
239 239
240 240 In [2]: dview = rc[:] # A DirectView of all engines
241 241 In [3]: dview.block=True
242 242 In [4]: dview['a'] = 5
243 243
244 244 In [5]: dview['b'] = 10
245 245
246 246 In [6]: dview.apply(lambda x: a+b+x, 27)
247 247 Out[6]: [42, 42, 42, 42]
248 248
249 249 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
250 250 method:
251 251
252 252 .. sourcecode:: ipython
253 253
254 254 In [7]: dview.block=False
255 255
256 256 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
257 257 Out[8]: [42, 42, 42, 42]
258 258
259 259 Python commands can be executed as strings on specific engines by using a View's ``execute``
260 260 method:
261 261
262 262 .. sourcecode:: ipython
263 263
264 264 In [6]: rc[::2].execute('c=a+b')
265 265
266 266 In [7]: rc[1::2].execute('c=a-b')
267 267
268 268 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
269 269 Out[8]: [15, -5, 15, -5]
270 270
271 271
272 272 Non-blocking execution
273 273 ----------------------
274 274
275 275 In non-blocking mode, :meth:`apply` submits the command to be executed and
276 276 then returns a :class:`AsyncResult` object immediately. The
277 277 :class:`AsyncResult` object gives you a way of getting a result at a later
278 278 time through its :meth:`get` method.
279 279
280 280 .. seealso::
281 281
282 282 Docs on the :ref:`AsyncResult <parallel_asyncresult>` object.
283 283
284 284 This allows you to quickly submit long-running commands without blocking your
285 285 local Python/IPython session:
286 286
287 287 .. sourcecode:: ipython
288 288
289 289 # define our function
290 290 In [6]: def wait(t):
291 291 ....: import time
292 292 ....: tic = time.time()
293 293 ....: time.sleep(t)
294 294 ....: return time.time()-tic
295 295
296 296 # In non-blocking mode
297 297 In [7]: ar = dview.apply_async(wait, 2)
298 298
299 299 # Now block for the result
300 300 In [8]: ar.get()
301 301 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
302 302
303 303 # Again in non-blocking mode
304 304 In [9]: ar = dview.apply_async(wait, 10)
305 305
306 306 # Poll to see if the result is ready
307 307 In [10]: ar.ready()
308 308 Out[10]: False
309 309
310 310 # ask for the result, but wait a maximum of 1 second:
311 311 In [45]: ar.get(1)
312 312 ---------------------------------------------------------------------------
313 313 TimeoutError Traceback (most recent call last)
314 314 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
315 315 ----> 1 ar.get(1)
316 316
317 317 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
318 318 62 raise self._exception
319 319 63 else:
320 320 ---> 64 raise error.TimeoutError("Result not ready.")
321 321 65
322 322 66 def ready(self):
323 323
324 324 TimeoutError: Result not ready.
325 325
326 326 .. Note::
327 327
328 328 Note the import inside the function. This is a common pattern, used to ensure
329 329 that the appropriate modules are imported where the task is run. You can
330 330 also manually import modules into the engine(s) namespace(s) via
331 331 :meth:`view.execute('import numpy')`.
332 332
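The submit-now, collect-later pattern above can be mimicked with the standard library. The sketch below illustrates the :class:`AsyncResult` idea using :mod:`concurrent.futures`; it is an analogy, not IPython's implementation:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def wait(t):
    # same import-inside-the-function pattern as above
    import time
    tic = time.time()
    time.sleep(t)
    return time.time() - tic

pool = ThreadPoolExecutor(max_workers=4)

future = pool.submit(wait, 0.2)   # returns immediately, like apply_async
print(future.done())              # likely False: the task is still running

try:
    future.result(timeout=0.01)   # like ar.get(1): wait at most this long
except FutureTimeout:
    print("result not ready yet")

elapsed = future.result()         # like ar.get(): block until done
pool.shutdown()
```
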
333 333 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
334 334 are done. For this, there is the :meth:`wait` method. This method takes a
335 335 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
336 336 and blocks until all of the associated results are ready:
337 337
338 338 .. sourcecode:: ipython
339 339
340 340 In [72]: dview.block=False
341 341
342 342 # A trivial list of AsyncResults objects
343 343 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
344 344
345 345 # Wait until all of them are done
346 346 In [74]: dview.wait(pr_list)
347 347
348 348 # Then, their results are ready using get() or the `.r` attribute
349 349 In [75]: pr_list[0].get()
350 350 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
351 351
352 352
353 353
354 354 The ``block`` and ``targets`` keyword arguments and attributes
355 355 --------------------------------------------------------------
356 356
357 357 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
358 358 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
359 359 blocking mode and which engines the command is applied to. The :class:`View` class also has
360 360 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
361 361 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
362 362
363 363 * If no keyword argument is provided, the instance attributes are used.
364 364 * Keyword arguments, if provided, override the instance attributes for
365 365 the duration of a single call.
366 366
367 367 The following examples demonstrate how to use the instance attributes:
368 368
369 369 .. sourcecode:: ipython
370 370
371 371 In [16]: dview.targets = [0,2]
372 372
373 373 In [17]: dview.block = False
374 374
375 375 In [18]: ar = dview.apply(lambda : 10)
376 376
377 377 In [19]: ar.get()
378 378 Out[19]: [10, 10]
379 379
380 380 In [20]: dview.targets = rc.ids # all engines (4)
381 381
382 382 In [21]: dview.block = True
383 383
384 384 In [22]: dview.apply(lambda : 42)
385 385 Out[22]: [42, 42, 42, 42]
386 386
387 387 The :attr:`block` and :attr:`targets` instance attributes of the
388 388 :class:`.DirectView` also determine the behavior of the parallel magic commands.
389 389
390 390 .. seealso::
391 391
392 392 See the documentation of the :ref:`Parallel Magics <parallel_magics>`.
393 393
394 394
395 395 Moving Python objects around
396 396 ============================
397 397
398 398 In addition to calling functions and executing code on engines, you can
399 399 transfer Python objects to and from your IPython session and the engines. In
400 400 IPython, these operations are called :meth:`push` (sending an object to the
401 401 engines) and :meth:`pull` (getting an object from the engines).
402 402
403 403 Basic push and pull
404 404 -------------------
405 405
406 406 Here are some examples of how you use :meth:`push` and :meth:`pull`:
407 407
408 408 .. sourcecode:: ipython
409 409
410 410 In [38]: dview.push(dict(a=1.03234,b=3453))
411 411 Out[38]: [None,None,None,None]
412 412
413 413 In [39]: dview.pull('a')
414 414 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
415 415
416 416 In [40]: dview.pull('b', targets=0)
417 417 Out[40]: 3453
418 418
419 419 In [41]: dview.pull(('a','b'))
420 420 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
421 421
422 422 In [42]: dview.push(dict(c='speed'))
423 423 Out[42]: [None,None,None,None]
424 424
425 425 In non-blocking mode :meth:`push` and :meth:`pull` also return
426 426 :class:`AsyncResult` objects:
427 427
428 428 .. sourcecode:: ipython
429 429
430 430 In [48]: ar = dview.pull('a', block=False)
431 431
432 432 In [49]: ar.get()
433 433 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
434 434
435 435
436 436 Dictionary interface
437 437 --------------------
438 438
439 439 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
440 440 dictionary-style access by key and methods such as :meth:`get` and
441 441 :meth:`update` for convenience. This makes the remote namespaces of the engines
442 442 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
443 443
444 444 .. sourcecode:: ipython
445 445
446 446 In [51]: dview['a']=['foo','bar']
447 447
448 448 In [52]: dview['a']
449 449 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
450 450
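How dictionary-style access can sit on top of push/pull-style messaging can be sketched with toy classes. The names here are hypothetical and purely illustrative, not IPython's internals:

```python
class ToyEngine:
    """Stands in for an engine: just a namespace dictionary."""
    def __init__(self):
        self.ns = {}

class ToyView:
    """Dictionary-style access over several engines, echoing DirectView's
    behavior: __setitem__ pushes the value to every engine, __getitem__
    pulls one result per engine."""
    def __init__(self, engines):
        self.engines = engines

    def __setitem__(self, key, value):
        for e in self.engines:
            e.ns[key] = value

    def __getitem__(self, key):
        return [e.ns[key] for e in self.engines]

view = ToyView([ToyEngine() for _ in range(4)])
view['a'] = ['foo', 'bar']
print(view['a'])  # one copy per engine, as in Out[52] above
```
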
451 451 Scatter and gather
452 452 ------------------
453 453
454 454 Sometimes it is useful to partition a sequence and push the partitions to
455 455 different engines. In MPI language, this is known as scatter/gather and we
456 456 follow that terminology. However, it is important to remember that in
457 457 IPython's :class:`Client` class, :meth:`scatter` is from the
458 458 interactive IPython session to the engines and :meth:`gather` is from the
459 459 engines back to the interactive IPython session. For scatter/gather operations
460 460 between engines, MPI, pyzmq, or some other direct interconnect should be used.
461 461
462 462 .. sourcecode:: ipython
463 463
464 464 In [58]: dview.scatter('a',range(16))
465 465 Out[58]: [None,None,None,None]
466 466
467 467 In [59]: dview['a']
468 468 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
469 469
470 470 In [60]: dview.gather('a')
471 471 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
472 472
473 473 Other things to look at
474 474 =======================
475 475
476 476 How to do parallel list comprehensions
477 477 --------------------------------------
478 478
479 479 In many cases list comprehensions are nicer than using the map function. While
480 480 we don't have fully parallel list comprehensions, it is simple to get the
481 481 basic effect using :meth:`scatter` and :meth:`gather`:
482 482
483 483 .. sourcecode:: ipython
484 484
485 485 In [66]: dview.scatter('x',range(64))
486 486
487 487 In [67]: %px y = [i**10 for i in x]
488 488 Parallel execution on engines: [0, 1, 2, 3]
489 489
490 490 In [68]: y = dview.gather('y')
491 491
492 492 In [69]: print y
493 493 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
494 494
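The scatter/``%px``/gather round trip above can be sanity-checked locally in plain Python. Contiguous near-equal chunking is an assumption about scatter's default layout, consistent with ``Out[59]`` in the previous section:

```python
def chunked(seq, n):
    # contiguous, near-equal chunks, as dview.scatter distributes them
    q, r = divmod(len(seq), n)
    out, start = [], 0
    for i in range(n):
        size = q + (1 if i < r else 0)
        out.append(seq[start:start + size])
        start += size
    return out

x = list(range(64))
# what `%px y = [i**10 for i in x]` computes on each engine's slice
per_engine = [[i**10 for i in chunk] for chunk in chunked(x, 4)]
# what dview.gather('y') reassembles, preserving order
y = [item for chunk in per_engine for item in chunk]
print(y == [i**10 for i in x])  # True: same as the serial comprehension
```
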
495 495 Remote imports
496 496 --------------
497 497
498 498 Sometimes you will want to import packages both in your interactive session
499 499 and on your remote engines. This can be done with the :class:`ContextManager`
500 500 created by a DirectView's :meth:`sync_imports` method:
501 501
502 502 .. sourcecode:: ipython
503 503
504 504 In [69]: with dview.sync_imports():
505 505 ....: import numpy
506 506 importing numpy on engine(s)
507 507
508 508 Any imports made inside the block will also be performed on the view's engines.
509 509 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
510 510 whether the local imports should also be performed. However, support for `local=False`
511 511 has not been implemented, so only packages that can be imported locally will work
512 512 this way.
513 513
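The idea behind :meth:`sync_imports` can be sketched with a context manager that hooks the import machinery and records which modules were imported inside its block, so they could be replayed elsewhere. This is a toy model, not IPython's implementation, and it also shows why the ``as bar`` binding mentioned in the note below is invisible to the hook:

```python
import builtins
import contextlib
import json  # pre-import so the spy below records exactly one name

@contextlib.contextmanager
def sync_imports_sketch(seen):
    """Record module names imported inside the block by wrapping
    builtins.__import__ (the hook only ever sees the module name,
    never any `as bar` assignment)."""
    real_import = builtins.__import__
    def spy(name, *args, **kwargs):
        seen.append(name)
        return real_import(name, *args, **kwargs)
    builtins.__import__ = spy
    try:
        yield
    finally:
        builtins.__import__ = real_import

seen = []
with sync_imports_sketch(seen):
    import json
print(seen)  # ['json']
```
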
514 514 You can also specify imports via the ``@require`` decorator. This is a decorator
515 515 designed for use in Dependencies, but can be used to handle remote imports as well.
516 516 Modules or module names passed to ``@require`` will be imported before the decorated
517 517 function is called. If they cannot be imported, the decorated function will never
518 518 execute and will fail with an UnmetDependencyError. Failures of single Engines will
519 519 be collected and raise a CompositeError, as demonstrated in the next section.
520 520
521 521 .. sourcecode:: ipython
522 522
523 523 In [69]: from IPython.parallel import require
524 524
525 In [70]: @require('re'):
525 In [70]: @require('re')
526 526 ....: def findall(pat, x):
527 527 ....: # re is guaranteed to be available
528 528 ....: return re.findall(pat, x)
529 529
530 530 # you can also pass modules themselves, that you already have locally:
531 In [71]: @require(time):
531 In [71]: @require(time)
532 532 ....: def wait(t):
533 533 ....: time.sleep(t)
534 534 ....: return t
535 535
536 536 .. note::
537 537
538 538 :func:`sync_imports` does not allow ``import foo as bar`` syntax,
539 539 because the assignment represented by the ``as bar`` part is not
540 540 available to the import hook.
541 541
542 542
543 543 .. _parallel_exceptions:
544 544
545 545 Parallel exceptions
546 546 -------------------
547 547
548 548 In the multiengine interface, parallel commands can raise Python exceptions,
549 549 just like serial commands. But it is a little subtle, because a single
550 550 parallel command can actually raise multiple exceptions (one for each engine
551 551 the command was run on). To express this idea, we have a
552 552 :exc:`CompositeError` exception class that will be raised in most cases. The
553 553 :exc:`CompositeError` class is a special type of exception that wraps one or
554 554 more other types of exceptions. Here is how it works:
555 555
556 556 .. sourcecode:: ipython
557 557
558 558 In [78]: dview.block = True
559 559
560 560 In [79]: dview.execute("1/0")
561 561 [0:execute]:
562 562 ---------------------------------------------------------------------------
563 563 ZeroDivisionError Traceback (most recent call last)
564 564 ----> 1 1/0
565 565 ZeroDivisionError: integer division or modulo by zero
566 566
567 567 [1:execute]:
568 568 ---------------------------------------------------------------------------
569 569 ZeroDivisionError Traceback (most recent call last)
570 570 ----> 1 1/0
571 571 ZeroDivisionError: integer division or modulo by zero
572 572
573 573 [2:execute]:
574 574 ---------------------------------------------------------------------------
575 575 ZeroDivisionError Traceback (most recent call last)
576 576 ----> 1 1/0
577 577 ZeroDivisionError: integer division or modulo by zero
578 578
579 579 [3:execute]:
580 580 ---------------------------------------------------------------------------
581 581 ZeroDivisionError Traceback (most recent call last)
582 582 ----> 1 1/0
583 583 ZeroDivisionError: integer division or modulo by zero
584 584
585 585 Notice how the error message printed when :exc:`CompositeError` is raised has
586 586 information about the individual exceptions that were raised on each engine.
587 587 If you want, you can even raise one of these original exceptions:
588 588
589 589 .. sourcecode:: ipython
590 590
591 591 In [80]: try:
592 592 ....: dview.execute('1/0', block=True)
593 593 ....: except parallel.error.CompositeError, e:
594 594 ....: e.raise_exception()
595 595 ....:
596 596 ....:
597 597 ---------------------------------------------------------------------------
598 598 ZeroDivisionError Traceback (most recent call last)
599 599 ----> 1 1/0
600 600 ZeroDivisionError: integer division or modulo by zero
601 601
602 602 If you are working in IPython, you can simply type ``%debug`` after one of
603 603 these :exc:`CompositeError` exceptions is raised, and inspect the exception
604 604 instance:
605 605
606 606 .. sourcecode:: ipython
607 607
608 608 In [81]: dview.execute('1/0')
609 609 [0:execute]:
610 610 ---------------------------------------------------------------------------
611 611 ZeroDivisionError Traceback (most recent call last)
612 612 ----> 1 1/0
613 613 ZeroDivisionError: integer division or modulo by zero
614 614
615 615 [1:execute]:
616 616 ---------------------------------------------------------------------------
617 617 ZeroDivisionError Traceback (most recent call last)
618 618 ----> 1 1/0
619 619 ZeroDivisionError: integer division or modulo by zero
620 620
621 621 [2:execute]:
622 622 ---------------------------------------------------------------------------
623 623 ZeroDivisionError Traceback (most recent call last)
624 624 ----> 1 1/0
625 625 ZeroDivisionError: integer division or modulo by zero
626 626
627 627 [3:execute]:
628 628 ---------------------------------------------------------------------------
629 629 ZeroDivisionError Traceback (most recent call last)
630 630 ----> 1 1/0
631 631 ZeroDivisionError: integer division or modulo by zero
632 632
633 633 In [82]: %debug
634 634 > /.../site-packages/IPython/parallel/client/asyncresult.py(125)get()
635 635 124 else:
636 636 --> 125 raise self._exception
637 637 126 else:
638 638
639 639 # Here, self._exception is the CompositeError instance:
640 640
641 641 ipdb> e = self._exception
642 642 ipdb> e
643 643 CompositeError(4)
644 644
645 645 # we can tab-complete on e to see available methods:
646 646 ipdb> e.<TAB>
647 647 e.args e.message e.traceback
648 648 e.elist e.msg
649 649 e.ename e.print_traceback
650 650 e.engine_info e.raise_exception
651 651 e.evalue e.render_traceback
652 652
653 653 # We can then display the individual tracebacks, if we want:
654 654 ipdb> e.print_traceback(1)
655 655 [1:execute]:
656 656 ---------------------------------------------------------------------------
657 657 ZeroDivisionError Traceback (most recent call last)
658 658 ----> 1 1/0
659 659 ZeroDivisionError: integer division or modulo by zero
660 660
661 661
662 662 Since you might have 100 engines, you probably don't want to see 100 tracebacks
663 663 for a simple NameError because of a typo.
664 664 For this reason, CompositeError truncates the list of exceptions it will print
665 665 to :attr:`CompositeError.tb_limit` (default is five).
666 666 You can change this limit to suit your needs with:
667 667
668 668 .. sourcecode:: ipython
669 669
670 670 In [20]: from IPython.parallel import CompositeError
671 671 In [21]: CompositeError.tb_limit = 1
672 672 In [22]: %px a=b
673 673 [0:execute]:
674 674 ---------------------------------------------------------------------------
675 675 NameError Traceback (most recent call last)
676 676 ----> 1 a=b
677 677 NameError: name 'b' is not defined
678 678
679 679 ... 3 more exceptions ...
680 680
681 681
682 682 All of this same error handling magic even works in non-blocking mode:
683 683
684 684 .. sourcecode:: ipython
685 685
686 686 In [83]: dview.block=False
687 687
688 688 In [84]: ar = dview.execute('1/0')
689 689
690 690 In [85]: ar.get()
691 691 [0:execute]:
692 692 ---------------------------------------------------------------------------
693 693 ZeroDivisionError Traceback (most recent call last)
694 694 ----> 1 1/0
695 695 ZeroDivisionError: integer division or modulo by zero
696 696
697 697 ... 3 more exceptions ...
@@ -1,472 +1,472 b''
1 1 .. _parallel_task:
2 2
3 3 ==========================
4 4 The IPython task interface
5 5 ==========================
6 6
7 7 The task interface to the cluster presents the engines as a fault tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 9 the task interface the user has no direct access to individual engines. By
10 10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 11 simpler and more powerful.
12 12
13 13 Best of all, the user can use both of these interfaces running at the same time
14 14 to take advantage of their respective strengths. When users can break up
15 15 their work into segments that do not depend on previous execution, the
16 16 task interface is ideal. But it also has more power and flexibility, allowing
17 17 the user to guide the distribution of jobs, without having to assign tasks to
18 18 engines explicitly.
19 19
20 20 Starting the IPython controller and engines
21 21 ===========================================
22 22
23 23 To follow along with this tutorial, you will need to start the IPython
24 24 controller and four IPython engines. The simplest way of doing this is to use
25 25 the :command:`ipcluster` command::
26 26
27 27 $ ipcluster start -n 4
28 28
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31 31
32 32 Creating a ``LoadBalancedView`` instance
33 33 ========================================
34 34
35 35 The first step is to import the :mod:`IPython.parallel`
36 36 module and then create a :class:`.Client` instance, and we will also be using
37 37 a :class:`LoadBalancedView`, here called `lview`:
38 38
39 39 .. sourcecode:: ipython
40 40
41 41 In [1]: from IPython.parallel import Client
42 42
43 43 In [2]: rc = Client()
44 44
45 45
46 46 This form assumes that the controller was started on localhost with default
47 47 configuration. If not, the location of the controller must be given as an
48 48 argument to the constructor:
49 49
50 50 .. sourcecode:: ipython
51 51
52 52 # for a visible LAN controller listening on an external port:
53 53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 54 # or to connect with a specific profile you have set up:
55 55 In [3]: rc = Client(profile='mpi')
56 56
57 57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 58 be constructed via the client's :meth:`load_balanced_view` method:
59 59
60 60 .. sourcecode:: ipython
61 61
62 62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63 63
64 64 .. seealso::
65 65
66 66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67 67
68 68
69 69 Quick and easy parallelism
70 70 ==========================
71 71
72 72 In many cases, you simply want to apply a Python function to a sequence of
73 73 objects, but *in parallel*. As in the multiengine interface, this can be
74 74 done via the task interface. The exact same tools can perform these
75 75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 76 of :func:`map` and the :func:`@parallel` function decorator. If one specifies the
77 77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
78 78 execution time per item varies significantly, you should use the versions in
79 79 the task interface.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 To load-balance :meth:`map`, simply use a LoadBalancedView:
85 85
86 86 .. sourcecode:: ipython
87 87
88 88 In [62]: lview.block = True
89 89
90 90 In [63]: serial_result = map(lambda x:x**10, range(32))
91 91
92 92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93 93
94 94 In [65]: serial_result==parallel_result
95 95 Out[65]: True
96 96
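What "dynamically load balanced" means can be illustrated with a small standard-library sketch (a toy model, not IPython's scheduler): idle workers pull the next item from a shared queue as soon as they finish, so a slow item does not stall a whole pre-assigned chunk:

```python
import queue
import threading

def load_balanced_map(f, seq, n_workers=4):
    """Toy dynamic load balancing: a shared task queue drained by workers."""
    seq = list(seq)
    tasks = queue.Queue()
    for item in enumerate(seq):
        tasks.put(item)
    results = [None] * len(seq)

    def worker():
        while True:
            try:
                i, x = tasks.get_nowait()
            except queue.Empty:
                return
            results[i] = f(x)   # each worker grabs work whenever it is free

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(load_balanced_map(lambda x: x ** 10, range(32)) == [x ** 10 for x in range(32)])
```
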
97 97 Parallel function decorator
98 98 ---------------------------
99 99
100 100 Parallel functions are just like normal functions, but they can be called on
101 101 sequences and *in parallel*. The multiengine interface provides a decorator
102 102 that turns any Python function into a parallel function:
103 103
104 104 .. sourcecode:: ipython
105 105
106 106 In [10]: @lview.parallel()
107 107 ....: def f(x):
108 108 ....: return 10.0*x**4
109 109 ....:
110 110
111 111 In [11]: f.map(range(32)) # this is done in parallel
112 112 Out[11]: [0.0,10.0,160.0,...]
113 113
114 114 .. _parallel_dependencies:
115 115
116 116 Dependencies
117 117 ============
118 118
119 119 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
120 120 may want to associate some kind of `Dependency` that describes when, where, or whether
121 121 a task can be run. In IPython, we provide two types of dependencies:
122 122 `Functional Dependencies`_ and `Graph Dependencies`_
123 123
124 124 .. note::
125 125
126 126 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
127 127 and you will see errors or warnings if you try to use dependencies with the pure
128 128 scheduler.
129 129
130 130 Functional Dependencies
131 131 -----------------------
132 132
133 133 Functional dependencies are used to determine whether a given engine is capable of running
134 134 a particular task. This is implemented via a special :class:`Exception` class,
135 135 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
136 136 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
137 137 the error up to the client like any other error, catches the error, and submits the task
138 138 to a different engine. This will repeat indefinitely, and a task will never be submitted
139 139 to a given engine a second time.
140 140
141 141 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
142 142 some decorators for facilitating this behavior.
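
If you prefer to check a condition yourself, you can raise the exception directly from
inside the task; the scheduler will then try the task on another engine rather than
reporting the error to the client. A minimal sketch (the device check and the
``dostuff`` helper are hypothetical):

.. sourcecode:: ipython

    In [8]: from IPython.parallel.error import UnmetDependency

    In [9]: def gpu_task():
       ...:     import os
       ...:     # hypothetical check: only run on engines where a GPU device node exists
       ...:     if not os.path.exists('/dev/nvidia0'):
       ...:         raise UnmetDependency()
       ...:     return dostuff()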
143 143
144 144 There are two decorators and a class used for functional dependencies:
145 145
146 146 .. sourcecode:: ipython
147 147
148 148 In [9]: from IPython.parallel import depend, require, dependent
149 149
150 150 @require
151 151 ********
152 152
153 153 The simplest sort of dependency is requiring that a Python module is available. The
154 154 ``@require`` decorator lets you define a function that will only run on engines where names
155 155 you specify are importable:
156 156
157 157 .. sourcecode:: ipython
158 158
159 159 In [10]: @require('numpy', 'zmq')
160 160 ....: def myfunc():
161 161 ....: return dostuff()
162 162
163 163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
164 164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
165 165
166 166 @depend
167 167 *******
168 168
169 169 The ``@depend`` decorator lets you decorate any function with any *other* function to
170 170 evaluate the dependency. The dependency function will be called at the start of the task,
171 171 and if it returns ``False``, then the dependency will be considered unmet, and the task
172 172 will be assigned to another engine. If the dependency returns *anything other than
173 173 ``False``*, the rest of the task will continue.
174 174
175 175 .. sourcecode:: ipython
176 176
177 177 In [10]: def platform_specific(plat):
178 178 ....: import sys
179 179 ....: return sys.platform == plat
180 180
181 181 In [11]: @depend(platform_specific, 'darwin')
182 182 ....: def mactask():
183 183 ....: do_mac_stuff()
184 184
185 185 In [12]: @depend(platform_specific, 'nt')
186 186 ....: def wintask():
187 187 ....: do_windows_stuff()
188 188
189 189 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
190 190 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
191 191 signature.
192 192
193 193 dependent
194 194 *********
195 195
196 196 You don't have to use the decorators on your tasks. If, for instance, you want to
197 197 run tasks with a single function but varying dependencies, you can directly construct
198 198 the :class:`dependent` object that the decorators use:
199 199
200 200 .. sourcecode:: ipython
201 201
202 202 In [13]: def mytask(*args):
203 203 ....: dostuff()
204 204
205 205 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
206 206 # this is the same as decorating the declaration of mytask with @depend
207 207 # but you can do it again:
208 208
209 209 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
210 210
211 211 # in general:
212 212 In [16]: t = dependent(f, g, *dargs, **dkwargs)
213 213
214 214 # is equivalent to:
215 215 In [17]: @depend(g, *dargs, **dkwargs)
216 216 ....: def t(a,b,c):
217 217 ....: # contents of f
218 218
219 219 Graph Dependencies
220 220 ------------------
221 221
222 222 Sometimes you want to restrict the time and/or location to run a given task as a function
223 223 of the time and/or location of other tasks. This is implemented via a subclass of
224 224 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
225 225 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
226 226 has been met.
227 227
228 228 The switches provided for interpreting whether a given dependency set has been met are:
229 229
230 230 any|all
231 231 Whether the dependency is considered met if *any* of the dependencies are done, or
232 232 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
233 233 boolean attribute, which defaults to ``True``.
234 234
235 235 success [default: True]
236 236 Whether to consider tasks that succeeded as fulfilling dependencies.
237 237
238 238 failure [default: False]
239 239 Whether to consider tasks that failed as fulfilling dependencies.
240 240 Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
241 241 only when tasks have failed.
242 242
243 243 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
244 244 ``success`` should be ``True`` and ``failure`` should be ``False``. However, sometimes you may
245 245 not care whether the task succeeds, and always want the second task to run, in which case you
246 246 should use `success=failure=True`. The default behavior is to only use successes.
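
These switches can also be set explicitly by constructing a :class:`Dependency` from a
list of msg_ids or AsyncResults. A sketch of a cleanup task that should run only if an
earlier task failed (assuming ``ar`` is an :class:`AsyncResult` from a prior ``apply``,
and ``cleanup_task`` is a hypothetical function):

.. sourcecode:: ipython

    In [20]: from IPython.parallel import Dependency

    In [21]: dep = Dependency([ar], failure=True, success=False)

    In [22]: with lview.temp_flags(after=dep):
       ....:     ar_cleanup = lview.apply_async(cleanup_task)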
247 247
248 248 There are other switches for interpretation that are made at the *task* level. These are
249 249 specified via keyword arguments to the client's :meth:`apply` method.
250 250
251 251 after,follow
252 252 You may want to run a task *after* a given set of dependencies have been run and/or
253 253 run it *where* another set of dependencies are met. To support this, every task has an
254 254 `after` dependency to restrict time, and a `follow` dependency to restrict
255 255 destination.
256 256
257 257 timeout
258 258 You may also want to set a time-limit for how long the scheduler should wait before a
259 259 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
260 260 indicates that the task should never timeout. If the timeout is reached, and the
261 261 scheduler still hasn't been able to assign the task to an engine, the task will fail
262 262 with a :class:`DependencyTimeout`.
263 263
264 264 .. note::
265 265
266 266 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
267 267 task to run after a job submitted via the MUX interface.
268 268
269 269 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
270 270 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
271 271 `follow` and `after` keywords to :meth:`client.apply`:
272 272
273 273 .. sourcecode:: ipython
274 274
275 275 In [14]: client.block=False
276 276
277 277 In [15]: ar = lview.apply(f, args, kwargs)
278 278
279 279 In [16]: ar2 = lview.apply(f2)
280 280
281 281 In [17]: with lview.temp_flags(after=[ar,ar2]):
282 282 ....: ar3 = lview.apply(f3)
283 283
284 284 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
285 285 ....: ar4 = lview.apply(f3)
286 286
287 287 .. seealso::
288 288
289 289 Some parallel workloads can be described as a `Directed Acyclic Graph
290 290 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
291 291 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
292 292 onto task dependencies.
293 293
294 294
295 295 Impossible Dependencies
296 296 ***********************
297 297
298 298 The schedulers do perform some analysis on graph dependencies to determine whether they
299 299 can ever be met. If the scheduler discovers that a dependency cannot be
300 300 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
301 301 scheduler realizes that a task can never be run, it won't sit indefinitely in the
302 302 scheduler clogging the pipeline.
303 303
304 304 The basic cases that are checked:
305 305
306 306 * depending on nonexistent messages
307 307 * `follow` dependencies were run on more than one machine and `all=True`
308 308 * any dependencies failed and `all=True,success=True,failure=False`
309 309 * all dependencies failed and `all=False,success=True,failure=False`
310 310
311 311 .. warning::
312 312
313 313 This analysis has not been proven to be rigorous, so it may still be possible for tasks
314 314 to become impossible to run in obscure situations; a timeout is therefore a good safeguard.
315 315
316 316
317 317 Retries and Resubmit
318 318 ====================
319 319
320 320 Retries
321 321 -------
322 322
323 323 Another flag for tasks is `retries`. This is an integer, specifying how many times
324 324 a task should be resubmitted after failure. This is useful for tasks that should still run
325 325 if their engine was shut down, or that may have some statistical chance of failing. The default
326 326 is to not retry tasks.
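
For instance, an unreliable task can be retried a few times before the failure is
reported. A sketch (assuming ``flaky_task`` is a hypothetical function that sometimes
fails, and that `retries` can be set via :meth:`temp_flags` like the other task flags):

.. sourcecode:: ipython

    In [30]: with lview.temp_flags(retries=3):
       ....:     ar = lview.apply_async(flaky_task)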
327 327
328 328 Resubmit
329 329 --------
330 330
331 331 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
332 332 you have fixed the error, or because you want to restore the cluster to an interrupted state.
333 333 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
334 334 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
335 335 a task that is pending - only those that have finished, whether successfully or not.
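
A sketch of resubmitting a finished task by its msg_ids (assuming ``ar`` is an
:class:`AsyncResult` from an earlier submission):

.. sourcecode:: ipython

    In [32]: ar2 = rc.resubmit(ar.msg_ids)

    In [33]: result = ar2.get()  # wait for the re-run to finish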
336 336
337 337 .. _parallel_schedulers:
338 338
339 339 Schedulers
340 340 ==========
341 341
342 342 There are a variety of valid ways to determine where jobs should be assigned in a
343 343 load-balancing situation. In IPython, we support several standard schemes, and
344 344 even make it easy to define your own. The scheme can be selected via the ``scheme``
345 345 argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.schemename` attribute
346 346 of a controller config object.
347 347
348 348 The built-in routing schemes:
349 349
350 350 To select one of these schemes, simply do::
351 351 
352 352     $ ipcontroller --scheme=<schemename>

353 353 for instance::

354 354     $ ipcontroller --scheme=lru
355 355
356 356 lru: Least Recently Used
357 357
358 358 Always assign work to the least-recently-used engine. A close relative of
359 359 round-robin, it will be fair with respect to the number of tasks, agnostic
360 360 with respect to runtime of each task.
361 361
362 362 plainrandom: Plain Random
363 363
364 364 Randomly picks an engine on which to run.
365 365
366 366 twobin: Two-Bin Random
367 367
368 368 **Requires numpy**
369 369
370 370 Pick two engines at random, and use the LRU of the two. This is known to be better
371 371 than plain random in many cases, but requires a small amount of computation.
372 372
373 373 leastload: Least Load
374 374
375 375 **This is the default scheme**
376 376
377 377 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks ties).
378 378
379 379 weighted: Weighted Two-Bin Random
380 380
381 381 **Requires numpy**
382 382
383 383 Pick two engines at random using the number of outstanding tasks as inverse weights,
384 384 and use the one with the lower load.
385 385
386 386 Greedy Assignment
387 387 -----------------
388 388
389 389 Tasks can be assigned greedily as they are submitted. If their dependencies are
390 390 met, they will be assigned to an engine right away, and multiple tasks can be
391 391 assigned to an engine at a given time. The limit on outstanding tasks per engine is set with the
392 392 ``TaskScheduler.hwm`` (high water mark) configurable in your
393 393 :file:`ipcontroller_config.py` config file, with:
394 394
395 395 .. sourcecode:: python
396 396
397 397 # the most common choices are:
398 398 c.TaskScheduler.hwm = 0 # (minimal latency, default in IPython < 0.13)
399 399 # or
400 400 c.TaskScheduler.hwm = 1 # (most-informed balancing, default in IPython >= 0.13)
401 401
402 402 In IPython < 0.13, the default is 0, or no-limit. That is, there is no limit to the number of
403 403 tasks that can be outstanding on a given engine. This greatly benefits the
404 404 latency of execution, because network traffic can be hidden behind computation.
405 405 However, this means that workload is assigned without knowledge of how long
406 406 each task might take, and can result in poor load-balancing, particularly for
407 407 submitting a collection of heterogeneous tasks all at once. You can limit this
408 408 effect by setting hwm to a positive integer, 1 being maximum load-balancing (a
409 409 task will never be waiting if there is an idle engine), and any larger number
410 410 being a compromise between load-balancing and latency-hiding.
411 411
412 412 In practice, some users have been confused by having this optimization on by
413 413 default, so the default value has been changed to 1 in IPython 0.13. This can be slower,
414 414 but has more obvious behavior and won't result in assigning too many tasks to
415 415 some engines in heterogeneous cases.
416 416
417 417
418 418 Pure ZMQ Scheduler
419 419 ------------------
420 420
421 421 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
422 422 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
423 423 load-balancing. This scheduler does not support any of the advanced features of the Python
424 424 :class:`.Scheduler`.
425 425
426 426 Disabled features when using the ZMQ Scheduler:
427 427
428 428 * Engine unregistration
429 429 Task farming will be disabled if an engine unregisters.
430 430 Further, if an engine is unregistered during computation, the scheduler may not recover.
431 431 * Dependencies
432 432 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
433 433 based on message content.
434 434 * Early destination notification
435 435 The Python schedulers know which engine gets which task, and notify the Hub. This
436 436 allows graceful handling of Engines coming and going. There is no way to know
437 437 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
438 438 engine until they *finish*. This makes recovery from engine shutdown very difficult.
439 439
440 440
441 441 .. note::
442 442
443 443 TODO: performance comparisons
444 444
445 445
446 446
447 447
448 448 More details
449 449 ============
450 450
451 451 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
452 452 of flexibility in how tasks are defined and run. The next places to look are
453 453 in the following classes:
454 454
455 455 * :class:`~IPython.parallel.client.view.LoadBalancedView`
456 456 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
457 457 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
458 458 * :mod:`~IPython.parallel.controller.dependency`
459 459
460 460 The following is an overview of how to use these classes together:
461 461
462 462 1. Create a :class:`Client` and :class:`LoadBalancedView`
463 463 2. Define some functions to be run as tasks
464 464 3. Submit your tasks using the :meth:`apply` method of your
465 465 :class:`LoadBalancedView` instance.
466 466 4. Use :meth:`.Client.get_result` to get the results of the
467 467 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
468 468 for and then receive the results.
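
The steps above can be sketched end-to-end as follows (assuming a running cluster; the
trivial ``task`` function is illustrative):

.. sourcecode:: ipython

    In [1]: from IPython.parallel import Client

    In [2]: rc = Client()

    In [3]: lview = rc.load_balanced_view()

    In [4]: def task(x):
       ...:     return x**2

    In [5]: ar = lview.apply(task, 4)

    In [6]: ar.get()
    Out[6]: 16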
469 469
470 470 .. seealso::
471 471
472 472 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.