parallel_task.txt
471 lines
| 17.4 KiB
| text/plain
|
TextLexer
MinRK
|
r3664 | .. _parallel_task: | ||
MinRK
|
r3586 | |||
========================== | ||||
The IPython task interface | ||||
========================== | ||||
MinRK
|
r3624 | The task interface to the cluster presents the engines as a fault tolerant, | ||
MinRK
|
r3591 | dynamic load-balanced system of workers. Unlike the multiengine interface, in | ||
MinRK
|
r3624 | the task interface the user have no direct access to individual engines. By | ||
allowing the IPython scheduler to assign work, this interface is simultaneously | ||||
simpler and more powerful. | ||||
MinRK
|
r3586 | |||
MinRK
|
r3624 | Best of all, the user can use both of these interfaces running at the same time | ||
MinRK
|
r3591 | to take advantage of their respective strengths. When the user can break up | ||
the user's work into segments that do not depend on previous execution, the | ||||
task interface is ideal. But it also has more power and flexibility, allowing | ||||
the user to guide the distribution of jobs, without having to assign tasks to | ||||
MinRK
|
r3586 | engines explicitly. | ||
Starting the IPython controller and engines | ||||
=========================================== | ||||
To follow along with this tutorial, you will need to start the IPython | ||||
controller and four IPython engines. The simplest way of doing this is to use | ||||
MinRK
|
r3672 | the :command:`ipcluster` command:: | ||
MinRK
|
r3586 | |||
MinRK
|
r4608 | $ ipcluster start -n 4 | ||
MinRK
|
r3586 | |||
For more detailed information about starting the controller and engines, see | ||||
Fernando Perez
|
r4435 | our :ref:`introduction <parallel_overview>` to using IPython for parallel computing. | ||
MinRK
|
r3586 | |||
MinRK
|
r5169 | Creating a ``LoadBalancedView`` instance | ||
======================================== | ||||
MinRK
|
r3586 | |||
MinRK
|
r3673 | The first step is to import the IPython :mod:`IPython.parallel` | ||
MinRK
|
r3639 | module and then create a :class:`.Client` instance, and we will also be using | ||
a :class:`LoadBalancedView`, here called `lview`: | ||||
MinRK
|
r3586 | |||
.. sourcecode:: ipython | ||||
MinRK
|
r3666 | In [1]: from IPython.parallel import Client | ||
MinRK
|
r3663 | |||
MinRK
|
r3666 | In [2]: rc = Client() | ||
MinRK
|
r3591 | |||
MinRK
|
r3586 | |||
MinRK
|
r3591 | This form assumes that the controller was started on localhost with default | ||
configuration. If not, the location of the controller must be given as an | ||||
argument to the constructor: | ||||
MinRK
|
r3586 | |||
.. sourcecode:: ipython | ||||
MinRK
|
r3591 | # for a visible LAN controller listening on an external port: | ||
MinRK
|
r3666 | In [2]: rc = Client('tcp://192.168.1.16:10101') | ||
MinRK
|
r3663 | # or to connect with a specific profile you have set up: | ||
MinRK
|
r3666 | In [3]: rc = Client(profile='mpi') | ||
MinRK
|
r3663 | |||
MinRK
|
r3664 | For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can | ||
be constructed via the client's :meth:`load_balanced_view` method: | ||||
MinRK
|
r3663 | |||
.. sourcecode:: ipython | ||||
MinRK
|
r3664 | In [4]: lview = rc.load_balanced_view() # default load-balanced view | ||
MinRK
|
r3663 | |||
.. seealso:: | ||||
MinRK
|
r3591 | |||
MinRK
|
r3663 | For more information, see the in-depth explanation of :ref:`Views <parallel_details>`. | ||
MinRK
|
r3591 | |||
MinRK
|
r3586 | |||
Quick and easy parallelism | ||||
========================== | ||||
In many cases, you simply want to apply a Python function to a sequence of | ||||
MinRK
|
r3591 | objects, but *in parallel*. Like the multiengine interface, these can be | ||
implemented via the task interface. The exact same tools can perform these | ||||
actions in load-balanced ways as well as multiplexed ways: a parallel version | ||||
of :func:`map` and :func:`@parallel` function decorator. If one specifies the | ||||
MinRK
|
r3635 | argument `balanced=True`, then they are dynamically load balanced. Thus, if the | ||
MinRK
|
r3591 | execution time per item varies significantly, you should use the versions in | ||
the task interface. | ||||
MinRK
|
r3586 | |||
Parallel map | ||||
------------ | ||||
MinRK
|
r3639 | To load-balance :meth:`map`,simply use a LoadBalancedView: | ||
MinRK
|
r3586 | |||
.. sourcecode:: ipython | ||||
MinRK
|
r3639 | |||
In [62]: lview.block = True | ||||
MinRK
|
r5168 | In [63]: serial_result = map(lambda x:x**10, range(32)) | ||
MinRK
|
r3586 | |||
MinRK
|
r5168 | In [64]: parallel_result = lview.map(lambda x:x**10, range(32)) | ||
MinRK
|
r3586 | |||
MinRK
|
r5168 | In [65]: serial_result==parallel_result | ||
Out[65]: True | ||||
MinRK
|
r3586 | |||
Parallel function decorator | ||||
--------------------------- | ||||
Parallel functions are just like normal function, but they can be called on | ||||
sequences and *in parallel*. The multiengine interface provides a decorator | ||||
that turns any Python function into a parallel function: | ||||
.. sourcecode:: ipython | ||||
MinRK
|
r3591 | In [10]: @lview.parallel() | ||
MinRK
|
r3586 | ....: def f(x): | ||
....: return 10.0*x**4 | ||||
MinRK
|
r3624 | ....: | ||
MinRK
|
r3586 | |||
MinRK
|
r3591 | In [11]: f.map(range(32)) # this is done in parallel | ||
Out[11]: [0.0,10.0,160.0,...] | ||||
MinRK
|
r3586 | |||
MinRK
|
r3664 | .. _parallel_dependencies: | ||
MinRK
|
r3624 | Dependencies | ||
============ | ||||
Often, pure atomic load-balancing is too primitive for your work. In these cases, you | ||||
may want to associate some kind of `Dependency` that describes when, where, or whether | ||||
a task can be run. In IPython, we provide two types of dependencies: | ||||
`Functional Dependencies`_ and `Graph Dependencies`_ | ||||
.. note:: | ||||
It is important to note that the pure ZeroMQ scheduler does not support dependencies, | ||||
and you will see errors or warnings if you try to use dependencies with the pure | ||||
scheduler. | ||||
Functional Dependencies | ||||
----------------------- | ||||
Functional dependencies are used to determine whether a given engine is capable of running | ||||
a particular task. This is implemented via a special :class:`Exception` class, | ||||
MinRK
|
r3666 | :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple: | ||
MinRK
|
r3624 | if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying | ||
the error up to the client like any other error, catches the error, and submits the task | ||||
to a different engine. This will repeat indefinitely, and a task will never be submitted | ||||
to a given engine a second time. | ||||
You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided | ||||
some decorators for facilitating this behavior. | ||||
There are two decorators and a class used for functional dependencies: | ||||
.. sourcecode:: ipython | ||||
MinRK
|
r3673 | In [9]: from IPython.parallel import depend, require, dependent | ||
MinRK
|
r3624 | |||
@require | ||||
******** | ||||
The simplest sort of dependency is requiring that a Python module is available. The | ||||
``@require`` decorator lets you define a function that will only run on engines where names | ||||
you specify are importable: | ||||
.. sourcecode:: ipython | ||||
In [10]: @require('numpy', 'zmq') | ||||
MinRK
|
r5169 | ....: def myfunc(): | ||
....: return dostuff() | ||||
MinRK
|
r3624 | |||
Now, any time you apply :func:`myfunc`, the task will only run on a machine that has | ||||
MinRK
|
r3670 | numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported. | ||
MinRK
|
r3624 | |||
@depend | ||||
******* | ||||
The ``@depend`` decorator lets you decorate any function with any *other* function to | ||||
evaluate the dependency. The dependency function will be called at the start of the task, | ||||
and if it returns ``False``, then the dependency will be considered unmet, and the task | ||||
will be assigned to another engine. If the dependency returns *anything other than | ||||
``False``*, the rest of the task will continue. | ||||
.. sourcecode:: ipython | ||||
In [10]: def platform_specific(plat): | ||||
MinRK
|
r5169 | ....: import sys | ||
....: return sys.platform == plat | ||||
MinRK
|
r3624 | |||
In [11]: @depend(platform_specific, 'darwin') | ||||
MinRK
|
r5169 | ....: def mactask(): | ||
....: do_mac_stuff() | ||||
MinRK
|
r3624 | |||
In [12]: @depend(platform_specific, 'nt') | ||||
MinRK
|
r5169 | ....: def wintask(): | ||
....: do_windows_stuff() | ||||
MinRK
|
r3624 | |||
In this case, any time you apply ``mytask``, it will only run on an OSX machine. | ||||
``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)`` | ||||
signature. | ||||
dependents | ||||
********** | ||||
You don't have to use the decorators on your tasks, if for instance you may want | ||||
to run tasks with a single function but varying dependencies, you can directly construct | ||||
the :class:`dependent` object that the decorators use: | ||||
.. sourcecode::ipython | ||||
In [13]: def mytask(*args): | ||||
MinRK
|
r5169 | ....: dostuff() | ||
MinRK
|
r3624 | |||
In [14]: mactask = dependent(mytask, platform_specific, 'darwin') | ||||
# this is the same as decorating the declaration of mytask with @depend | ||||
# but you can do it again: | ||||
In [15]: wintask = dependent(mytask, platform_specific, 'nt') | ||||
# in general: | ||||
In [16]: t = dependent(f, g, *dargs, **dkwargs) | ||||
# is equivalent to: | ||||
In [17]: @depend(g, *dargs, **dkwargs) | ||||
MinRK
|
r5169 | ....: def t(a,b,c): | ||
....: # contents of f | ||||
MinRK
|
r3624 | |||
Graph Dependencies | ||||
------------------ | ||||
Sometimes you want to restrict the time and/or location to run a given task as a function | ||||
of the time and/or location of other tasks. This is implemented via a subclass of | ||||
:class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids` | ||||
corresponding to tasks, and a few attributes to guide how to decide when the Dependency | ||||
has been met. | ||||
The switches we provide for interpreting whether a given dependency set has been met: | ||||
any|all | ||||
Whether the dependency is considered met if *any* of the dependencies are done, or | ||||
only after *all* of them have finished. This is set by a Dependency's :attr:`all` | ||||
boolean attribute, which defaults to ``True``. | ||||
MinRK
|
r3664 | success [default: True] | ||
Whether to consider tasks that succeeded as fulfilling dependencies. | ||||
failure [default : False] | ||||
Whether to consider tasks that failed as fulfilling dependencies. | ||||
using `failure=True,success=False` is useful for setting up cleanup tasks, to be run | ||||
only when tasks have failed. | ||||
Sometimes you want to run a task after another, but only if that task succeeded. In this case, | ||||
``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may | ||||
not care whether the task succeeds, and always want the second task to run, in which case you | ||||
should use `success=failure=True`. The default behavior is to only use successes. | ||||
MinRK
|
r3624 | |||
There are other switches for interpretation that are made at the *task* level. These are | ||||
specified via keyword arguments to the client's :meth:`apply` method. | ||||
after,follow | ||||
You may want to run a task *after* a given set of dependencies have been run and/or | ||||
run it *where* another set of dependencies are met. To support this, every task has an | ||||
`after` dependency to restrict time, and a `follow` dependency to restrict | ||||
destination. | ||||
timeout | ||||
You may also want to set a time-limit for how long the scheduler should wait before a | ||||
task's dependencies are met. This is done via a `timeout`, which defaults to 0, which | ||||
indicates that the task should never timeout. If the timeout is reached, and the | ||||
scheduler still hasn't been able to assign the task to an engine, the task will fail | ||||
with a :class:`DependencyTimeout`. | ||||
.. note:: | ||||
Dependencies only work within the task scheduler. You cannot instruct a load-balanced | ||||
task to run after a job submitted via the MUX interface. | ||||
MinRK
|
r3664 | The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases, | ||
MinRK
|
r3624 | you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the | ||
`follow` and `after` keywords to :meth:`client.apply`: | ||||
.. sourcecode:: ipython | ||||
In [14]: client.block=False | ||||
MinRK
|
r3664 | In [15]: ar = lview.apply(f, args, kwargs) | ||
MinRK
|
r3624 | |||
MinRK
|
r3664 | In [16]: ar2 = lview.apply(f2) | ||
MinRK
|
r3624 | |||
MinRK
|
r5169 | In [17]: with lview.temp_flags(after=[ar,ar2]): | ||
....: ar3 = lview.apply(f3) | ||||
MinRK
|
r3624 | |||
MinRK
|
r5169 | In [18]: with lview.temp_flags(follow=[ar], timeout=2.5) | ||
....: ar4 = lview.apply(f3) | ||||
MinRK
|
r3624 | |||
.. seealso:: | ||||
Some parallel workloads can be described as a `Directed Acyclic Graph | ||||
<http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG | ||||
Dependencies <dag_dependencies>` for an example demonstrating how to use map a NetworkX DAG | ||||
onto task dependencies. | ||||
Impossible Dependencies | ||||
*********************** | ||||
The schedulers do perform some analysis on graph dependencies to determine whether they | ||||
are not possible to be met. If the scheduler does discover that a dependency cannot be | ||||
met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the | ||||
scheduler realized that a task can never be run, it won't sit indefinitely in the | ||||
scheduler clogging the pipeline. | ||||
The basic cases that are checked: | ||||
* depending on nonexistent messages | ||||
* `follow` dependencies were run on more than one machine and `all=True` | ||||
MinRK
|
r3664 | * any dependencies failed and `all=True,success=True,failures=False` | ||
* all dependencies failed and `all=False,success=True,failure=False` | ||||
MinRK
|
r3624 | |||
.. warning:: | ||||
This analysis has not been proven to be rigorous, so it is likely possible for tasks | ||||
to become impossible to run in obscure situations, so a timeout may be a good choice. | ||||
MinRK
|
r3876 | |||
Retries and Resubmit | ||||
==================== | ||||
Retries | ||||
------- | ||||
Another flag for tasks is `retries`. This is an integer, specifying how many times | ||||
a task should be resubmitted after failure. This is useful for tasks that should still run | ||||
if their engine was shutdown, or may have some statistical chance of failing. The default | ||||
is to not retry tasks. | ||||
Resubmit | ||||
-------- | ||||
Sometimes you may want to re-run a task. This could be because it failed for some reason, and | ||||
you have fixed the error, or because you want to restore the cluster to an interrupted state. | ||||
For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more | ||||
msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit | ||||
a task that is pending - only those that have finished, either successful or unsuccessful. | ||||
MinRK
|
r3655 | .. _parallel_schedulers: | ||
MinRK
|
r3624 | Schedulers | ||
========== | ||||
There are a variety of valid ways to determine where jobs should be assigned in a | ||||
load-balancing situation. In IPython, we support several standard schemes, and | ||||
MinRK
|
r3990 | even make it easy to define your own. The scheme can be selected via the ``scheme`` | ||
argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.schemename` attribute | ||||
MinRK
|
r3624 | of a controller config object. | ||
The built-in routing schemes: | ||||
MinRK
|
r3655 | To select one of these schemes, simply do:: | ||
Thomas Kluyver
|
r4196 | $ ipcontroller --scheme=<schemename> | ||
MinRK
|
r3655 | for instance: | ||
Thomas Kluyver
|
r4196 | $ ipcontroller --scheme=lru | ||
MinRK
|
r3655 | |||
MinRK
|
r3624 | lru: Least Recently Used | ||
Always assign work to the least-recently-used engine. A close relative of | ||||
round-robin, it will be fair with respect to the number of tasks, agnostic | ||||
with respect to runtime of each task. | ||||
plainrandom: Plain Random | ||||
MinRK
|
r3655 | |||
MinRK
|
r3624 | Randomly picks an engine on which to run. | ||
twobin: Two-Bin Random | ||||
MinRK
|
r3655 | **Requires numpy** | ||
MinRK
|
r3624 | |||
Jason Grout
|
r3848 | Pick two engines at random, and use the LRU of the two. This is known to be better | ||
MinRK
|
r3624 | than plain random in many cases, but requires a small amount of computation. | ||
leastload: Least Load | ||||
**This is the default scheme** | ||||
Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie). | ||||
weighted: Weighted Two-Bin Random | ||||
MinRK
|
r3655 | **Requires numpy** | ||
MinRK
|
r3624 | |||
Pick two engines at random using the number of outstanding tasks as inverse weights, | ||||
and use the one with the lower load. | ||||
MinRK
|
r5890 | Greedy Assignment | ||
----------------- | ||||
Tasks are assigned greedily as they are submitted. If their dependencies are | ||||
met, they will be assigned to an engine right away, and multiple tasks can be | ||||
assigned to an engine at a given time. This limit is set with the | ||||
``TaskScheduler.hwm`` (high water mark) configurable: | ||||
.. sourcecode:: python | ||||
# the most common choices are: | ||||
MinRK
|
r5919 | c.TaskSheduler.hwm = 0 # (minimal latency, default in IPython ≤ 0.12) | ||
MinRK
|
r5890 | # or | ||
MinRK
|
r5919 | c.TaskScheduler.hwm = 1 # (most-informed balancing, default in > 0.12) | ||
MinRK
|
r5890 | |||
MinRK
|
r5919 | In IPython ≤ 0.12,the default is 0, or no-limit. That is, there is no limit to the number of | ||
MinRK
|
r5890 | tasks that can be outstanding on a given engine. This greatly benefits the | ||
latency of execution, because network traffic can be hidden behind computation. | ||||
However, this means that workload is assigned without knowledge of how long | ||||
each task might take, and can result in poor load-balancing, particularly for | ||||
submitting a collection of heterogeneous tasks all at once. You can limit this | ||||
effect by setting hwm to a positive integer, 1 being maximum load-balancing (a | ||||
task will never be waiting if there is an idle engine), and any larger number | ||||
being a compromise between load-balance and latency-hiding. | ||||
MinRK
|
r5919 | In practice, some users have been confused by having this optimization on by | ||
default, and the default value has been changed to 1. This can be slower, | ||||
but has more obvious behavior and won't result in assigning too many tasks to | ||||
some engines in heterogeneous cases. | ||||
MinRK
|
r3624 | |||
Pure ZMQ Scheduler | ||||
------------------ | ||||
For maximum throughput, the 'pure' scheme is not Python at all, but a C-level | ||||
MinRK
|
r4725 | :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all | ||
MinRK
|
r3624 | load-balancing. This scheduler does not support any of the advanced features of the Python | ||
:class:`.Scheduler`. | ||||
Disabled features when using the ZMQ Scheduler: | ||||
* Engine unregistration | ||||
Task farming will be disabled if an engine unregisters. | ||||
Further, if an engine is unregistered during computation, the scheduler may not recover. | ||||
* Dependencies | ||||
Since there is no Python logic inside the Scheduler, routing decisions cannot be made | ||||
based on message content. | ||||
* Early destination notification | ||||
The Python schedulers know which engine gets which task, and notify the Hub. This | ||||
allows graceful handling of Engines coming and going. There is no way to know | ||||
where ZeroMQ messages have gone, so there is no way to know what tasks are on which | ||||
engine until they *finish*. This makes recovery from engine shutdown very difficult. | ||||
MinRK
|
r3655 | |||
MinRK
|
r3624 | |||
.. note:: | ||||
TODO: performance comparisons | ||||
MinRK
|
r3876 | |||
MinRK
|
r3586 | More details | ||
============ | ||||
MinRK
|
r3664 | The :class:`LoadBalancedView` has many more powerful features that allow quite a bit | ||
MinRK
|
r3586 | of flexibility in how tasks are defined and run. The next places to look are | ||
in the following classes: | ||||
MinRK
|
r3673 | * :class:`~IPython.parallel.client.view.LoadBalancedView` | ||
* :class:`~IPython.parallel.client.asyncresult.AsyncResult` | ||||
* :meth:`~IPython.parallel.client.view.LoadBalancedView.apply` | ||||
* :mod:`~IPython.parallel.controller.dependency` | ||||
MinRK
|
r3586 | |||
The following is an overview of how to use these classes together: | ||||
MinRK
|
r3664 | 1. Create a :class:`Client` and :class:`LoadBalancedView` | ||
MinRK
|
r3591 | 2. Define some functions to be run as tasks | ||
3. Submit your tasks to using the :meth:`apply` method of your | ||||
MinRK
|
r3664 | :class:`LoadBalancedView` instance. | ||
MinRK
|
r5169 | 4. Use :meth:`.Client.get_result` to get the results of the | ||
MinRK
|
r3591 | tasks, or use the :meth:`AsyncResult.get` method of the results to wait | ||
for and then receive the results. | ||||
MinRK
|
r3586 | |||
MinRK
|
r3624 | .. seealso:: | ||
MinRK
|
r3586 | |||
MinRK
|
r3624 | A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython. | ||