document new default hwm value
MinRK -
1 1 .. _parallel_task:
2 2
3 3 ==========================
4 4 The IPython task interface
5 5 ==========================
6 6
7 7 The task interface to the cluster presents the engines as a fault tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 9 the task interface the user has no direct access to individual engines. By
10 10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 11 simpler and more powerful.
12 12
13 13 Best of all, the user can use both of these interfaces running at the same time
14 14 to take advantage of their respective strengths. When the work can be broken
15 15 up into segments that do not depend on previous execution, the
16 16 task interface is ideal. But it also has more power and flexibility, allowing
17 17 the user to guide the distribution of jobs, without having to assign tasks to
18 18 engines explicitly.
19 19
20 20 Starting the IPython controller and engines
21 21 ===========================================
22 22
23 23 To follow along with this tutorial, you will need to start the IPython
24 24 controller and four IPython engines. The simplest way of doing this is to use
25 25 the :command:`ipcluster` command::
26 26
27 27 $ ipcluster start -n 4
28 28
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31 31
32 32 Creating a ``LoadBalancedView`` instance
33 33 ========================================
34 34
35 35 The first step is to import the :mod:`IPython.parallel`
36 36 module and then create a :class:`.Client` instance, and we will also be using
37 37 a :class:`LoadBalancedView`, here called `lview`:
38 38
39 39 .. sourcecode:: ipython
40 40
41 41 In [1]: from IPython.parallel import Client
42 42
43 43 In [2]: rc = Client()
44 44
45 45
46 46 This form assumes that the controller was started on localhost with default
47 47 configuration. If not, the location of the controller must be given as an
48 48 argument to the constructor:
49 49
50 50 .. sourcecode:: ipython
51 51
52 52 # for a visible LAN controller listening on an external port:
53 53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 54 # or to connect with a specific profile you have set up:
55 55 In [3]: rc = Client(profile='mpi')
56 56
57 57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 58 be constructed via the client's :meth:`load_balanced_view` method:
59 59
60 60 .. sourcecode:: ipython
61 61
62 62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63 63
64 64 .. seealso::
65 65
66 66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67 67
68 68
69 69 Quick and easy parallelism
70 70 ==========================
71 71
72 72 In many cases, you simply want to apply a Python function to a sequence of
73 73 objects, but *in parallel*. Like the multiengine interface, these can be
74 74 implemented via the task interface. The exact same tools can perform these
75 75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 76 of :func:`map` and the :func:`@parallel` function decorator. When used with a
77 77 :class:`LoadBalancedView`, these are dynamically load balanced. Thus, if the
78 78 execution time per item varies significantly, you should use the versions in
79 79 the task interface.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 To load-balance :meth:`map`, simply use a :class:`LoadBalancedView`:
85 85
86 86 .. sourcecode:: ipython
87 87
88 88 In [62]: lview.block = True
89 89
90 90 In [63]: serial_result = map(lambda x:x**10, range(32))
91 91
92 92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93 93
94 94 In [65]: serial_result==parallel_result
95 95 Out[65]: True
96 96
97 97 Parallel function decorator
98 98 ---------------------------
99 99
100 100 Parallel functions are just like normal functions, but they can be called on
101 101 sequences and *in parallel*. The multiengine interface provides a decorator
102 102 that turns any Python function into a parallel function:
103 103
104 104 .. sourcecode:: ipython
105 105
106 106 In [10]: @lview.parallel()
107 107 ....: def f(x):
108 108 ....: return 10.0*x**4
109 109 ....:
110 110
111 111 In [11]: f.map(range(32)) # this is done in parallel
112 112 Out[11]: [0.0,10.0,160.0,...]
113 113
114 114 .. _parallel_taskmap:
115 115
116 116 Map results are iterable!
117 117 -------------------------
118 118
119 119 When an AsyncResult object maps multiple results (e.g. the :class:`~AsyncMapResult`
120 120 object), you can iterate through them, acting on each result as it arrives:
121 121
122 122 .. literalinclude:: ../../examples/parallel/itermapresult.py
123 123 :language: python
124 124 :lines: 9-34
125 125
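
The pattern above — consuming results the moment they complete rather than in submission order — has a plain standard-library analogue in :mod:`concurrent.futures`. The sketch below is illustrative only and does not use IPython's API; ``square`` is a made-up example function:

```python
# Illustrative analogue of iterating map results as they arrive, using the
# standard library rather than IPython's AsyncMapResult.
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(square, x) for x in range(8)]
    # act on each result the moment it completes, not in submission order
    results = sorted(f.result() for f in as_completed(futures))
```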
126 126 .. seealso::
127 127
128 128 When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
129 129 handling individual results as they arrive, but with metadata), you can always
130 130 just split the original result's ``msg_ids`` attribute, and handle them as you like.
131 131
132 132 For an example of this, see :file:`docs/examples/parallel/customresult.py`
133 133
134 134
135 135 .. _parallel_dependencies:
136 136
137 137 Dependencies
138 138 ============
139 139
140 140 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
141 141 may want to associate some kind of `Dependency` that describes when, where, or whether
142 142 a task can be run. In IPython, we provide two types of dependencies:
143 143 `Functional Dependencies`_ and `Graph Dependencies`_
144 144
145 145 .. note::
146 146
147 147 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
148 148 and you will see errors or warnings if you try to use dependencies with the pure
149 149 scheduler.
150 150
151 151 Functional Dependencies
152 152 -----------------------
153 153
154 154 Functional dependencies are used to determine whether a given engine is capable of running
155 155 a particular task. This is implemented via a special :class:`Exception` class,
156 156 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
157 157 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
158 158 the error up to the client like any other error, catches the error, and submits the task
159 159 to a different engine. This will repeat indefinitely, and a task will never be submitted
160 160 to a given engine a second time.
161 161
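
The resubmission behavior described above can be sketched in plain Python. This is a simplified model, not the actual scheduler; ``schedule`` and the engine callables are hypothetical names:

```python
# Simplified model of the behavior described above (not the actual scheduler):
# a task that raises UnmetDependency is handed to another engine, and is never
# given to the same engine a second time.

class UnmetDependency(Exception):
    """Raised by a task to signal that this engine cannot run it."""

def schedule(task, engines):
    """engines: mapping of engine id -> callable that runs the task."""
    for engine_id, run in engines.items():  # each engine is tried at most once
        try:
            return engine_id, run(task)
        except UnmetDependency:
            continue  # caught and resubmitted by the scheduler, not the client
    raise RuntimeError("no engine could satisfy the task's dependencies")
```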
162 162 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
163 163 some decorators for facilitating this behavior.
164 164
165 165 There are two decorators and a class used for functional dependencies:
166 166
167 167 .. sourcecode:: ipython
168 168
169 169 In [9]: from IPython.parallel import depend, require, dependent
170 170
171 171 @require
172 172 ********
173 173
174 174 The simplest sort of dependency is requiring that a Python module is available. The
175 175 ``@require`` decorator lets you define a function that will only run on engines where names
176 176 you specify are importable:
177 177
178 178 .. sourcecode:: ipython
179 179
180 180 In [10]: @require('numpy', 'zmq')
181 181 ....: def myfunc():
182 182 ....: return dostuff()
183 183
184 184 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
185 185 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
186 186
187 187 @depend
188 188 *******
189 189
190 190 The ``@depend`` decorator lets you decorate any function with any *other* function to
191 191 evaluate the dependency. The dependency function will be called at the start of the task,
192 192 and if it returns ``False``, then the dependency will be considered unmet, and the task
193 193 will be assigned to another engine. If the dependency returns *anything other than
194 194 ``False``*, the rest of the task will continue.
195 195
196 196 .. sourcecode:: ipython
197 197
198 198 In [10]: def platform_specific(plat):
199 199 ....: import sys
200 200 ....: return sys.platform == plat
201 201
202 202 In [11]: @depend(platform_specific, 'darwin')
203 203 ....: def mactask():
204 204 ....: do_mac_stuff()
205 205
206 206 In [12]: @depend(platform_specific, 'nt')
207 207 ....: def wintask():
208 208 ....: do_windows_stuff()
209 209
210 210 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
211 211 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
212 212 signature.
213 213
214 214 dependent
215 215 **********
216 216
217 217 You don't have to use the decorators on your tasks. If, for instance, you want
218 218 to run tasks with a single function but varying dependencies, you can directly construct
219 219 the :class:`dependent` object that the decorators use:
220 220
221 221 .. sourcecode:: ipython
222 222
223 223 In [13]: def mytask(*args):
224 224 ....: dostuff()
225 225
226 226 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
227 227 # this is the same as decorating the declaration of mytask with @depend
228 228 # but you can do it again:
229 229
230 230 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
231 231
232 232 # in general:
233 233 In [16]: t = dependent(f, g, *dargs, **dkwargs)
234 234
235 235 # is equivalent to:
236 236 In [17]: @depend(g, *dargs, **dkwargs)
237 237 ....: def t(a,b,c):
238 238 ....: # contents of f
239 239
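
A minimal plain-Python sketch of what such a wrapper could look like (illustrative only; the real :class:`dependent` class lives in :mod:`IPython.parallel` and evaluates the dependency on the engine):

```python
import sys

# Minimal sketch of a dependent-style wrapper: call the dependency function
# first, and refuse the task if it returns False.

class UnmetDependency(Exception):
    pass

def dependent(f, g, *dargs, **dkwargs):
    def wrapped(*args, **kwargs):
        # anything other than False from the dependency function passes
        if g(*dargs, **dkwargs) is False:
            raise UnmetDependency()
        return f(*args, **kwargs)
    return wrapped

def platform_specific(plat):
    return sys.platform == plat

mactask = dependent(lambda: "mac stuff", platform_specific, "darwin")
```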
240 240 Graph Dependencies
241 241 ------------------
242 242
243 243 Sometimes you want to restrict the time and/or location to run a given task as a function
244 244 of the time and/or location of other tasks. This is implemented via a subclass of
245 245 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
246 246 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
247 247 has been met.
248 248
249 249 We provide the following switches for interpreting whether a given dependency set has been met:
250 250
251 251 any|all
252 252 Whether the dependency is considered met if *any* of the dependencies are done, or
253 253 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
254 254 boolean attribute, which defaults to ``True``.
255 255
256 256 success [default: True]
257 257 Whether to consider tasks that succeeded as fulfilling dependencies.
258 258
259 259 failure [default: False]
260 260 Whether to consider tasks that failed as fulfilling dependencies.
261 261 Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
262 262 only when tasks have failed.
263 263
264 264 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
265 265 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
266 266 not care whether the task succeeds, and always want the second task to run, in which case you
267 267 should use `success=failure=True`. The default behavior is to only use successes.
268 268
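
The interaction of the any/all, success, and failure switches can be summarized in a small sketch (an illustrative model with assumed names, not IPython's :class:`Dependency` implementation):

```python
# Sketch (not IPython's Dependency class) of how the switches interact.
# deps: msg_ids this task depends on; succeeded/failed: sets of finished ids.

def dependency_met(deps, succeeded, failed, all=True, success=True, failure=False):
    relevant = set()
    if success:
        relevant |= succeeded   # successes fulfill dependencies
    if failure:
        relevant |= failed      # failures fulfill dependencies (cleanup tasks)
    done = set(deps) & relevant
    return set(deps) <= done if all else bool(done)
```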
269 269 There are other switches for interpretation that are made at the *task* level. These are
270 270 specified via keyword arguments to the client's :meth:`apply` method.
271 271
272 272 after,follow
273 273 You may want to run a task *after* a given set of dependencies have been run and/or
274 274 run it *where* another set of dependencies are met. To support this, every task has an
275 275 `after` dependency to restrict time, and a `follow` dependency to restrict
276 276 destination.
277 277
278 278 timeout
279 279 You may also want to set a time-limit for how long the scheduler should wait before a
280 280 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
281 281 indicates that the task should never timeout. If the timeout is reached, and the
282 282 scheduler still hasn't been able to assign the task to an engine, the task will fail
283 283 with a :class:`DependencyTimeout`.
284 284
285 285 .. note::
286 286
287 287 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
288 288 task to run after a job submitted via the MUX interface.
289 289
290 290 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
291 291 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
292 292 `follow` and `after` keywords to :meth:`client.apply`:
293 293
294 294 .. sourcecode:: ipython
295 295
296 296 In [14]: client.block=False
297 297
298 298 In [15]: ar = lview.apply(f, args, kwargs)
299 299
300 300 In [16]: ar2 = lview.apply(f2)
301 301
302 302 In [17]: with lview.temp_flags(after=[ar,ar2]):
303 303 ....: ar3 = lview.apply(f3)
304 304
305 305 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
306 306 ....: ar4 = lview.apply(f3)
307 307
308 308 .. seealso::
309 309
310 310 Some parallel workloads can be described as a `Directed Acyclic Graph
311 311 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
312 312 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
313 313 onto task dependencies.
314 314
315 315
316 316 Impossible Dependencies
317 317 ***********************
318 318
319 319 The schedulers do perform some analysis on graph dependencies to determine whether they
320 320 are impossible to meet. If the scheduler discovers that a dependency cannot be
321 321 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
322 322 scheduler realizes that a task can never be run, it won't sit indefinitely in the
323 323 scheduler clogging the pipeline.
324 324
325 325 The basic cases that are checked:
326 326
327 327 * depending on nonexistent messages
328 328 * `follow` dependencies were run on more than one machine and `all=True`
329 329 * any dependencies failed and `all=True,success=True,failure=False`
330 330 * all dependencies failed and `all=False,success=True,failure=False`
331 331
332 332 .. warning::
333 333
334 334 This analysis has not been proven to be rigorous, so it is possible for tasks
335 335 to become impossible to run in obscure situations; a timeout may be a good safeguard.
336 336
337 337
338 338 Retries and Resubmit
339 339 ====================
340 340
341 341 Retries
342 342 -------
343 343
344 344 Another flag for tasks is `retries`. This is an integer, specifying how many times
345 345 a task should be resubmitted after failure. This is useful for tasks that should still run
346 346 if their engine was shut down, or may have some statistical chance of failing. The default
347 347 is to not retry tasks.
348 348
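
The `retries` semantics can be sketched as follows (illustrative names, not IPython's task machinery): resubmit a failing task up to `retries` additional times before relaying the final error:

```python
# Sketch of the `retries` semantics: re-run a failing task up to `retries`
# additional times before raising the last error to the caller.

def run_with_retries(task, retries=0):
    attempts = 0
    while True:
        try:
            return task()
        except Exception:
            if attempts >= retries:
                raise  # out of retries: relay the error to the client
            attempts += 1
```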
349 349 Resubmit
350 350 --------
351 351
352 352 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
353 353 you have fixed the error, or because you want to restore the cluster to an interrupted state.
354 354 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
355 355 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
356 356 a task that is pending - only those that have finished, whether successful or not.
357 357
358 358 .. _parallel_schedulers:
359 359
360 360 Schedulers
361 361 ==========
362 362
363 363 There are a variety of valid ways to determine where jobs should be assigned in a
364 364 load-balancing situation. In IPython, we support several standard schemes, and
365 365 even make it easy to define your own. The scheme can be selected via the ``scheme``
366 366 argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.scheme_name` attribute
367 367 of a controller config object.
368 368
369 369 The built-in routing schemes:
370 370
371 371 To select one of these schemes, simply do::
372 372
373 373 $ ipcontroller --scheme=<schemename>
374 374 for instance:
375 375 $ ipcontroller --scheme=lru
376 376
377 377 lru: Least Recently Used
378 378
379 379 Always assign work to the least-recently-used engine. A close relative of
380 380 round-robin, it will be fair with respect to the number of tasks, agnostic
381 381 with respect to runtime of each task.
382 382
383 383 plainrandom: Plain Random
384 384
385 385 Randomly picks an engine on which to run.
386 386
387 387 twobin: Two-Bin Random
388 388
389 389 **Requires numpy**
390 390
391 391 Pick two engines at random, and use the LRU of the two. This is known to be better
392 392 than plain random in many cases, but requires a small amount of computation.
393 393
394 394 leastload: Least Load
395 395
396 396 **This is the default scheme**
397 397
398 398 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks ties).
399 399
400 400 weighted: Weighted Two-Bin Random
401 401
402 402 **Requires numpy**
403 403
404 404 Pick two engines at random using the number of outstanding tasks as inverse weights,
405 405 and use the one with the lower load.
406 406
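
The schemes above can be sketched as pure functions over per-engine state. This is illustrative only: the real schemes operate on the scheduler's internal queues, and the sketch uses the stdlib ``random`` module where twobin/weighted actually require numpy:

```python
import random

# Pure-function sketches of the built-in routing schemes (illustrative).
# idle_order: engine ids, least recently used first; loads: outstanding tasks.

def lru(idle_order, loads):
    return idle_order[0]  # least recently used engine is first in the queue

def plainrandom(idle_order, loads):
    return random.choice(idle_order)

def twobin(idle_order, loads):
    a, b = random.sample(idle_order, 2)
    # of the two random picks, use the least recently used
    return a if idle_order.index(a) < idle_order.index(b) else b

def leastload(idle_order, loads):
    # fewest outstanding tasks; position in the LRU queue breaks ties
    return min(idle_order, key=lambda e: (loads[e], idle_order.index(e)))
```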
407 407 Greedy Assignment
408 408 -----------------
409 409
410 410 Tasks are assigned greedily as they are submitted. If their dependencies are
411 411 met, they will be assigned to an engine right away, and multiple tasks can be
412 412 assigned to an engine at a given time. This limit is set with the
413 413 ``TaskScheduler.hwm`` (high water mark) configurable:
414 414
415 415 .. sourcecode:: python
416 416
417 417 # the most common choices are:
418 c.TaskScheduler.hwm = 0 # (minimal latency, default)
418 c.TaskScheduler.hwm = 0 # (minimal latency, default in IPython ≤ 0.12)
419 419 # or
420 c.TaskScheduler.hwm = 1 # (most-informed balancing)
420 c.TaskScheduler.hwm = 1 # (most-informed balancing, default in > 0.12)
421 421
422 The default is 0, or no-limit. That is, there is no limit to the number of
422 In IPython ≤ 0.12, the default is 0, or no-limit. That is, there is no limit to the number of
423 423 tasks that can be outstanding on a given engine. This greatly benefits the
424 424 latency of execution, because network traffic can be hidden behind computation.
425 425 However, this means that workload is assigned without knowledge of how long
426 426 each task might take, and can result in poor load-balancing, particularly for
427 427 submitting a collection of heterogeneous tasks all at once. You can limit this
428 428 effect by setting hwm to a positive integer, 1 being maximum load-balancing (a
429 429 task will never be waiting if there is an idle engine), and any larger number
430 430 being a compromise between load-balance and latency-hiding.
431 431
432 In practice, some users have been confused by having this optimization on by
433 default, and the default value has been changed to 1. This can be slower,
434 but has more obvious behavior and won't result in assigning too many tasks to
435 some engines in heterogeneous cases.
436
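
A toy model of greedy assignment (assuming no task finishes during submission; this is not the scheduler's code, and ``assign`` is a made-up name) shows how ``hwm`` trades latency-hiding for balance:

```python
# Toy model of greedy assignment under an hwm limit. Returns per-engine load
# and the number of tasks still waiting in the scheduler.

def assign(n_tasks, engines, hwm):
    loads = {e: 0 for e in engines}
    waiting = 0
    for _ in range(n_tasks):
        e = min(loads, key=loads.get)  # engine with fewest outstanding tasks
        if hwm == 0 or loads[e] < hwm:
            loads[e] += 1              # assigned immediately (latency hidden)
        else:
            waiting += 1               # held back until an engine frees up
    return loads, waiting
```

With ``hwm=0`` all ten tasks are assigned up front; with ``hwm=1`` only one task per engine is assigned, and the rest wait where the scheduler can still make informed decisions.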
432 437
433 438 Pure ZMQ Scheduler
434 439 ------------------
435 440
436 441 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
437 442 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
438 443 load-balancing. This scheduler does not support any of the advanced features of the Python
439 444 :class:`.Scheduler`.
440 445
441 446 Disabled features when using the ZMQ Scheduler:
442 447
443 448 * Engine unregistration
444 449 Task farming will be disabled if an engine unregisters.
445 450 Further, if an engine is unregistered during computation, the scheduler may not recover.
446 451 * Dependencies
447 452 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
448 453 based on message content.
449 454 * Early destination notification
450 455 The Python schedulers know which engine gets which task, and notify the Hub. This
451 456 allows graceful handling of Engines coming and going. There is no way to know
452 457 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
453 458 engine until they *finish*. This makes recovery from engine shutdown very difficult.
454 459
455 460
456 461 .. note::
457 462
458 463 TODO: performance comparisons
459 464
460 465
461 466
462 467
463 468 More details
464 469 ============
465 470
466 471 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
467 472 of flexibility in how tasks are defined and run. The next places to look are
468 473 in the following classes:
469 474
470 475 * :class:`~IPython.parallel.client.view.LoadBalancedView`
471 476 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
472 477 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
473 478 * :mod:`~IPython.parallel.controller.dependency`
474 479
475 480 The following is an overview of how to use these classes together:
476 481
477 482 1. Create a :class:`Client` and :class:`LoadBalancedView`
478 483 2. Define some functions to be run as tasks
479 484 3. Submit your tasks using the :meth:`apply` method of your
480 485 :class:`LoadBalancedView` instance.
481 486 4. Use :meth:`.Client.get_result` to get the results of the
482 487 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
483 488 for and then receive the results.
484 489
485 490 .. seealso::
486 491
487 492 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.