.. _parallel_details:

==========================================
Details of Parallel Computing with IPython
==========================================

.. note::

    There are still many sections to fill out.

Caveats
=======

First, some caveats about the detailed workings of parallel computing with 0MQ and IPython.

Non-copying sends and numpy arrays
----------------------------------

When numpy arrays are passed as arguments to apply or via data-movement methods, they are not
copied. This means that you must be careful if you are sending an array that you intend to work
on. PyZMQ does allow you to track when a message has been sent so you can know when it is safe
to edit the buffer, but IPython only does this when you request tracking with the ``track=True``
flag.
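
The hazard can be illustrated with plain Python, with no cluster required. The sketch below uses a
``memoryview`` as a stand-in for a queued, non-copying send. This is an assumption made for
illustration: it is not how IPython or PyZMQ actually transmit data. The view shares memory with
the original buffer, so editing the buffer before the data has gone out changes what would be sent.

.. sourcecode:: python

    # A memoryview shares memory with the object it wraps; nothing is copied.
    # Here it stands in for a message that has been queued but not yet sent.
    data = bytearray(b"\x00\x00\x00\x00")
    pending_send = memoryview(data)

    # Editing the original buffer before the "send" completes...
    data[0] = 1

    # ...changes what the pending message would contain.
    snapshot = bytes(pending_send)
    print(snapshot)  # b'\x01\x00\x00\x00'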

It is also important to note that the non-copying receive of a message is *read-only*. That
means that if you intend to work in-place on an array that you have sent or received, you must
copy it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as
results.

The following will fail:

.. sourcecode:: ipython

    In [3]: A = numpy.zeros(2)

    In [4]: def setter(a):
       ...:     a[0]=1
       ...:     return a

    In [5]: rc[0].apply_sync(setter, A)
    ---------------------------------------------------------------------------
    RemoteError                               Traceback (most recent call last)
    ...
    RemoteError: RuntimeError(array is not writeable)
    Traceback (most recent call last):
      File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 329, in apply_request
        exec code in working, working
      File "<string>", line 1, in <module>
      File "<ipython-input-14-736187483856>", line 2, in setter
    RuntimeError: array is not writeable

If you do need to edit the array in-place, just remember to copy the array if it's read-only.
The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an array.

.. sourcecode:: ipython

    In [3]: A = numpy.zeros(2)

    In [4]: def setter(a):
       ...:     """only copy read-only arrays"""
       ...:     if not a.flags.writeable:
       ...:         a=a.copy()
       ...:     a[0]=1
       ...:     return a

    In [5]: rc[0].apply_sync(setter, A)
    Out[5]: array([ 1.,  0.])

    # note that results will also be read-only:
    In [6]: _.flags.writeable
    Out[6]: False

If you want to safely edit an array in-place after *sending* it, you must use the ``track=True``
flag. IPython always performs non-copying sends of arrays, which return immediately. You must
instruct IPython to track those messages *at send time* in order to know for sure that the send
has completed. AsyncResults have a :attr:`sent` property and a :meth:`wait_on_send` method for
checking and waiting for 0MQ to finish with a buffer.

.. sourcecode:: ipython

    In [5]: A = numpy.random.random((1024,1024))

    In [6]: view.track=True

    In [7]: ar = view.apply_async(lambda x: 2*x, A)

    In [8]: ar.sent
    Out[8]: False

    In [9]: ar.wait_on_send() # blocks until sent is True

What is sendable?
-----------------

If IPython doesn't know what to do with an object, it will pickle it. There is a short list of
objects that are not pickled: ``buffers``, ``str/bytes`` objects, and ``numpy``
arrays. These are handled specially by IPython in order to prevent the copying of data. Sending
bytes or numpy arrays will result in exactly zero in-memory copies of your data (unless the data
is very small).

If you have an object that provides a Python buffer interface, then you can always send that
buffer without copying, and reconstruct the object on the other side in your own code. The
object reconstruction may become extensible in the future, so that you can add your own
non-copying types, but this is not yet possible.
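
As an illustration of rebuilding an object from a raw buffer in your own code, here is a
stdlib-only sketch using the ``array`` module. It was chosen only because ``array`` supports the
buffer protocol, not because IPython treats it specially. On the receiving side,
``memoryview.cast`` reinterprets the raw bytes as doubles without copying them. The transport step
is simulated with ``bytes(...)``, which *does* copy. That stand-in is an assumption of the sketch.

.. sourcecode:: python

    from array import array

    # Sender: an object that exposes the buffer protocol.
    original = array("d", [1.5, 2.5, 3.5])
    raw = memoryview(original)          # no copy; a view onto original's memory

    # Stand-in for the transport (a real non-copying send would not copy here).
    received = bytes(raw)

    # Receiver: reinterpret the raw bytes as C doubles, again without copying.
    # A view of an immutable bytes object is read-only, like a non-copying receive.
    restored = memoryview(received).cast("d")
    print(restored.tolist(), restored.readonly)  # [1.5, 2.5, 3.5] True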

Closures
********

Just about anything in Python is pickleable. The one notable exception is objects (generally
functions) with *closures*. Closures can be a complicated topic, but the basic principle is that
functions that refer to variables in their parent scope have closures.

An example of a function that uses a closure:

.. sourcecode:: python

    def f(a):
        def inner():
            # inner will have a closure
            return a
        return inner

    f1 = f(1)
    f2 = f(2)
    f1() # returns 1
    f2() # returns 2

``f1`` and ``f2`` will have closures referring to the scope in which ``inner`` was defined,
because they use the variable ``a``. As a result, you would not be able to send ``f1`` or ``f2``
with IPython. Note that you *would* be able to send ``f``. This is only true for interactively
defined functions (as are often used in decorators), and only when the inner function uses
variables that are defined in the outer function. If the names are *not* defined in the outer
function, then there will not be a closure, and the generated function will look in
``globals()`` for the name:

.. sourcecode:: python

    def g(b):
        # note that `b` is not referenced in inner's scope
        def inner():
            # this inner will *not* have a closure
            return a
        return inner

    g1 = g(1)
    g2 = g(2)
    g1() # raises NameError on 'a'
    a=5
    g2() # returns 5

``g1`` and ``g2`` *will* be sendable with IPython, and will treat the engine's namespace as
``globals()``. The :meth:`pull` method is implemented based on this principle. If we did not
provide pull, you could implement it yourself with ``apply``, by simply returning objects out
of the global namespace:

.. sourcecode:: ipython

    In [10]: view.apply(lambda : a)

    # is equivalent to
    In [11]: view.pull('a')
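
The distinction can be checked locally. A function's ``__closure__`` attribute is ``None`` when it
captures nothing, and a tuple of cells when it does. The hypothetical ``rebuild`` helper below
re-creates a function from its code object, using a different dictionary as its globals. That is
roughly the idea behind running a function in an engine's namespace. It is an illustration under
that assumption, not IPython's actual serialization code.

.. sourcecode:: python

    import types

    def f(a):
        def inner():
            return a          # 'a' comes from f's scope: a closure
        return inner

    def g(b):
        def inner():
            return a          # 'a' is looked up in globals(): no closure
        return inner

    def rebuild(func, namespace):
        """Hypothetical helper: re-create func with namespace as its globals.

        Only the code object is reused, so any closure cells are lost.
        """
        return types.FunctionType(func.__code__, namespace)

    f1, g1 = f(1), g(1)
    print(f1.__closure__ is None)    # False: f1 captured 'a'
    print(g1.__closure__ is None)    # True: g1 captured nothing

    engine_ns = {"a": 5}             # stand-in for an engine's namespace
    print(rebuild(g1, engine_ns)())  # 5, read from engine_ns

    try:
        rebuild(f1, engine_ns)       # the code needs a closure cell we cannot supply
        f1_rebuilt = True
    except ValueError as exc:
        f1_rebuilt = False
        print("cannot rebuild f1:", exc)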

Running Code
============

There are two principal units of execution in Python: strings of Python code (e.g. 'a=5'),
and Python functions. IPython is designed around the use of functions via the core
Client method, called `apply`.

Apply
-----

The principal method of remote execution is the :meth:`apply` method of View objects. The Client
provides the full execution and communication API for engines via its low-level
:meth:`send_apply_message` method.

The arguments of :meth:`apply` are:

f : function
    The function to be called remotely
args : tuple/list
    The positional arguments passed to `f`
kwargs : dict
    The keyword arguments passed to `f`
block : bool (default: self.block)
    Whether to wait for the result, or return immediately.

    False:
        returns AsyncResult
    True:
        returns actual result(s) of ``f(*args, **kwargs)``;
        if multiple targets, a list of results matching `targets`
track : bool
    Whether to track non-copying sends.
    [default False]
targets : int, list of ints, 'all', None
    Specify the destination of the job.

    if None:
        Submit via Task queue for load-balancing.
    if 'all':
        Run on all active engines
    if list:
        Run on each specified engine
    if int:
        Run on single engine
balanced : bool, default None
    Whether to load-balance. This will default to True
    if targets is unspecified, or False if targets is specified.
    If `balanced` and `targets` are both specified, the task will
    be assigned to *one* of the targets by the scheduler.
after : Dependency or collection of msg_ids
    Only for load-balanced execution (targets=None).
    Specify a list of msg_ids as a time-based dependency.
    This job will only be run *after* the dependencies
    have been met.
follow : Dependency or collection of msg_ids
    Only for load-balanced execution (targets=None).
    Specify a list of msg_ids as a location-based dependency.
    This job will only be run on an engine where this dependency
    is met.
timeout : float/int or None
    Only for load-balanced execution (targets=None).
    Specify an amount of time (in seconds) for the scheduler to
    wait for dependencies to be met before failing with a
    DependencyTimeout.
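
The ``block`` and ``targets`` semantics can be mimicked locally with ``concurrent.futures``. The
sketch below is a hypothetical stand-in, not IPython's implementation. A thread pool runs one copy
of the job per target. A non-blocking call returns futures, which play the role of an
AsyncResult. A blocking call returns the result directly, or a list when there are several
targets. Load balancing (``targets=None``) and the dependency arguments are not modelled.

.. sourcecode:: python

    from concurrent.futures import ThreadPoolExecutor

    # Hypothetical local stand-in for a cluster. Jobs are not routed to real
    # engines; the pool simply runs one copy of the job per target.
    executor = ThreadPoolExecutor(max_workers=4)
    ENGINE_IDS = [0, 1, 2, 3]

    def apply(f, *args, block=True, targets="all", **kwargs):
        """Sketch of apply's block/targets handling (simplified, assumed)."""
        if targets == "all":
            targets = ENGINE_IDS
        single = isinstance(targets, int)
        target_list = [targets] if single else list(targets)
        futures = [executor.submit(f, *args, **kwargs) for _ in target_list]
        if not block:
            # Stand-in for an AsyncResult: the caller collects the result later.
            return futures[0] if single else futures
        results = [fut.result() for fut in futures]
        return results[0] if single else results

    one = apply(pow, 2, 10, targets=0)
    many = apply(pow, 2, 10, targets=[0, 2])
    pending = apply(pow, 2, 10, targets=0, block=False)
    print(one, many, pending.result())  # 1024 [1024, 1024] 1024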

execute and run
---------------

For executing strings of Python code, :class:`DirectView` objects also provide an :meth:`execute`
and a :meth:`run` method, which, rather than taking functions and arguments, take simple strings.
`execute` simply takes a string of Python code to execute, and sends it to the Engine(s). `run`
is the same as `execute`, but for a *file*, rather than a string. It is simply a wrapper that
does something very similar to ``execute(open(f).read())``.
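
Locally, the relationship between the two can be sketched with the built-in ``exec``. The
hypothetical ``execute`` and ``run`` functions below take an explicit dictionary that stands in
for an engine's namespace. They are illustrative only and do not send anything to an engine.

.. sourcecode:: python

    import os
    import tempfile

    def execute(code, namespace):
        """Hypothetical stand-in: run a string of Python code in namespace."""
        exec(code, namespace)

    def run(filename, namespace):
        """Hypothetical stand-in: roughly execute(open(filename).read())."""
        with open(filename) as fh:
            execute(fh.read(), namespace)

    engine_ns = {}
    execute("a = 5", engine_ns)

    # Write a small script to a temporary file and run it in the same namespace.
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as tmp:
        tmp.write("b = a * 2\n")
    run(tmp.name, engine_ns)
    os.remove(tmp.name)

    print(engine_ns["a"], engine_ns["b"])  # 5 10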

.. note::

    TODO: Example

Views
=====

The principal extension of the :class:`~parallel.client.Client` is the
:class:`~parallel.view.View` class. The client

Two of apply's keyword arguments are set at the construction of the View, and are immutable for
a given View: `balanced` and `targets`. `balanced` determines whether the View will be a
:class:`.LoadBalancedView` or a :class:`.DirectView`, and `targets` will be the View's `targets`
attribute. Attempts to change either of them will raise errors.

Views are cached by targets/class, so requesting a view multiple times will always return the
*same object*, not create a new one:

.. sourcecode:: ipython

    In [3]: v1 = rc.load_balanced_view([1,2,3])

    In [4]: v2 = rc.load_balanced_view([1,2,3])

    In [5]: v2 is v1
    Out[5]: True
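
This caching amounts to memoizing views on their class and targets. The minimal sketch below uses
hypothetical ``FakeClient``, ``View`` and ``LoadBalancedView`` classes, not IPython's actual code.
Views are keyed on the view class and a tuple of the targets, so the key is hashable.

.. sourcecode:: python

    class View:
        def __init__(self, targets):
            self.targets = targets

    class LoadBalancedView(View):
        pass

    class FakeClient:
        """Hypothetical client that caches views by (class, targets)."""

        def __init__(self):
            self._views = {}

        def _cached_view(self, cls, targets):
            key = (cls, tuple(targets))   # lists are unhashable, tuples are not
            if key not in self._views:
                self._views[key] = cls(list(targets))
            return self._views[key]

        def load_balanced_view(self, targets):
            return self._cached_view(LoadBalancedView, targets)

    rc = FakeClient()
    v1 = rc.load_balanced_view([1, 2, 3])
    v2 = rc.load_balanced_view([1, 2, 3])
    print(v2 is v1)  # True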

DirectView
----------

The :class:`.DirectView` is the class for the IPython :ref:`Multiplexing Interface
<parallel_multiengine>`.

Creating a DirectView
*********************

DirectViews can be created in two ways: by index access to a client, or by a client's
:meth:`view` method. Index access to a Client works in a few ways. First, you can create
DirectViews to single engines simply by accessing the client by engine id:

.. sourcecode:: ipython

    In [2]: rc[0]
    Out[2]: <DirectView 0>

You can also create a DirectView with a list of engines:

.. sourcecode:: ipython

    In [2]: rc[0,1,2]
    Out[2]: <DirectView [0,1,2]>

Other methods for accessing elements, such as slicing and negative indexing, work by passing
the index directly to the client's :attr:`ids` list, so:

.. sourcecode:: ipython

    # negative index
    In [2]: rc[-1]
    Out[2]: <DirectView 3>

    # or slicing:
    In [3]: rc[::2]
    Out[3]: <DirectView [0,2]>

are always the same as:

.. sourcecode:: ipython

    In [2]: rc[rc.ids[-1]]
    Out[2]: <DirectView 3>

    In [3]: rc[rc.ids[::2]]
    Out[3]: <DirectView [0,2]>

Also note that the slice is evaluated at the time of construction of the DirectView, so the
targets will not change over time if engines are added to or removed from the cluster.
Requesting two views with the same slice at different times will *not* necessarily return the
same View if the number of engines has changed.
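
A minimal sketch of this index handling follows. It uses a hypothetical ``FakeClient`` whose
``__getitem__`` treats non-negative integers as engine ids. Negative indices and slices are
resolved against the current ``ids`` list, and the result is stored as the view's targets. The
sketch shows why a view built from a slice does not follow later changes to the cluster. It is
not IPython's implementation.

.. sourcecode:: python

    class DirectView:
        """Minimal stand-in that only records its targets."""

        def __init__(self, targets):
            self.targets = targets

    class FakeClient:
        """Hypothetical client: index access builds a DirectView from ids."""

        def __init__(self, ids):
            self.ids = list(ids)

        def __getitem__(self, key):
            if isinstance(key, int) and key >= 0:
                # A non-negative int is taken to be an engine id.
                return DirectView(key)
            if isinstance(key, (int, slice)):
                # Negative indices and slices are applied to ids *now*; the
                # resulting value is stored, so later changes to ids are not seen.
                return DirectView(self.ids[key])
            # A tuple or list of explicit engine ids, e.g. rc[0, 1, 2].
            return DirectView(list(key))

    rc = FakeClient([0, 1, 2, 3])
    evens = rc[::2]
    last = rc[-1]
    print(evens.targets, last.targets)  # [0, 2] 3

    rc.ids.append(4)                    # an engine joins the cluster
    print(evens.targets)                # [0, 2]: the old view is unchanged
    print(rc[::2].targets)              # [0, 2, 4]: a new slice sees the new engine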

Execution via DirectView
************************

The DirectView is the simplest way to work with one or more engines directly (hence the name).

Data movement via DirectView
****************************

Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
dictionary-style access by key and methods such as :meth:`get` and
:meth:`update` for convenience. This makes the remote namespaces of the engines
appear as a local dictionary. Underneath, these methods call :meth:`apply`:

.. sourcecode:: ipython

    In [51]: dview['a']=['foo','bar']

    In [52]: dview['a']
    Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
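
To make the idea of dictionary access layered over :meth:`apply` concrete, here is a local sketch
with a hypothetical ``FakeDirectView`` whose engines are plain dictionaries. Its ``apply`` calls a
function once per engine namespace and returns the list of results. ``__setitem__`` and
``__getitem__`` are thin wrappers around it. Passing the namespace as the first argument is an
assumption made to keep the sketch self-contained. Real engines receive serialized data, whereas
here every "engine" holds the same local object.

.. sourcecode:: python

    class FakeDirectView:
        """Hypothetical DirectView over local dicts standing in for engine namespaces."""

        def __init__(self, n_engines):
            self.namespaces = [{} for _ in range(n_engines)]

        def apply(self, f, *args):
            # Run f once per "engine", passing that engine's namespace first.
            return [f(ns, *args) for ns in self.namespaces]

        def __setitem__(self, key, value):
            def _push(ns, k, v):
                ns[k] = v
            self.apply(_push, key, value)

        def __getitem__(self, key):
            return self.apply(lambda ns, k: ns[k], key)

    dview = FakeDirectView(4)
    dview['a'] = ['foo', 'bar']
    print(dview['a'])  # one value per engine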

Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`Client` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI should be used instead. Here is a scatter and gather across four engines:

.. sourcecode:: ipython

    In [58]: dview.scatter('a',range(16))
    Out[58]: [None,None,None,None]

    In [59]: dview['a']
    Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]

    In [60]: dview.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
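
In the example above, the sequence is split into contiguous, equal chunks, one per engine, and
gathering concatenates them in engine order. Below is a stdlib sketch of one way to do that. This
section does not say how IPython distributes the remainder when the length does not divide
evenly, so the remainder handling here is an assumption.

.. sourcecode:: python

    def scatter_chunks(seq, n_engines):
        """Split seq into n_engines contiguous chunks whose sizes differ by at most 1.

        Assumption: the first len(seq) % n_engines chunks get one extra item.
        """
        seq = list(seq)
        base, extra = divmod(len(seq), n_engines)
        chunks, start = [], 0
        for i in range(n_engines):
            stop = start + base + (1 if i < extra else 0)
            chunks.append(seq[start:stop])
            start = stop
        return chunks

    def gather_chunks(chunks):
        """Concatenate per-engine chunks back into one list, in engine order."""
        return [item for chunk in chunks for item in chunk]

    parts = scatter_chunks(range(16), 4)
    print(parts)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
    print(gather_chunks(parts) == list(range(16)))  # True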

LoadBalancedView
----------------

The :class:`.LoadBalancedView`

Data Movement
=============

push

pull

Reference

Results
=======

AsyncResults are the primary class

get_result

results,metadata

Querying the Hub
================

The Hub sees all traffic that may pass through the schedulers between engines and clients.
It does this so that it can track state, allowing multiple clients to retrieve results of
computations submitted by their peers, as well as persisting the state to a database.

queue_status
    You can check the status of the queues of the engines with this command.

result_status

purge_results

Controlling the Engines
=======================

There are a few actions you can do with Engines that do not involve execution. These
messages are sent via the Control socket, and bypass any long queues of waiting execution
jobs.

abort
    Sometimes you may want to prevent a job you have submitted from actually running. The method
    for this is :meth:`abort`. It takes a container of msg_ids, and instructs the Engines to not
    run the jobs if they arrive. The jobs will then fail with an AbortedTask error.

clear
    You may want to purge the Engine(s) namespace of any data you have left in it. After
    running `clear`, there will be no names in the Engine's namespace.

shutdown
    You can also instruct engines (and the Controller) to terminate from a Client. This
    can be useful when a job is finished, since you can shut down all the processes with a
    single command.
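
On the engine side, honouring :meth:`abort` can be pictured as checking each arriving job against
a set of aborted msg_ids before running it. The sketch below is purely illustrative.
``AbortedTask`` here is a locally defined stand-in exception, ``process_queue`` is a hypothetical
helper, and the queue is a plain list rather than 0MQ sockets.

.. sourcecode:: python

    class AbortedTask(Exception):
        """Local stand-in for the error an aborted job fails with."""

    def process_queue(queue, aborted):
        """Hypothetical engine loop: run (msg_id, func) jobs in order,
        failing any whose msg_id has been aborted without running it."""
        results = {}
        for msg_id, func in queue:
            if msg_id in aborted:
                results[msg_id] = AbortedTask(msg_id)   # job is never run
            else:
                results[msg_id] = func()
        return results

    queue = [("m1", lambda: 1), ("m2", lambda: 2), ("m3", lambda: 3)]
    results = process_queue(queue, aborted={"m2"})
    print(results["m1"], type(results["m2"]).__name__, results["m3"])  # 1 AbortedTask 3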

Synchronization
===============

Since the Client is a synchronous object, events do not automatically trigger in your
interactive session; you must poll the 0MQ sockets for incoming messages. Note that
this polling *does not* actually make any network requests. It simply performs a `select`
operation, to check if messages are already in local memory, waiting to be handled.

The method that handles incoming messages is :meth:`spin`. This method flushes any waiting
messages on the various incoming sockets, and updates the state of the Client.

If you need to wait for particular results to finish, you can use the :meth:`wait` method,
which will call :meth:`spin` until the messages are no longer outstanding. Anything that
represents a collection of messages, such as a list of msg_ids or one or more AsyncResult
objects, can be passed as an argument to wait. A timeout can be specified, which will prevent
the call from blocking for more than a specified time, but the default behavior is to wait
forever.

The client also has an ``outstanding`` attribute, a ``set`` of msg_ids that are awaiting
replies. This is the default if wait is called with no arguments, i.e. wait on *all*
outstanding messages.
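
The polling loop behind :meth:`wait` can be sketched with a hypothetical ``FakeClient``. Its
``spin`` handles replies that have already arrived locally and removes them from
``outstanding``. Reply arrival is simulated with a timer thread. The ``_deliver`` helper, and the
use of ``timeout=None`` to mean "wait forever", are assumptions of this sketch, not IPython's API.

.. sourcecode:: python

    import threading
    import time

    class FakeClient:
        """Hypothetical client illustrating spin()/wait() over an outstanding set."""

        def __init__(self):
            self.outstanding = set()
            self.results = {}
            self._incoming = {}          # replies in local memory, not yet handled
            self._lock = threading.Lock()

        def _deliver(self, msg_id, value):
            # Stand-in for 0MQ receiving a reply in the background.
            with self._lock:
                self._incoming[msg_id] = value

        def spin(self):
            """Handle replies that have already arrived; no network requests."""
            with self._lock:
                arrived, self._incoming = self._incoming, {}
            for msg_id, value in arrived.items():
                self.results[msg_id] = value
                self.outstanding.discard(msg_id)

        def wait(self, msg_ids=None, timeout=None):
            """Spin until msg_ids (default: all outstanding) are done.

            Returns True if they all finished, False if the timeout expired first.
            """
            pending = set(self.outstanding if msg_ids is None else msg_ids)
            deadline = None if timeout is None else time.monotonic() + timeout
            while True:
                self.spin()
                if not (pending & self.outstanding):
                    return True
                if deadline is not None and time.monotonic() >= deadline:
                    return False
                time.sleep(0.01)

    rc = FakeClient()
    rc.outstanding.update({"m1", "m2"})
    threading.Timer(0.05, rc._deliver, args=("m1", 1)).start()
    done = rc.wait(["m1"], timeout=5)     # m1 arrives after about 0.05 s
    timed_out = not rc.wait(timeout=0.1)  # m2 never arrives
    print(done, timed_out)                # True True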

.. note::

    TODO wait example

Map
===

Many parallel computing problems can be expressed as a `map`, or running a single program with a
variety of different inputs. Python has a built-in :py:func:`map`, which does exactly this, and
many parallel execution tools in Python, such as the built-in :py:class:`multiprocessing.Pool`
object, provide implementations of `map`. All View objects provide a :meth:`map` method as well,
but the load-balanced and direct implementations differ.

Views' map methods can be called on any number of sequences, but they can also take the `block`
and `bound` keyword arguments, just like :meth:`~client.apply`, but *only as keywords*.

.. sourcecode:: python

    dview.map(f, *sequences, block=None)
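
For reference, the built-in ``map`` already accepts any number of sequences, and calls the
function with one item from each. The stdlib sketch below compares it with a pool-based map. It
uses ``multiprocessing.pool.ThreadPool`` (threads rather than processes, so nothing needs to be
pickled) and its ``starmap`` method to pass several arguments per call. Neither is IPython's
implementation.

.. sourcecode:: python

    from multiprocessing.pool import ThreadPool
    from operator import add

    xs = [1, 2, 3, 4]
    ys = [10, 20, 30, 40]

    # Built-in map: one item from each sequence per call.
    serial = list(map(add, xs, ys))

    # Pool.map takes a single iterable, so zip the sequences and use starmap.
    with ThreadPool(4) as pool:
        parallel = pool.starmap(add, zip(xs, ys))

    print(serial)              # [11, 22, 33, 44]
    print(parallel == serial)  # True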

* iter, map_async, reduce

Decorators and RemoteFunctions
==============================

@parallel

@remote

RemoteFunction

ParallelFunction

Dependencies
============

@depend

@require

Dependency