.. _parallel_details:
==========================================
Details of Parallel Computing with IPython
==========================================

.. note::

    There are still many sections to fill out.


Caveats
=======

First, some caveats about the detailed workings of parallel computing with 0MQ and IPython.

Non-copying sends and numpy arrays
----------------------------------

When numpy arrays are passed as arguments to apply or via data-movement methods, they are not
copied. This means that you must be careful if you are sending an array that you intend to work
on. PyZMQ does allow you to track when a message has been sent, so you can know when it is safe
to edit the buffer, but IPython only enables this tracking when you request it with the `track`
flag (described below).
It is also important to note that the non-copying receive of a message is *read-only*. That
means that if you intend to work in-place on an array that you have sent or received, you must
copy it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as
results.
The following will fail:

.. sourcecode:: ipython

    In [3]: A = numpy.zeros(2)

    In [4]: def setter(a):
       ...:     a[0] = 1
       ...:     return a

    In [5]: rc[0].apply_sync(setter, A)
    ---------------------------------------------------------------------------
    RemoteError                               Traceback (most recent call last)
    ...
    RemoteError: RuntimeError(array is not writeable)
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/streamkernel.py", line 329, in apply_request
        exec code in working, working
      File "<string>", line 1, in <module>
      File "<ipython-input-14-736187483856>", line 2, in setter
    RuntimeError: array is not writeable
If you do need to edit the array in-place, just remember to copy the array if it's read-only.
The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an array.
.. sourcecode:: ipython

    In [3]: A = numpy.zeros(2)

    In [4]: def setter(a):
       ...:     """only copy read-only arrays"""
       ...:     if not a.flags.writeable:
       ...:         a = a.copy()
       ...:     a[0] = 1
       ...:     return a

    In [5]: rc[0].apply_sync(setter, A)
    Out[5]: array([ 1.,  0.])

    # note that results will also be read-only:
    In [6]: _.flags.writeable
    Out[6]: False
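The read-only behavior can be reproduced locally without any engines. A minimal sketch with plain numpy (no IPython required; the ``bytes`` buffer here is just a stand-in for a received message body):

```python
import numpy as np

# Reconstructing an array from an immutable message buffer, as a
# non-copying receive does, yields a read-only array.
A = np.zeros(2)
buf = bytes(A.data)                     # stand-in for the received message body
B = np.frombuffer(buf, dtype=A.dtype)   # zero-copy view onto the buffer
print(B.flags.writeable)                # False

# to work in-place, copy first - exactly as in the setter example above
C = B.copy() if not B.flags.writeable else B
C[0] = 1
```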
If you want to safely edit an array in-place after *sending* it, you must use the ``track=True``
flag. IPython always performs non-copying sends of arrays, which return immediately. You must
instruct IPython to track those messages *at send time* in order to know for sure that the send
has completed. AsyncResults have a :attr:`sent` property, and a :meth:`wait_on_send` method, for
checking and waiting for 0MQ to finish with a buffer.

.. sourcecode:: ipython

    In [5]: A = numpy.random.random((1024,1024))

    In [6]: view.track = True

    In [7]: ar = view.apply_async(lambda x: 2*x, A)

    In [8]: ar.sent
    Out[8]: False

    In [9]: ar.wait_on_send() # blocks until sent is True
What is sendable?
-----------------

If IPython doesn't know what to do with an object, it will pickle it. There is a short list of
objects that are not pickled: ``buffers``, ``str/bytes`` objects, and ``numpy`` arrays. These
are handled specially by IPython in order to prevent the copying of data. Sending bytes or
numpy arrays will result in exactly zero in-memory copies of your data (unless the data is very
small).

If you have an object that provides a Python buffer interface, then you can always send that
buffer without copying - and reconstruct the object on the other side in your own code. It is
possible that the object reconstruction will become extensible, so you can add your own
non-copying types, but this does not yet exist.
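A sketch of the buffer round-trip, using the stdlib :mod:`array` module (this is not IPython's actual wire format, just an illustration of viewing a buffer and reconstructing the object yourself):

```python
import array

# Any object exposing the buffer interface can be viewed without
# copying via memoryview, and rebuilt from raw bytes on the other side.
a = array.array('d', [1.0, 2.0, 3.0])
view = memoryview(a)          # zero-copy view of a's underlying buffer
payload = view.tobytes()      # what would travel over the wire

# "receiving side": rebuild an equivalent object in your own code
b = array.array('d')
b.frombytes(payload)
print(b.tolist())             # [1.0, 2.0, 3.0]
```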
Closures
********

Just about anything in Python is pickleable. The one notable exception is objects (generally
functions) with *closures*. Closures can be a complicated topic, but the basic principle is
that functions that refer to variables in their parent scope have closures.

An example of a function that uses a closure:

.. sourcecode:: python

    def f(a):
        def inner():
            # inner will have a closure
            return a
        return inner

    f1 = f(1)
    f2 = f(2)
    f1() # returns 1
    f2() # returns 2

``f1`` and ``f2`` will have closures referring to the scope in which ``inner`` was defined,
because they use the variable ``a``. As a result, you would not be able to send ``f1`` or
``f2`` with IPython. Note that you *would* be able to send ``f``. This is only true for
interactively defined functions (as are often used in decorators), and only when there are
variables used inside the inner function that are defined in the outer function. If the names
are *not* in the outer function, then there will not be a closure, and the generated function
will look in ``globals()`` for the name:
.. sourcecode:: python

    def g(b):
        # note that `b` is not referenced in inner's scope
        def inner():
            # this inner will *not* have a closure
            return a
        return inner

    g1 = g(1)
    g2 = g(2)
    g1() # raises NameError on 'a'
    a = 5
    g2() # returns 5
``g1`` and ``g2`` *will* be sendable with IPython, and will treat the engine's namespace as
``globals()``. The :meth:`pull` method is implemented based on this principle. If we did not
provide :meth:`pull`, you could implement it yourself with :meth:`apply`, by simply returning
objects out of the global namespace:

.. sourcecode:: ipython

    In [10]: view.apply(lambda : a)

    # is equivalent to
    In [11]: view.pull('a')
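The distinction can be checked locally via the function's ``__closure__`` attribute: a closure cell exists only when the inner function references a name bound in the enclosing function.

```python
def f(a):
    def inner():
        return a
    return inner

def g(b):
    def inner():
        return a   # 'a' is not defined in g, so this is a global lookup
    return inner

f1 = f(1)
g1 = g(1)
print(f1.__closure__ is None)   # False: f1 carries a closure cell for 'a'
print(g1.__closure__ is None)   # True: no closure, 'a' resolves via globals()

a = 5
print(f1(), g1())               # 1 5
```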
Running Code
============

There are two principal units of execution in Python: strings of Python code (e.g. ``a=5``),
and Python functions. IPython is designed around the use of functions via the core Client
method, called :meth:`apply`.

Apply
-----

The principal method of remote execution is :meth:`apply`, on View objects. The Client
provides the full execution and communication API for engines via its low-level
:meth:`send_apply_message` method.
The arguments to :meth:`apply` are:

f : function
    The function to be called remotely
args : tuple/list
    The positional arguments passed to `f`
kwargs : dict
    The keyword arguments passed to `f`
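Conceptually, the apply round-trip packs ``f``, ``args``, and ``kwargs``, then unpacks and calls on the engine. A minimal sketch with plain :mod:`pickle` (the function names here are invented for illustration; real IPython uses a custom serializer and ships buffers separately):

```python
import pickle

def pack_apply(f, args, kwargs):
    # client side: serialize the call into a message body
    return pickle.dumps((f, args, kwargs))

def engine_apply(msg):
    # engine side: deserialize and invoke
    f, args, kwargs = pickle.loads(msg)
    return f(*args, **kwargs)

print(engine_apply(pack_apply(divmod, (10, 3), {})))  # (3, 1)
```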
flags for all views:

block : bool (default: view.block)
    Whether to wait for the result, or return immediately.

    False:
        returns AsyncResult
    True:
        returns actual result(s) of ``f(*args, **kwargs)``

        if multiple targets:
            list of results, matching `targets`
track : bool [default: view.track]
    Whether to track non-copying sends.

targets : int, list of ints, 'all', or None [default: view.targets]
    Specify the destination of the job.

    if 'all' or None:
        Run on all active engines
    if list:
        Run on each specified engine
    if int:
        Run on single engine
Note that a LoadBalancedView uses `targets` to restrict possible destinations. Load-balanced
calls will always execute in just one location.

flags only in LoadBalancedViews:

after : Dependency or collection of msg_ids
    Only for load-balanced execution (targets=None).
    Specify a list of msg_ids as a time-based dependency.
    This job will only be run *after* the dependencies
    have been met.

follow : Dependency or collection of msg_ids
    Only for load-balanced execution (targets=None).
    Specify a list of msg_ids as a location-based dependency.
    This job will only be run on an engine where this dependency
    is met.

timeout : float/int or None
    Only for load-balanced execution (targets=None).
    Specify an amount of time (in seconds) for the scheduler to
    wait for dependencies to be met before failing with a
    DependencyTimeout.
execute and run
---------------

For executing strings of Python code, :class:`DirectView` objects also provide an
:meth:`execute` and a :meth:`run` method, which, rather than taking functions and arguments,
take simple strings. :meth:`execute` simply takes a string of Python code to execute, and sends
it to the Engine(s). :meth:`run` is the same as :meth:`execute`, but for a *file* rather than a
string: it is simply a wrapper that does something very similar to ``execute(open(f).read())``.
.. note::

    TODO: Example

Views
=====

The principal extension of the :class:`~parallel.Client` is the :class:`~parallel.View` class.
DirectView
----------

The :class:`.DirectView` is the class for the IPython :ref:`Multiplexing Interface
<parallel_multiengine>`.

Creating a DirectView
*********************

DirectViews can be created in two ways: by index access to a client, or by a client's
:meth:`view` method. Index access to a Client works in a few ways. First, you can create
DirectViews to single engines simply by accessing the client by engine id:

.. sourcecode:: ipython

    In [2]: rc[0]
    Out[2]: <DirectView 0>

You can also create a DirectView with a list of engines:

.. sourcecode:: ipython

    In [2]: rc[0,1,2]
    Out[2]: <DirectView [0,1,2]>

Other methods for accessing elements, such as slicing and negative indexing, work by passing
the index directly to the client's :attr:`ids` list, so:

.. sourcecode:: ipython

    # negative index
    In [2]: rc[-1]
    Out[2]: <DirectView 3>

    # or slicing:
    In [3]: rc[::2]
    Out[3]: <DirectView [0,2]>

are always the same as:

.. sourcecode:: ipython

    In [2]: rc[rc.ids[-1]]
    Out[2]: <DirectView 3>

    In [3]: rc[rc.ids[::2]]
    Out[3]: <DirectView [0,2]>
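The equivalence above is just list indexing on :attr:`ids`; with a hypothetical four-engine cluster:

```python
ids = [0, 1, 2, 3]            # rc.ids for a four-engine cluster
print(ids[-1])                # 3      -> rc[-1]  is <DirectView 3>
print(ids[::2])               # [0, 2] -> rc[::2] is <DirectView [0,2]>
```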
Also note that the slice is evaluated at the time of construction of the DirectView, so the
targets will not change over time if engines are added to or removed from the cluster.
Execution via DirectView
************************

The DirectView is the simplest way to work with one or more engines directly (hence the name).
Data movement via DirectView
****************************

Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
dictionary-style access by key, and methods such as :meth:`get` and :meth:`update` for
convenience. This makes the remote namespaces of the engines appear as a local dictionary.
Underneath, these methods call :meth:`apply`:

.. sourcecode:: ipython

    In [51]: dview['a'] = ['foo','bar']

    In [52]: dview['a']
    Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]

Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather, and we
follow that terminology. However, it is important to remember that in
IPython's :class:`Client` class, :meth:`scatter` is from the
interactive IPython session to the engines, and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI should be used:

.. sourcecode:: ipython

    In [58]: dview.scatter('a', range(16))
    Out[58]: [None,None,None,None]

    In [59]: dview['a']
    Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]

    In [60]: dview.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
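A plain-Python sketch of the partitioning that scatter performs (the exact chunking IPython uses may differ; this reproduces the four-engine output above):

```python
def partition(seq, n):
    # split seq into n contiguous chunks, spreading any remainder
    # over the first few chunks
    q, r = divmod(len(seq), n)
    sizes = [q + 1] * r + [q] * (n - r)
    chunks, i = [], 0
    for s in sizes:
        chunks.append(seq[i:i + s])
        i += s
    return chunks

parts = partition(list(range(16)), 4)
print(parts)   # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]

# gather is the inverse: flatten in engine order
flat = [x for chunk in parts for x in chunk]
print(flat == list(range(16)))  # True
```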
Push and pull
-------------

* push
* pull
LoadBalancedView
----------------

The :class:`.LoadBalancedView` is the class for the IPython load-balanced task interface.

Data Movement
=============

Reference

Results
=======
AsyncResults
------------

Our primary representation is the AsyncResult object, based on the object of the same name in
the built-in :mod:`multiprocessing.pool` module. Our version provides a superset of that
interface.

The basic principle of the AsyncResult is the encapsulation of one or more results not yet
completed. Execution methods (including data movement, such as push/pull) will all return
AsyncResults when ``block=False``.
The mp.pool.AsyncResult interface
---------------------------------

The basic interface of the AsyncResult is exactly that of the AsyncResult in
:mod:`multiprocessing.pool`, and consists of four methods:

.. AsyncResult spec directly from docs.python.org

.. class:: AsyncResult

    The stdlib AsyncResult spec

    .. method:: wait([timeout])

        Wait until the result is available or until *timeout* seconds pass. This
        method always returns ``None``.

    .. method:: ready()

        Return whether the call has completed.

    .. method:: successful()

        Return whether the call completed without raising an exception. Will
        raise :exc:`AssertionError` if the result is not ready.

    .. method:: get([timeout])

        Return the result when it arrives. If *timeout* is not ``None`` and the
        result does not arrive within *timeout* seconds then
        :exc:`TimeoutError` is raised. If the remote call raised
        an exception then that exception will be reraised as a :exc:`RemoteError`
        by :meth:`get`.
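The four methods can be tried out locally with the stdlib pool that IPython's AsyncResult is modeled on (no engines required):

```python
from multiprocessing.pool import ThreadPool

pool = ThreadPool(2)
ar = pool.apply_async(pow, (2, 10))
print(ar.wait(1))        # None: wait never returns a value
print(ar.ready())        # True once the call has completed
print(ar.successful())   # True: pow raised no exception
print(ar.get())          # 1024
pool.close()
pool.join()
```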
While an AsyncResult is not done, you can check on it with its :meth:`ready` method, which will
return whether the AR is done. You can also wait on an AsyncResult with its :meth:`wait`
method. This method blocks until the result arrives. If you don't want to wait forever, you can
pass a timeout (in seconds) as an argument to :meth:`wait`. :meth:`wait` will *always return
None*, and should never raise an error.

:meth:`ready` and :meth:`wait` are insensitive to the success or failure of the call. After a
result is done, :meth:`successful` will tell you whether the call completed without raising an
exception.

If you actually want the result of the call, you can use :meth:`get`. Initially, :meth:`get`
behaves just like :meth:`wait`, in that it will block until the result is ready, or until a
timeout is met. However, unlike :meth:`wait`, :meth:`get` will raise a :exc:`TimeoutError` if
the timeout is reached and the result is still not ready. If the result arrives before the
timeout is reached, then :meth:`get` will return the result itself if no exception was raised,
and will raise an exception if there was.

Here is where we start to expand on the multiprocessing interface. Rather than raising the
original exception, a :exc:`RemoteError` will be raised, encapsulating the remote exception
with some metadata. If the AsyncResult represents multiple calls (e.g. any time `targets` is
plural), then a :exc:`CompositeError`, a subclass of :exc:`RemoteError`, will be raised.

.. seealso::

    For more information on remote exceptions, see :ref:`the section in the Direct Interface
    <Parallel_exceptions>`.
Extended interface
******************

Other extensions of the AsyncResult interface include convenience wrappers for :meth:`get`.
AsyncResults have a property, :attr:`result`, with the short alias :attr:`r`, which simply
calls :meth:`get`. Since our object is designed for representing *parallel* results, it is
expected that many calls (any of those submitted via DirectView) will map results to engine
IDs. We provide a :meth:`get_dict` method, which is also a wrapper on :meth:`get`, and returns
a dictionary of the individual results, keyed by engine ID.

You can also prevent a submitted job from actually executing via the AsyncResult's
:meth:`abort` method. This will instruct engines to not execute the job when it arrives.

The larger extension of the AsyncResult API is the :attr:`metadata` attribute. The metadata
is a dictionary (with attribute access) that contains, logically enough, metadata about the
execution.
Metadata keys:

Timestamps:

submitted
    When the task left the Client
started
    When the task started execution on the engine
completed
    When execution finished on the engine
received
    When the result arrived on the Client.

    Note that it is not known when the result arrived in 0MQ on the client, only when it
    arrived in Python via :meth:`Client.spin`, so in interactive use this may not be
    strictly informative.

Information about the engine:

engine_id
    The integer id
engine_uuid
    The UUID of the engine

Output of the call:

pyerr
    Python exception, if there was one
pyout
    Python output
stderr
    stderr stream
stdout
    stdout (e.g. print) stream

And some extended information:

status
    either 'ok' or 'error'
msg_id
    The UUID of the message
after
    For tasks: the time-based msg_id dependencies
follow
    For tasks: the location-based msg_id dependencies
While in most cases the Clients that submitted a request will be the ones using the results,
other Clients can also request results directly from the Hub. This is done via the Client's
:meth:`get_result` method. This method will *always* return an AsyncResult object. If the call
was not submitted by the client, then it will be a subclass, called :class:`AsyncHubResult`.
These behave in the same way as an AsyncResult, but if the result is not ready, waiting on an
AsyncHubResult polls the Hub, which is much more expensive than the passive polling used
in regular AsyncResults.

The Client keeps track of all results:
history, results, metadata
Querying the Hub
================

The Hub sees all traffic that may pass through the schedulers between engines and clients.
It does this so that it can track state, allowing multiple clients to retrieve results of
computations submitted by their peers, as well as persisting the state to a database.

queue_status
    You can check the status of the queues of the engines with this command.
result_status
    Check on the status of results.
purge_results
    Forget results (to conserve resources).
Controlling the Engines
=======================

There are a few actions you can do with Engines that do not involve execution. These
messages are sent via the Control socket, and bypass any long queues of waiting execution
jobs.

abort
    Sometimes you may want to prevent a job you have submitted from actually running. The
    method for this is :meth:`abort`. It takes a container of msg_ids, and instructs the
    Engines to not run the jobs if they arrive. The jobs will then fail with an AbortedTask
    error.
clear
    You may want to purge the Engine(s) namespace of any data you have left in it. After
    running :meth:`clear`, there will be no names in the Engine's namespace.
shutdown
    You can also instruct engines (and the Controller) to terminate from a Client. This
    can be useful when a job is finished, since you can shut down all the processes with a
    single command.
Synchronization
===============

Since the Client is a synchronous object, events do not automatically trigger in your
interactive session - you must poll the 0MQ sockets for incoming messages. Note that
this polling *does not* actually make any network requests. It simply performs a `select`
operation, to check if messages are already in local memory, waiting to be handled.

The method that handles incoming messages is :meth:`spin`. This method flushes any waiting
messages on the various incoming sockets, and updates the state of the Client.

If you need to wait for particular results to finish, you can use the :meth:`wait` method,
which will call :meth:`spin` until the messages are no longer outstanding. Anything that
represents a collection of messages, such as a list of msg_ids or one or more AsyncResult
objects, can be passed as an argument to :meth:`wait`. A timeout can be specified, which will
prevent the call from blocking for more than a specified time, but the default behavior is to
wait forever.

The client also has an ``outstanding`` attribute - a :class:`set` of msg_ids that are awaiting
replies. This is the default if :meth:`wait` is called with no arguments - i.e. wait on *all*
outstanding messages.

.. note::

    TODO: wait example
Map
===

Many parallel computing problems can be expressed as a `map`, or running a single program with
a variety of different inputs. Python has a built-in :func:`map`, which does exactly this, and
many parallel execution tools in Python, such as the built-in :class:`multiprocessing.Pool`
object, provide implementations of `map`. All View objects provide a :meth:`map` method as
well, but the load-balanced and direct implementations differ.

Views' map methods can be called on any number of sequences, but they can also take the
``block`` and ``bound`` keyword arguments, just like :meth:`~client.apply`, but *only as
keywords*:

.. sourcecode:: python

    dview.map(*sequences, block=None)

* iter, map_async, reduce
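The call shape mirrors the serial builtin, so the behavior is easy to check locally (``view.map(f, *sequences)`` returns results in the same order as its inputs):

```python
# single sequence, as in view.map(f, seq)
squares = list(map(lambda x: x * x, range(8)))
print(squares)   # [0, 1, 4, 9, 16, 25, 36, 49]

# multiple sequences, as in view.map(f, seq1, seq2)
sums = list(map(lambda a, b: a + b, range(4), range(4, 8)))
print(sums)      # [4, 6, 8, 10]
```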
Decorators and RemoteFunctions
==============================

* @parallel
* @remote
* RemoteFunction
* ParallelFunction

Dependencies
============

* @depend
* @require
* Dependency