.. _parallel_task:

==========================
The IPython task interface
==========================

The task interface to the cluster presents the engines as a fault tolerant,
dynamic load-balanced system of workers. Unlike the multiengine interface, in
the task interface the user has no direct access to individual engines. By
allowing the IPython scheduler to assign work, this interface is simultaneously
simpler and more powerful.

Best of all, the user can use both of these interfaces running at the same time
to take advantage of their respective strengths. When your work can be broken
up into segments that do not depend on previous execution, the task interface
is ideal. But it also has more power and flexibility, allowing you to guide
the distribution of jobs, without having to assign tasks to engines explicitly.

Starting the IPython controller and engines
===========================================

To follow along with this tutorial, you will need to start the IPython
controller and four IPython engines. The simplest way of doing this is to use
the :command:`ipcluster` command::

    $ ipcluster start -n 4

For more detailed information about starting the controller and engines, see
our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.

Creating a ``LoadBalancedView`` instance
========================================

The first step is to import the :mod:`IPython.parallel` module and then create
a :class:`.Client` instance. We will also be using a :class:`LoadBalancedView`,
here called ``lview``:

.. sourcecode:: ipython

    In [1]: from IPython.parallel import Client

    In [2]: rc = Client()

This form assumes that the controller was started on localhost with default
configuration. If not, the location of the controller must be given as an
argument to the constructor:

.. sourcecode:: ipython

    # for a visible LAN controller listening on an external port:
    In [2]: rc = Client('tcp://192.168.1.16:10101')
    # or to connect with a specific profile you have set up:
    In [3]: rc = Client(profile='mpi')

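Either way, you can confirm that your engines have registered by checking the
client's ``ids`` attribute (the output below assumes the four engines started
earlier):

.. sourcecode:: ipython

    # the IDs of the engines currently registered with the controller
    In [3]: rc.ids
    Out[3]: [0, 1, 2, 3]
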
For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
be constructed via the client's :meth:`load_balanced_view` method:

.. sourcecode:: ipython

    In [4]: lview = rc.load_balanced_view() # default load-balanced view

.. seealso::

    For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.


Quick and easy parallelism
==========================

In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. Like the multiengine interface, these can be
implemented via the task interface. The exact same tools can perform these
actions in load-balanced ways as well as multiplexed ways: a parallel version
of :func:`map` and the :func:`@parallel` function decorator. When these are
used via a :class:`LoadBalancedView`, the tasks are dynamically load balanced.
Thus, if the execution time per item varies significantly, you should use the
versions in the task interface.

Parallel map
------------

To load-balance :meth:`map`, simply use a LoadBalancedView:

.. sourcecode:: ipython

    In [62]: lview.block = True

    In [63]: serial_result = map(lambda x:x**10, range(32))

    In [64]: parallel_result = lview.map(lambda x:x**10, range(32))

    In [65]: serial_result==parallel_result
    Out[65]: True

Parallel function decorator
---------------------------

Parallel functions are just like normal functions, but they can be called on
sequences and *in parallel*. Like the multiengine interface, the task interface
provides a decorator that turns any Python function into a parallel function:

.. sourcecode:: ipython

    In [10]: @lview.parallel()
       ....: def f(x):
       ....:     return 10.0*x**4
       ....:

    In [11]: f.map(range(32))    # this is done in parallel
    Out[11]: [0.0,10.0,160.0,...]

.. _parallel_dependencies:

Dependencies
============

Often, pure atomic load-balancing is too primitive for your work. In these cases, you
may want to associate some kind of `Dependency` that describes when, where, or whether
a task can be run. In IPython, we provide two types of dependencies:
`Functional Dependencies`_ and `Graph Dependencies`_.

.. note::

    It is important to note that the pure ZeroMQ scheduler does not support dependencies,
    and you will see errors or warnings if you try to use dependencies with the pure
    scheduler.

Functional Dependencies
-----------------------

Functional dependencies are used to determine whether a given engine is capable of running
a particular task. This is implemented via a special :class:`Exception` class,
:class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
the error up to the client like any other error, catches the error, and resubmits the task
to a different engine. This process can repeat across engines, but a task will never be
submitted to a given engine a second time.

You can manually raise the :class:`UnmetDependency` yourself, but IPython provides
some decorators for facilitating this behavior.

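For illustration, a minimal sketch of raising it by hand (``do_work`` is a
hypothetical stand-in for the actual task body):

.. sourcecode:: ipython

    In [8]: from IPython.parallel.error import UnmetDependency

    In [9]: def big_task(n):
       ...:     import multiprocessing
       ...:     # refuse to run on engines with fewer than 4 cores; the
       ...:     # scheduler will quietly resubmit the task to another engine
       ...:     if multiprocessing.cpu_count() < 4:
       ...:         raise UnmetDependency
       ...:     return do_work(n)  # hypothetical task body
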
There are two decorators and a class used for functional dependencies:

.. sourcecode:: ipython

    In [9]: from IPython.parallel import depend, require, dependent

@require
********

The simplest sort of dependency is requiring that a Python module is available. The
``@require`` decorator lets you define a function that will only run on engines where names
you specify are importable:

.. sourcecode:: ipython

    In [10]: @require('numpy', 'zmq')
       ....: def myfunc():
       ....:     return dostuff()

Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.

@depend
*******

The ``@depend`` decorator lets you decorate any function with any *other* function to
evaluate the dependency. The dependency function will be called at the start of the task,
and if it returns ``False``, then the dependency will be considered unmet, and the task
will be assigned to another engine. If the dependency returns *anything other than
``False``*, the rest of the task will continue.

.. sourcecode:: ipython

    In [10]: def platform_specific(plat):
       ....:     import sys
       ....:     return sys.platform == plat

    In [11]: @depend(platform_specific, 'darwin')
       ....: def mactask():
       ....:     do_mac_stuff()

    In [12]: @depend(platform_specific, 'nt')
       ....: def wintask():
       ....:     do_windows_stuff()

In this case, any time you apply ``mactask``, it will only run on an OS X machine.
``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
signature.

dependents
**********

You don't have to use the decorators on your tasks. If, for instance, you want to
run tasks with a single function but varying dependencies, you can directly construct
the :class:`dependent` object that the decorators use:

.. sourcecode:: ipython

    In [13]: def mytask(*args):
       ....:     dostuff()

    In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
    # this is the same as decorating the declaration of mytask with @depend
    # but you can do it again:

    In [15]: wintask = dependent(mytask, platform_specific, 'nt')

    # in general:
    In [16]: t = dependent(f, g, *dargs, **dkwargs)

    # is equivalent to:
    In [17]: @depend(g, *dargs, **dkwargs)
       ....: def t(a,b,c):
       ....:     # contents of f

Graph Dependencies
------------------

Sometimes you want to restrict the time and/or location to run a given task as a function
of the time and/or location of other tasks. This is implemented via a subclass of
:class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
corresponding to tasks, and a few attributes to guide how to decide when the Dependency
has been met.

The switches we provide for interpreting whether a given dependency set has been met are:

any|all
    Whether the dependency is considered met if *any* of the dependencies are done, or
    only after *all* of them have finished. This is set by a Dependency's :attr:`all`
    boolean attribute, which defaults to ``True``.

success [default: True]
    Whether to consider tasks that succeeded as fulfilling dependencies.

failure [default: False]
    Whether to consider tasks that failed as fulfilling dependencies.
    Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
    only when tasks have failed.

Sometimes you want to run a task after another, but only if that task succeeded. In this case,
``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
not care whether the task succeeds, and always want the second task to run, in which case you
should use `success=failure=True`. The default behavior is to only use successes.

There are other switches for interpretation that are made at the *task* level. These are
specified via keyword arguments to the client's :meth:`apply` method.

after,follow
    You may want to run a task *after* a given set of dependencies have been run and/or
    run it *where* another set of dependencies are met. To support this, every task has an
    `after` dependency to restrict time, and a `follow` dependency to restrict
    destination.

timeout
    You may also want to set a time-limit for how long the scheduler should wait before a
    task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
    indicates that the task should never timeout. If the timeout is reached, and the
    scheduler still hasn't been able to assign the task to an engine, the task will fail
    with a :class:`DependencyTimeout`.

.. note::

    Dependencies only work within the task scheduler. You cannot instruct a load-balanced
    task to run after a job submitted via the MUX interface.

The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
`follow` and `after` keywords to :meth:`client.apply`:

.. sourcecode:: ipython

    In [14]: client.block=False

    In [15]: ar = lview.apply(f, args, kwargs)

    In [16]: ar2 = lview.apply(f2)

    In [17]: with lview.temp_flags(after=[ar,ar2]):
       ....:     ar3 = lview.apply(f3)

    In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
       ....:     ar4 = lview.apply(f3)

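When you do need the non-default switches, you can construct a
:class:`Dependency` yourself. A sketch, reusing ``ar`` and ``ar2`` from above
(``cleanup`` is a hypothetical function, and the import location is an
assumption based on the :mod:`IPython.parallel.controller.dependency` module
referenced later in this document):

.. sourcecode:: ipython

    In [19]: from IPython.parallel.controller.dependency import Dependency

    # a dependency that is met only where the earlier tasks *failed*:
    In [20]: dep = Dependency([ar, ar2], failure=True, success=False)

    In [21]: with lview.temp_flags(after=dep):
       ....:     ar5 = lview.apply(cleanup)
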
.. seealso::

    Some parallel workloads can be described as a `Directed Acyclic Graph
    <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
    Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
    onto task dependencies.


Impossible Dependencies
***********************

The schedulers do perform some analysis on graph dependencies to determine whether they
can ever be met. If the scheduler discovers that a dependency cannot be met, then the
task will fail with an :class:`ImpossibleDependency` error. This way, if the scheduler
realizes that a task can never be run, it won't sit indefinitely in the scheduler
clogging the pipeline.

The basic cases that are checked:

* depending on nonexistent messages
* `follow` dependencies were run on more than one machine and `all=True`
* any dependencies failed and `all=True,success=True,failure=False`
* all dependencies failed and `all=False,success=True,failure=False`

.. warning::

    This analysis has not been proven to be rigorous, so it is possible for tasks
    to become impossible to run in obscure situations; a timeout may be a good safeguard.


Retries and Resubmit
====================

Retries
-------

Another flag for tasks is `retries`. This is an integer, specifying how many times
a task should be resubmitted after failure. This is useful for tasks that should still run
if their engine was shut down, or may have some statistical chance of failing. The default
is to not retry tasks.

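For instance, a sketch using the ``lview`` from earlier (``flaky`` is a
hypothetical function with intermittent failures):

.. sourcecode:: ipython

    # allow up to 5 retries for every task submitted on this view:
    In [19]: lview.retries = 5

    # or only for one batch of submissions:
    In [20]: with lview.temp_flags(retries=5):
       ....:     ar = lview.apply_async(flaky)
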
Resubmit
--------

Sometimes you may want to re-run a task. This could be because it failed for some reason, and
you have fixed the error, or because you want to restore the cluster to an interrupted state.
For this, the :class:`Client` has a :meth:`resubmit` method. This simply takes one or more
msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
a task that is pending - only those that have finished, either successfully or unsuccessfully.

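A short sketch, using the ``rc`` client and ``lview`` from earlier:

.. sourcecode:: ipython

    In [21]: ar = lview.apply_async(f, 10)

    In [22]: ar.get()    # wait for the task to finish (or fail)

    # resubmit the finished task by its msg_id(s):
    In [23]: ahr = rc.resubmit(ar.msg_ids)

    In [24]: ahr.get()   # the AsyncHubResult for the re-run task
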
.. _parallel_schedulers:

Schedulers
==========

There are a variety of valid ways to determine where jobs should be assigned in a
load-balancing situation. In IPython, we support several standard schemes, and
even make it easy to define your own. The scheme can be selected via the ``scheme``
argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.scheme_name` attribute
of a controller config object.

The built-in routing schemes:

To select one of these schemes, simply do::

    $ ipcontroller --scheme=<schemename>
    # for instance:
    $ ipcontroller --scheme=lru

lru: Least Recently Used

    Always assign work to the least-recently-used engine. A close relative of
    round-robin, it will be fair with respect to the number of tasks, agnostic
    with respect to runtime of each task.

plainrandom: Plain Random

    Randomly picks an engine on which to run.

twobin: Two-Bin Random

    **Requires numpy**

    Pick two engines at random, and use the LRU of the two. This is known to be better
    than plain random in many cases, but requires a small amount of computation.

leastload: Least Load

    **This is the default scheme**

    Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).

weighted: Weighted Two-Bin Random

    **Requires numpy**

    Pick two engines at random using the number of outstanding tasks as inverse weights,
    and use the one with the lower load.

Greedy Assignment
-----------------

Tasks can be assigned greedily as they are submitted. If their dependencies are
met, they will be assigned to an engine right away, and multiple tasks can be
assigned to an engine at a given time. How many tasks can be outstanding on a
given engine is limited with the ``TaskScheduler.hwm`` (high water mark)
configurable in your :file:`ipcontroller_config.py` config file, with:

.. sourcecode:: python

    # the most common choices are:
    c.TaskScheduler.hwm = 0 # (minimal latency, default in IPython < 0.13)
    # or
    c.TaskScheduler.hwm = 1 # (most-informed balancing, default in IPython >= 0.13)

In IPython < 0.13, the default is 0, or no-limit. That is, there is no limit to the number of
tasks that can be outstanding on a given engine. This greatly benefits the
latency of execution, because network traffic can be hidden behind computation.
However, this means that workload is assigned without knowledge of how long
each task might take, and can result in poor load-balancing, particularly for
submitting a collection of heterogeneous tasks all at once. You can limit this
effect by setting hwm to a positive integer, 1 being maximum load-balancing (a
task will never be waiting if there is an idle engine), and any larger number
being a compromise between load-balancing and latency-hiding.

In practice, some users have been confused by having this optimization on by
default, so the default value has been changed to 1 in IPython 0.13. This can be slower,
but has more obvious behavior and won't result in assigning too many tasks to
some engines in heterogeneous cases.


Pure ZMQ Scheduler
------------------

For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
:class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
load-balancing. This scheduler does not support any of the advanced features of the Python
:class:`.Scheduler`.

Disabled features when using the ZMQ Scheduler:

* Engine unregistration
    Task farming will be disabled if an engine unregisters.
    Further, if an engine is unregistered during computation, the scheduler may not recover.
* Dependencies
    Since there is no Python logic inside the Scheduler, routing decisions cannot be made
    based on message content.
* Early destination notification
    The Python schedulers know which engine gets which task, and notify the Hub. This
    allows graceful handling of Engines coming and going. There is no way to know
    where ZeroMQ messages have gone, so there is no way to know what tasks are on which
    engine until they *finish*. This makes recovery from engine shutdown very difficult.

.. note::

    TODO: performance comparisons


More details
============

The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
of flexibility in how tasks are defined and run. The next places to look are
in the following classes:

* :class:`~IPython.parallel.client.view.LoadBalancedView`
* :class:`~IPython.parallel.client.asyncresult.AsyncResult`
* :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
* :mod:`~IPython.parallel.controller.dependency`

The following is an overview of how to use these classes together, put
together in the sketch after the list:

1. Create a :class:`Client` and :class:`LoadBalancedView`
2. Define some functions to be run as tasks
3. Submit your tasks using the :meth:`apply` method of your
   :class:`LoadBalancedView` instance.
4. Use :meth:`.Client.get_result` to get the results of the
   tasks, or use the :meth:`AsyncResult.get` method of the results to wait
   for and then receive the results.

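Putting those steps together, a minimal end-to-end sketch (``square`` is just a
stand-in task):

.. sourcecode:: ipython

    In [1]: from IPython.parallel import Client

    In [2]: rc = Client()                      # step 1: connect

    In [3]: lview = rc.load_balanced_view()

    In [4]: def square(x):                     # step 2: define a task
       ...:     return x**2

    In [5]: ar = lview.apply(square, 7)        # step 3: submit

    In [6]: ar.get()                           # step 4: wait for the result
    Out[6]: 49
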
.. seealso::

    A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.