|
|
.. _parallel_details:

==========================================
Details of Parallel Computing with IPython
==========================================

.. note::

    There are still many sections to fill out.
|
|
|
|
|
|
|
|
|
Caveats
=======

First, some caveats about the detailed workings of parallel computing with 0MQ and IPython.
|
|
|
|
|
|
Non-copying sends and numpy arrays
----------------------------------

When numpy arrays are passed as arguments to apply or via data-movement methods, they are not
copied. This means that you must be careful if you are sending an array that you intend to work
on. PyZMQ can track when a message has actually been sent, so you can know when it is safe to
edit the buffer, and IPython exposes this through the ``track=True`` flag described below.

It is also important to note that the non-copying receive of a message is *read-only*. That
means that if you intend to work in-place on an array that you have sent or received, you must
copy it. This is true both for numpy arrays sent to engines and for numpy arrays retrieved as
results.
|
|
|
|
|
|
The following will fail:

.. sourcecode:: ipython

    In [3]: A = numpy.zeros(2)

    In [4]: def setter(a):
       ...:     a[0]=1
       ...:     return a

    In [5]: rc[0].apply_sync(setter, A)
    ---------------------------------------------------------------------------
    RemoteError Traceback (most recent call last)
    ...
    RemoteError: RuntimeError(array is not writeable)
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/streamkernel.py", line 329, in apply_request
        exec code in working, working
      File "<string>", line 1, in <module>
      File "<ipython-input-14-736187483856>", line 2, in setter
    RuntimeError: array is not writeable
|
|
|
|
|
|
If you do need to edit the array in-place, just remember to copy the array if it's read-only.
The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an array.

.. sourcecode:: ipython

    In [3]: A = numpy.zeros(2)

    In [4]: def setter(a):
       ...:     """only copy read-only arrays"""
       ...:     if not a.flags.writeable:
       ...:         a=a.copy()
       ...:     a[0]=1
       ...:     return a

    In [5]: rc[0].apply_sync(setter, A)
    Out[5]: array([ 1., 0.])

    # note that results will also be read-only:
    In [6]: _.flags.writeable
    Out[6]: False
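
The same copy-if-read-only pattern can be tried locally, without a cluster. The sketch below
assumes that an array wrapping an immutable ``bytes`` buffer behaves like one received over 0MQ:
:func:`numpy.frombuffer` over ``bytes`` yields a read-only array. Recent numpy versions raise
:exc:`ValueError` (rather than the :exc:`RuntimeError` in the traceback above) when you write to
it. The helper name ``ensure_writeable`` is hypothetical.

.. sourcecode:: python

    import numpy as np


    def ensure_writeable(a):
        """Return ``a`` unchanged if it is writeable, otherwise a writeable copy."""
        if not a.flags.writeable:
            a = a.copy()
        return a


    def setter(a):
        a = ensure_writeable(a)
        a[0] = 1
        return a


    # Simulate a non-copying receive: an array that wraps an immutable bytes
    # buffer is read-only, like an array rebuilt from a received message.
    original = np.zeros(2)
    received = np.frombuffer(original.tobytes(), dtype=original.dtype)
    print("writeable:", received.flags.writeable)

    try:
        received[0] = 1  # in-place edit of a read-only array
    except ValueError as exc:
        print("in-place write refused:", exc)

    result = setter(received)  # works, because setter copies first
    print(result, result.flags.writeable)
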
|
|
|
|
|
|
If you want to safely edit an array in-place after *sending* it, you must use the ``track=True``
flag. IPython always performs non-copying sends of arrays, which return immediately. You must
instruct IPython to track those messages *at send time* in order to know for sure that the send
has completed. AsyncResults have a :attr:`sent` property and a :meth:`wait_on_send` method for
checking and waiting for 0MQ to finish with a buffer.
|
|
|
|
|
|
.. sourcecode:: ipython

    In [5]: A = numpy.random.random((1024,1024))

    In [6]: view.track=True

    In [7]: ar = view.apply_async(lambda x: 2*x, A)

    In [8]: ar.sent
    Out[8]: False

    In [9]: ar.wait_on_send() # blocks until sent is True
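
Why the tracking matters can be seen without 0MQ. In the sketch below, a ``memoryview`` plays the
role of a non-copying send: it shares memory with the original ``bytearray``, and a background
thread stands in for the transport that reads the buffer some time later. ``SendTracker`` and
``send_nocopy`` are hypothetical helpers that only mimic the :attr:`sent` / :meth:`wait_on_send`
idea; they are not part of IPython or PyZMQ.

.. sourcecode:: python

    import threading
    import time


    class SendTracker:
        """Hypothetical tracker: becomes 'sent' once the transport has read the buffer."""

        def __init__(self):
            self._done = threading.Event()

        @property
        def sent(self):
            return self._done.is_set()

        def wait_on_send(self, timeout=None):
            # Block until the transport has finished with the buffer.
            return self._done.wait(timeout)


    def send_nocopy(buf, outbox):
        """Queue ``buf`` for sending without copying it; return a tracker."""
        view = memoryview(buf)  # shares memory with buf, no copy is made
        tracker = SendTracker()

        def transport():
            time.sleep(0.05)  # simulated delay before the data is actually read
            outbox.append(bytes(view))  # the bytes are only read (copied out) now
            view.release()
            tracker._done.set()

        threading.Thread(target=transport).start()
        return tracker


    outbox = []
    data = bytearray(b"original")
    tracker = send_nocopy(data, outbox)
    print("sent right away?", tracker.sent)
    tracker.wait_on_send()  # without this, the edit below could leak into the message
    data[:] = b"modified"
    print(outbox[0])
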
|
|
|
|
|
|
|
|
|
What is sendable?
-----------------

If IPython doesn't know what to do with an object, it will pickle it. There is a short list of
objects that are not pickled: ``buffers``, ``str/bytes`` objects, and ``numpy``
arrays. These are handled specially by IPython in order to prevent the copying of data. Sending
bytes or numpy arrays will result in exactly zero in-memory copies of your data (unless the data
is very small).

If you have an object that provides a Python buffer interface, then you can always send that
buffer without copying, and reconstruct the object on the other side in your own code. It is
possible that the object reconstruction will become extensible, so you can add your own
non-copying types, but this does not yet exist.
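
As an illustration of sending a buffer and rebuilding the object yourself, the sketch below uses
the standard-library :mod:`array` type, which supports the buffer interface. ``pack`` and
``unpack`` are hypothetical helpers: the sender ships the raw buffer (a zero-copy
``memoryview``) plus the metadata needed to rebuild the object, and anything else falls back to
:mod:`pickle`, mirroring the behaviour described above.

.. sourcecode:: python

    import array
    import pickle


    def pack(obj):
        """Hypothetical serializer: raw buffer for arrays, pickle for everything else."""
        if isinstance(obj, array.array):
            # memoryview exposes the array's memory without copying it;
            # the typecode is the metadata needed to rebuild the object.
            return ("array", obj.typecode, memoryview(obj).cast("B"))
        return ("pickle", pickle.dumps(obj))


    def unpack(msg):
        """Rebuild an object from the output of ``pack``."""
        if msg[0] == "array":
            _, typecode, buf = msg
            out = array.array(typecode)
            out.frombytes(buf)  # copies once, on the receiving side
            return out
        return pickle.loads(msg[1])


    values = array.array("d", [1.0, 2.0, 3.0])
    msg = pack(values)
    print(msg[0], msg[2].nbytes)
    print(unpack(msg))
    print(unpack(pack({"a": 5})))
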
|
|
|
|
|
|
Closures
********

Just about anything in Python is pickleable. The one notable exception is objects (generally
functions) with *closures*. Closures can be a complicated topic, but the basic principle is that
functions that refer to variables in their parent scope have closures.
|
|
|
|
|
|
An example of a function that uses a closure:

.. sourcecode:: python

    def f(a):
        def inner():
            # inner will have a closure
            return a
        return inner

    f1 = f(1)
    f2 = f(2)
    f1() # returns 1
    f2() # returns 2
|
|
|
|
|
|
``f1`` and ``f2`` will have closures referring to the scope in which ``inner`` was defined, because
they use the variable ``a``. As a result, you would not be able to send ``f1`` or ``f2`` with
IPython. Note that you *would* be able to send ``f``. This is only true for interactively defined
functions (as are often used in decorators), and only when there are variables used inside the
inner function that are defined in the outer function. If the names are *not* in the outer
function, then there will not be a closure, and the generated function will look in
``globals()`` for the name:
|
|
|
|
|
|
.. sourcecode:: python

    def g(b):
        # note that `b` is not referenced in inner's scope
        def inner():
            # this inner will *not* have a closure
            return a
        return inner

    g1 = g(1)
    g2 = g(2)
    g1() # raises NameError on 'a'
    a=5
    g2() # returns 5
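
You can check whether a function carries a closure by inspecting it directly. The sketch below
repeats the two factories above and looks at the standard function attributes ``__closure__``
and ``__code__.co_freevars``: ``f1`` captures ``a`` in a closure cell, while ``g1`` has no
closure and resolves ``a`` as a global at call time.

.. sourcecode:: python

    def f(a):
        def inner():
            return a  # 'a' comes from f's scope, so inner gets a closure
        return inner


    def g(b):
        def inner():
            return a  # 'a' is not in g's scope, so it is looked up in globals()
        return inner


    f1 = f(1)
    g1 = g(1)

    print(f1.__code__.co_freevars)          # ('a',)
    print(f1.__closure__[0].cell_contents)  # 1
    print(g1.__closure__)                   # None

    try:
        g1()
    except NameError as exc:
        print("g1 failed:", exc)

    a = 5  # define the global that g1 looks up
    print(g1())  # 5
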
|
|
|
|
|
|
``g1`` and ``g2`` *will* be sendable with IPython, and will treat the engine's namespace as
``globals()``. The :meth:`pull` method is implemented based on this principle. If we did not
provide pull, you could implement it yourself with ``apply``, by simply returning objects out
of the global namespace:
|
|
|
|
|
|
.. sourcecode:: ipython

    In [10]: view.apply(lambda : a)

    # is equivalent to
    In [11]: view.pull('a')
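
The mechanism can be imitated in a single process. The sketch below assumes that an engine's
namespace is just a ``dict`` and rebinds a closure-free function's globals to it with
:class:`types.FunctionType`; ``run_in_namespace`` and ``pull`` are hypothetical local helpers,
not the IPython implementation.

.. sourcecode:: python

    import types


    def run_in_namespace(func, namespace, *args, **kwargs):
        """Call ``func`` with ``namespace`` as its globals (closure-free functions only)."""
        rebound = types.FunctionType(
            func.__code__, namespace, func.__name__, func.__defaults__, func.__closure__
        )
        return rebound(*args, **kwargs)


    def _lookup(key):
        # globals() here is whatever namespace the function was rebound to.
        return globals()[key]


    def pull(key, namespace):
        """Hypothetical pull: fetch ``key`` from ``namespace`` via a rebound function."""
        return run_in_namespace(_lookup, namespace, key)


    engine_ns = {}
    exec("a = 5", engine_ns)  # stands in for code already run on an engine

    print(run_in_namespace(lambda: a, engine_ns))  # 5, like view.apply(lambda : a)
    print(pull("a", engine_ns))                    # 5, like view.pull('a')
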
|
|
|
|
|
|
Running Code
============

There are two principal units of execution in Python: strings of Python code (e.g. ``'a=5'``),
and Python functions. IPython is designed around the use of functions via the core
View method, called :meth:`apply`.
|
|
|
|
|
|
Apply
-----

The principal method of remote execution is :meth:`apply`, of View objects. The Client provides
the full execution and communication API for engines via its low-level
:meth:`send_apply_message` method.
|
|
|
|
|
|
f : function
    The function to be called remotely

args : tuple/list
    The positional arguments passed to `f`

kwargs : dict
    The keyword arguments passed to `f`

Flags for all views:

block : bool (default: view.block)
    Whether to wait for the result, or return immediately.

    False:
        returns AsyncResult
    True:
        returns actual result(s) of ``f(*args, **kwargs)``; if there are multiple targets,
        a list of results, matching `targets`

track : bool (default: view.track)
    Whether to track non-copying sends.

targets : int, list of ints, 'all', None (default: view.targets)
    Specify the destination of the job.

    if 'all' or None:
        Run on all active engines
    if list:
        Run on each specified engine
    if int:
        Run on single engine

Note that LoadBalancedView uses targets to restrict possible destinations. LoadBalanced calls
will always execute in just one location.

Flags only in LoadBalancedViews:

after : Dependency or collection of msg_ids
    Only for load-balanced execution (targets=None).
    Specify a list of msg_ids as a time-based dependency.
    This job will only be run *after* the dependencies
    have been met.

follow : Dependency or collection of msg_ids
    Only for load-balanced execution (targets=None).
    Specify a list of msg_ids as a location-based dependency.
    This job will only be run on an engine where this dependency
    is met.

timeout : float/int or None
    Only for load-balanced execution (targets=None).
    Specify an amount of time (in seconds) for the scheduler to
    wait for dependencies to be met before failing with a
    DependencyTimeout.
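
The ``block`` and ``targets`` semantics above can be modelled in-process. The sketch below is a
hypothetical ``ToyView`` built on :class:`concurrent.futures.ThreadPoolExecutor`. It is not
IPython's View: it ignores ``track`` and the load-balancing flags, and the list of futures it
returns for ``block=False`` only stands in for an AsyncResult.

.. sourcecode:: python

    from concurrent.futures import ThreadPoolExecutor


    class ToyView:
        """Hypothetical in-process model of apply's ``block`` and ``targets`` flags."""

        def __init__(self, n_engines, block=False):
            self.ids = list(range(n_engines))
            self.block = block
            self._pool = ThreadPoolExecutor(max_workers=n_engines)

        def apply(self, f, *args, block=None, targets="all", **kwargs):
            block = self.block if block is None else block
            if targets in ("all", None):
                targets = self.ids  # run on all engines
            single = isinstance(targets, int)
            ids = [targets] if single else list(targets)
            # one call of f(*args, **kwargs) per target engine
            futures = [self._pool.submit(f, *args, **kwargs) for _ in ids]
            if not block:
                return futures  # stand-in for an AsyncResult
            results = [fut.result() for fut in futures]
            return results[0] if single else results  # list matches targets

        def shutdown(self):
            self._pool.shutdown()


    view = ToyView(4)
    everywhere = view.apply(pow, 2, 10, block=True)
    one_engine = view.apply(pow, 2, 10, block=True, targets=1)
    pending = view.apply(pow, 2, 10, targets=[0, 2])  # block=False: returns immediately
    some = [fut.result() for fut in pending]
    view.shutdown()
    print(everywhere, one_engine, some)
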
|
|
|
|
|
|
execute and run
---------------

For executing strings of Python code, :class:`DirectView` objects also provide an :meth:`execute`
and a :meth:`run` method, which, rather than taking functions and arguments, take simple strings.
:meth:`execute` simply takes a string of Python code to execute, and sends it to the Engine(s).
:meth:`run` is the same as :meth:`execute`, but for a *file*, rather than a string. It is simply
a wrapper that does something very similar to ``execute(open(f).read())``.
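
Locally, the relationship between the two can be sketched with :func:`exec` and a plain ``dict``
standing in for an engine's namespace. ``execute`` and ``run`` below are hypothetical
single-process stand-ins, not the DirectView methods; they only illustrate that ``run`` reads a
file and hands its contents to ``execute``.

.. sourcecode:: python

    import os
    import tempfile


    def execute(code, namespace):
        """Run a string of Python code in ``namespace`` (stand-in for an engine)."""
        exec(code, namespace)


    def run(filename, namespace):
        """Like execute, but for a file: roughly execute(open(filename).read())."""
        with open(filename) as fh:
            execute(fh.read(), namespace)


    engine_ns = {}
    execute("a = 5", engine_ns)

    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as tmp:
        tmp.write("b = a * 2\n")
    try:
        run(tmp.name, engine_ns)
    finally:
        os.remove(tmp.name)

    print(engine_ns["a"], engine_ns["b"])  # 5 10
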
|
|
|
|
|
|
.. note::

    TODO: Example
|
|
|
|
|
|
Views
=====

The principal extension of the :class:`~parallel.Client` is the
:class:`~parallel.view.View` class. The client
|
|
|
|
|
|
|
|
|
DirectView
----------

The :class:`.DirectView` is the class for the IPython :ref:`Multiplexing Interface
<parallel_multiengine>`.

Creating a DirectView
*********************

DirectViews can be created in two ways: by index access to a client, or by a client's
:meth:`view` method. Index access to a Client works in a few ways. First, you can create
DirectViews to single engines simply by accessing the client by engine id:
|
|
|
|
|
|
.. sourcecode:: ipython

    In [2]: rc[0]
    Out[2]: <DirectView 0>

You can also create a DirectView with a list of engines:

.. sourcecode:: ipython

    In [2]: rc[0,1,2]
    Out[2]: <DirectView [0,1,2]>

Other methods for accessing elements, such as slicing and negative indexing, work by passing
the index directly to the client's :attr:`ids` list, so:

.. sourcecode:: ipython

    # negative index
    In [2]: rc[-1]
    Out[2]: <DirectView 3>

    # or slicing:
    In [3]: rc[::2]
    Out[3]: <DirectView [0,2]>

are always the same as:

.. sourcecode:: ipython

    In [2]: rc[rc.ids[-1]]
    Out[2]: <DirectView 3>

    In [3]: rc[rc.ids[::2]]
    Out[3]: <DirectView [0,2]>

Also note that the slice is evaluated at the time of construction of the DirectView, so the
targets will not change over time if engines are added to or removed from the cluster.
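
The index resolution described above can be modelled with a few lines of Python. ``ToyClient``
and ``ToyDirectView`` are hypothetical classes, and IPython's own ``__getitem__`` may differ in
details; the point is that negative indices and slices are applied to ``ids`` once, when the view
is created, so the view's targets are a snapshot.

.. sourcecode:: python

    class ToyDirectView:
        def __init__(self, targets):
            self.targets = targets

        def __repr__(self):
            return f"<DirectView {self.targets}>"


    class ToyClient:
        """Hypothetical model of DirectView creation by index access."""

        def __init__(self, ids):
            self.ids = list(ids)

        def __getitem__(self, key):
            if isinstance(key, slice):
                targets = self.ids[key]  # slicing ids produces a new list
            elif isinstance(key, int):
                targets = self.ids[key] if key < 0 else key  # non-negative int: engine id
            else:
                targets = list(key)  # e.g. rc[0,1,2] passes the tuple (0, 1, 2)
            return ToyDirectView(targets)


    rc = ToyClient([0, 1, 2, 3])
    last = rc[-1]
    evens = rc[::2]
    print(last, evens, rc[0, 1, 2])
    rc.ids.append(4)  # an engine joins after the views were created
    print(evens)      # still <DirectView [0, 2]>
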
|
|
|
|
|
|
Execution via DirectView
************************

The DirectView is the simplest way to work with one or more engines directly (hence the name).
|
|
|
|
|
|
|
|
|
Data movement via DirectView
****************************

Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
dictionary-style access by key and methods such as :meth:`get` and
:meth:`update` for convenience. This makes the remote namespaces of the engines
appear as a local dictionary. Underneath, these methods call :meth:`apply`:

.. sourcecode:: ipython

    In [51]: dview['a']=['foo','bar']

    In [52]: dview['a']
    Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
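
A hypothetical local model of this dictionary-style interface: each engine namespace is a
``dict``, assignment writes the value into every namespace, and lookup returns one value per
engine, which is why ``dview['a']`` above yields a list. ``DictStyleView`` is illustrative only;
the real methods go through :meth:`apply` to remote engines.

.. sourcecode:: python

    class DictStyleView:
        """Hypothetical model: one namespace dict per engine."""

        def __init__(self, n_engines):
            self.namespaces = [{} for _ in range(n_engines)]

        def __setitem__(self, key, value):
            # push: the same value is written into every engine's namespace
            for ns in self.namespaces:
                ns[key] = value

        def __getitem__(self, key):
            # pull: one result per engine, in engine order
            return [ns[key] for ns in self.namespaces]

        def update(self, mapping):
            for key, value in mapping.items():
                self[key] = value

        def get(self, key):
            return self[key]


    dview = DictStyleView(4)
    dview["a"] = ["foo", "bar"]
    print(dview["a"])
    dview.update({"b": 1, "c": 2})
    print(dview.get("c"))  # [2, 2, 2, 2]
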
|
|
|
|
|
|
Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`DirectView` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI should be used:

.. sourcecode:: ipython

    In [58]: dview.scatter('a',range(16))
    Out[58]: [None,None,None,None]

    In [59]: dview['a']
    Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]

    In [60]: dview.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
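
The partitioning itself is simple to sketch. The helpers below are hypothetical, not IPython's
own partitioning code: ``scatter_chunks`` splits a sequence into one contiguous chunk per engine,
giving any remainder to the first chunks, and ``gather_chunks`` concatenates the chunks back in
engine order. For ``range(16)`` over four engines this gives the same chunks as ``Out[59]``
above.

.. sourcecode:: python

    def scatter_chunks(seq, n_engines):
        """Split seq into n_engines contiguous chunks whose sizes differ by at most one."""
        base, extra = divmod(len(seq), n_engines)
        chunks, start = [], 0
        for i in range(n_engines):
            stop = start + base + (1 if i < extra else 0)  # first `extra` chunks get one more
            chunks.append(list(seq[start:stop]))
            start = stop
        return chunks


    def gather_chunks(chunks):
        """Concatenate per-engine chunks back into one list, in engine order."""
        return [item for chunk in chunks for item in chunk]


    chunks = scatter_chunks(range(16), 4)
    print(chunks)
    print(gather_chunks(chunks) == list(range(16)))        # True
    print([len(c) for c in scatter_chunks(range(10), 4)])  # [3, 3, 2, 2]
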
|
|
|
|
|
|
Push and pull
-------------

push

pull
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LoadBalancedView
----------------

The :class:`.LoadBalancedView`
|
|
|
|
|
|
|
|
|
Data Movement
=============

Reference
|
|
|
|
|
|
Results
=======

AsyncResults are the primary class

get_result

results, metadata
|
|
|
|
|
|
Querying the Hub
================

The Hub sees all traffic that may pass through the schedulers between engines and clients.
It does this so that it can track state, allowing multiple clients to retrieve results of
computations submitted by their peers, as well as persisting the state to a database.

queue_status

You can check the status of the queues of the engines with this command.

result_status

purge_results
|
|
|
|
|
|
Controlling the Engines
=======================

There are a few actions you can do with Engines that do not involve execution. These
messages are sent via the Control socket, and bypass any long queues of waiting execution
jobs.

abort

Sometimes you may want to prevent a job you have submitted from actually running. The method
for this is :meth:`abort`. It takes a container of msg_ids, and instructs the Engines to not
run the jobs if they arrive. The jobs will then fail with an AbortedTask error.

clear

You may want to purge the Engine(s) namespace of any data you have left in it. After
running `clear`, there will be no names in the Engine's namespace.

shutdown

You can also instruct engines (and the Controller) to terminate from a Client. This
can be useful when a job is finished, since you can shut down all the processes with a
single command.
|
|
|
|
|
|
Synchronization
===============

Since the Client is a synchronous object, events do not automatically trigger in your
interactive session; you must poll the 0MQ sockets for incoming messages. Note that
this polling *does not* actually make any network requests. It simply performs a ``select``
operation, to check if messages are already in local memory, waiting to be handled.

The method that handles incoming messages is :meth:`spin`. This method flushes any waiting
messages on the various incoming sockets, and updates the state of the Client.

If you need to wait for particular results to finish, you can use the :meth:`wait` method,
which will call :meth:`spin` until the messages are no longer outstanding. Anything that
represents a collection of messages, such as a list of msg_ids or one or more AsyncResult
objects, can be passed as the argument to wait. A timeout can be specified, which will prevent
the call from blocking for more than a specified time, but the default behavior is to wait
forever.

The client also has an ``outstanding`` attribute: a ``set`` of msg_ids that are awaiting replies.
This is the default if wait is called with no arguments, i.e. wait on *all* outstanding
messages.
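
The interplay of :meth:`spin`, :meth:`wait` and ``outstanding`` can be sketched in a single
process. In this hypothetical ``PollingClient``, a :class:`queue.Queue` stands in for the 0MQ
sockets: ``spin`` only drains replies that have already arrived (it never blocks), and ``wait``
calls ``spin`` in a loop until the requested msg_ids leave ``outstanding`` or the timeout
expires. The "negative timeout means forever" convention is an assumption of this sketch.

.. sourcecode:: python

    import queue
    import threading
    import time


    class PollingClient:
        """Hypothetical model of a synchronous client that must poll for replies."""

        def __init__(self):
            self.outstanding = set()        # msg_ids still awaiting replies
            self.results = {}
            self._incoming = queue.Queue()  # stands in for the 0MQ sockets

        def submit(self, msg_id, func, *args):
            self.outstanding.add(msg_id)

            def engine():
                self._incoming.put((msg_id, func(*args)))

            threading.Thread(target=engine).start()

        def spin(self):
            """Handle every reply that is already waiting locally; never blocks."""
            while True:
                try:
                    msg_id, result = self._incoming.get_nowait()
                except queue.Empty:
                    return
                self.results[msg_id] = result
                self.outstanding.discard(msg_id)

        def wait(self, msg_ids=None, timeout=-1):
            """Spin until msg_ids (default: all outstanding) are done.

            Returns True if they all finished, False if the timeout expired.
            A negative timeout means wait forever.
            """
            pending = set(self.outstanding if msg_ids is None else msg_ids)
            deadline = None if timeout < 0 else time.monotonic() + timeout
            while True:
                self.spin()
                if not pending & self.outstanding:
                    return True
                if deadline is not None and time.monotonic() >= deadline:
                    return False
                time.sleep(0.001)  # avoid a hot loop between polls


    client = PollingClient()
    client.submit("msg-1", sum, [1, 2, 3])
    client.submit("msg-2", max, [4, 5])
    finished = client.wait(timeout=5)
    print(finished, client.results, client.outstanding)
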
|
|
|
|
|
|
|
|
|
.. note::

    TODO wait example
|
|
|
|
|
|
Map
===

Many parallel computing problems can be expressed as a ``map``, or running a single program with a
variety of different inputs. Python has a built-in :func:`map`, which does exactly this, and
many parallel execution tools in Python, such as the standard library's
:class:`multiprocessing.Pool` object, provide implementations of ``map``. All View objects provide
a :meth:`map` method as well, but the load-balanced and direct implementations differ.
|
|
|
|
|
|
Views' map methods can be called on any number of sequences, but they can also take the `block`
and `bound` keyword arguments, just like :meth:`~client.apply`, but *only as keywords*.

.. sourcecode:: python

    dview.map(*sequences, block=None)
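
To make the direct versus load-balanced difference concrete, here is a single-process sketch
using :class:`concurrent.futures.ThreadPoolExecutor`. It is a model under stated assumptions, not
IPython's implementation: the hypothetical ``direct_map`` splits the input into one contiguous
chunk per worker up front (as a scatter would), while the hypothetical ``balanced_map`` submits
every item as its own task, so idle workers pick up the next one. Both return the same results as
the built-in :func:`map`, which also accepts several sequences.

.. sourcecode:: python

    from concurrent.futures import ThreadPoolExecutor


    def direct_map(f, seq, n_engines):
        """Hypothetical 'direct' map: one contiguous chunk per engine, decided up front."""
        seq = list(seq)
        base, extra = divmod(len(seq), n_engines)
        chunks, start = [], 0
        for i in range(n_engines):
            stop = start + base + (1 if i < extra else 0)
            chunks.append(seq[start:stop])
            start = stop
        with ThreadPoolExecutor(max_workers=n_engines) as pool:
            per_engine = pool.map(lambda chunk: [f(x) for x in chunk], chunks)
            return [y for part in per_engine for y in part]


    def balanced_map(f, seq, n_engines):
        """Hypothetical load-balanced map: each item is a separate task."""
        with ThreadPoolExecutor(max_workers=n_engines) as pool:
            return list(pool.map(f, seq))


    def square(x):
        return x * x


    print(direct_map(square, range(10), 4))
    print(balanced_map(square, range(10), 4))
    print(list(map(pow, [2, 3, 4], [5, 2, 3])))  # built-in map over two sequences: [32, 9, 64]
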
|
|
|
|
|
|
|
|
|
* iter, map_async, reduce
|
|
|
|
|
|
Decorators and RemoteFunctions
==============================

@parallel

@remote

RemoteFunction

ParallelFunction

Dependencies
============

@depend

@require

Dependency
|
|
|
|