|
|
.. _parallelmultiengine:
|
|
|
|
|
|
==========================
|
|
|
IPython's Direct interface
|
|
|
==========================
|
|
|
|
|
|
The direct, or multiengine, interface represents one possible way of working with a set of
|
|
|
IPython engines. The basic idea behind the multiengine interface is that the
|
|
|
capabilities of each engine are directly and explicitly exposed to the user.
|
|
|
Thus, in the multiengine interface, each engine is given an id that is used to
|
|
|
identify the engine and give it work to do. This interface is intuitive and
designed with interactive use in mind, which makes it the best place for new
users of IPython to begin.
|
|
|
|
|
|
Starting the IPython controller and engines
|
|
|
===========================================
|
|
|
|
|
|
To follow along with this tutorial, you will need to start the IPython
|
|
|
controller and four IPython engines. The simplest way of doing this is to use
|
|
|
the :command:`ipclusterz` command::
|
|
|
|
|
|
$ ipclusterz start -n 4
|
|
|
|
|
|
For more detailed information about starting the controller and engines, see
|
|
|
our :ref:`introduction <ip1par>` to using IPython for parallel computing.
|
|
|
|
|
|
Creating a ``Client`` instance
|
|
|
==============================
|
|
|
|
|
|
The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
|
|
|
module and then create a :class:`.Client` instance:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [1]: from IPython.zmq.parallel import client
|
|
|
|
|
|
In [2]: rc = client.Client()
|
|
|
|
|
|
This form assumes that the default connection information (stored in
|
|
|
:file:`ipcontroller-client.json` found in `~/.ipython/clusterz_default/security`) is
|
|
|
accurate. If the controller was started on a remote machine, you must copy that connection
|
|
|
file to the client machine, or enter its contents as arguments to the Client constructor:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
# If you have copied the json connector file from the controller:
|
|
|
In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
|
|
|
# for a remote controller at 10.0.1.5, visible from my.server.com:
|
|
|
In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
|
|
|
|
|
|
|
|
|
To make sure there are engines connected to the controller, you can get a list
|
|
|
of engine ids:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [3]: rc.ids
|
|
|
Out[3]: set([0, 1, 2, 3])
|
|
|
|
|
|
Here we see that there are four engines ready to do work for us.
|
|
|
|
|
|
Quick and easy parallelism
|
|
|
==========================
|
|
|
|
|
|
In many cases, you simply want to apply a Python function to a sequence of
|
|
|
objects, but *in parallel*. The client interface provides a simple way
|
|
|
of accomplishing this: using the builtin :func:`map` and the ``@remote``
|
|
|
function decorator, or the client's :meth:`map` method.
|
|
|
|
|
|
Parallel map
|
|
|
------------
|
|
|
|
|
|
Python's builtin :func:`map` function allows a function to be applied to a
|
|
|
sequence element-by-element. This type of code is typically trivial to
|
|
|
parallelize. In fact, since IPython's interface is all about functions anyway,
|
|
|
you can just use the builtin :func:`map`, or a client's :meth:`map` method:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [62]: serial_result = map(lambda x:x**10, range(32))
|
|
|
|
|
|
In [66]: parallel_result = rc.map(lambda x: x**10, range(32))
|
|
|
|
|
|
In [67]: serial_result==parallel_result
|
|
|
Out[67]: True
|
|
|
|
|
|
|
|
|
.. note::
|
|
|
|
|
|
The client's own version of :meth:`map` and that of :class:`.DirectView` do
|
|
|
not do any load balancing. For a load balanced version, use a
|
|
|
:class:`LoadBalancedView`, or a :class:`ParallelFunction` with
|
|
|
`targets=None`.
|
|
|
|
|
|
.. seealso::
|
|
|
|
|
|
:meth:`map` is implemented via :class:`.ParallelFunction`.
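The equivalence between the serial and parallel results can also be tried
without a running cluster. The sketch below is not IPython's implementation:
it assumes Python 3 and uses the standard library's
``multiprocessing.pool.ThreadPool`` as a local stand-in for ``rc.map``, and the
name ``pool`` is introduced here purely for illustration.

```python
from multiprocessing.pool import ThreadPool

# Serial reference result, computed with the builtin map.
serial_result = list(map(lambda x: x**10, range(32)))

# Stand-in for rc.map (an assumption, not the IPython API): four worker
# threads play the role of four engines.
with ThreadPool(4) as pool:
    parallel_result = pool.map(lambda x: x**10, range(32))

# Both versions return their results in input order.
print(serial_result == parallel_result)
```

As with the client's :meth:`map`, the results come back in the order of the
input sequence, regardless of which worker handled each element.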
|
|
|
|
|
|
Remote function decorator
|
|
|
-------------------------
|
|
|
|
|
|
Remote functions are just like normal functions, but when they are called,
|
|
|
they execute on one or more engines, rather than locally. IPython provides
decorators for creating them:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [10]: @rc.remote(block=True)
|
|
|
....: def f(x):
|
|
|
....: return 10.0*x**4
|
|
|
....:
|
|
|
|
|
|
In [11]: map(f, range(32)) # this is done in parallel
|
|
|
Out[11]: [0.0,10.0,160.0,...]
|
|
|
|
|
|
See the docstring for the :func:`parallel` and :func:`remote` decorators for
|
|
|
options.
|
|
|
|
|
|
Calling Python functions
|
|
|
========================
|
|
|
|
|
|
The most basic type of operation that can be performed on the engines is to
|
|
|
execute Python code or call Python functions. Executing Python code can be
|
|
|
done in blocking or non-blocking mode (non-blocking is default) using the
|
|
|
:meth:`execute` method, and calling functions can be done via the
|
|
|
:meth:`.View.apply` method.
|
|
|
|
|
|
apply
|
|
|
-----
|
|
|
|
|
|
The main method for doing remote execution (in fact, all methods that
|
|
|
communicate with the engines are built on top of it) is :meth:`Client.apply`.
|
|
|
Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
|
|
|
which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Client` objects
|
|
|
require some more options, they cannot easily provide this interface.
|
|
|
Instead, they provide the signature::
|
|
|
|
|
|
c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
|
|
|
after=None, follow=None, timeout=None)
|
|
|
|
|
|
In order to provide the nicer interface, we have :class:`View` classes, which wrap
|
|
|
:meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
|
|
|
the extra arguments. For instance, performing index-access on a client creates a
|
|
|
:class:`.DirectView`:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [4]: view = rc[1:3]
|
|
|
Out[4]: <DirectView [1, 2]>
|
|
|
|
|
|
In [5]: view.apply<tab>
|
|
|
view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
|
|
|
|
|
|
A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
|
|
|
and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
|
|
|
methods allow specifying `bound` and `block` via the different methods.
|
|
|
|
|
|
==================  ==========  ==========
method              block       bound
==================  ==========  ==========
apply               self.block  self.bound
apply_sync          True        False
apply_async         False       False
apply_sync_bound    True        True
apply_async_bound   False       True
==================  ==========  ==========
|
|
|
|
|
|
For explanation of these values, read on.
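The mapping in the table can be pictured as a thin wrapper that fills in the
two flags before handing the call on. The following is only a sketch under
stated assumptions: ``SketchView`` and ``_submit`` are hypothetical names, the
function is called locally rather than on an engine, and nothing here is taken
from the actual :class:`View` source.

```python
class SketchView(object):
    """Toy stand-in for a View; it only records the flags it would send."""

    def __init__(self, block=False, bound=False):
        self.block = block
        self.bound = bound

    def _submit(self, f, args, kwargs, block, bound):
        # A real view would forward these options to the client. Here we
        # call f locally and report which flags were chosen.
        return {"result": f(*args, **kwargs), "block": block, "bound": bound}

    def apply(self, f, *args, **kwargs):
        # Uses the view's own attributes.
        return self._submit(f, args, kwargs, self.block, self.bound)

    def apply_sync(self, f, *args, **kwargs):
        return self._submit(f, args, kwargs, True, False)

    def apply_async(self, f, *args, **kwargs):
        return self._submit(f, args, kwargs, False, False)

    def apply_sync_bound(self, f, *args, **kwargs):
        return self._submit(f, args, kwargs, True, True)

    def apply_async_bound(self, f, *args, **kwargs):
        return self._submit(f, args, kwargs, False, True)


view = SketchView(block=True, bound=False)
print(view.apply(pow, 2, 5))
print(view.apply_async_bound(pow, 2, 5))
```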
|
|
|
|
|
|
Blocking execution
|
|
|
------------------
|
|
|
|
|
|
In blocking mode, the :class:`.DirectView` object (called ``dview`` in
|
|
|
these examples) submits the command to the controller, which places the
|
|
|
command in the engines' queues for execution. The :meth:`apply` call then
|
|
|
blocks until the engines are done executing the command:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [2]: rc.block=True
|
|
|
In [3]: dview = rc[:] # A DirectView of all engines
|
|
|
In [4]: dview['a'] = 5
|
|
|
|
|
|
In [5]: dview['b'] = 10
|
|
|
|
|
|
In [6]: dview.apply_bound(lambda x: a+b+x, 27)
|
|
|
Out[6]: [42, 42, 42, 42]
|
|
|
|
|
|
Python commands can be executed on specific engines by calling execute using
|
|
|
the ``targets`` keyword argument, or creating a :class:`DirectView` instance
|
|
|
by index-access to the client:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
|
|
|
|
|
|
In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
|
|
|
|
|
|
In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
|
|
|
Out[8]: [15, -5, 15, -5]
|
|
|
|
|
|
.. note::
|
|
|
|
|
|
Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
|
|
|
``rc[<x>].<meth>(...)``, which constructs a View object. The only place
|
|
|
where this differs is in :meth:`apply`. The :class:`Client` takes many
|
|
|
arguments to apply, so it requires `args` and `kwargs` to be passed as
|
|
|
individual arguments. Extended options such as `bound`, `targets`, and
|
|
|
`block` are controlled by the attributes of the :class:`View` objects, so
|
|
|
they can provide the much more convenient
|
|
|
:meth:`View.apply(f,*args,**kwargs)`, which simply calls
|
|
|
``f(*args,**kwargs)`` remotely.
|
|
|
|
|
|
This example also shows one of the most important things about the IPython
|
|
|
engines: they have persistent user namespaces. The :meth:`apply` method can
|
|
|
be run in either a bound or unbound way. The default for a View is to be
|
|
|
unbound, unless called by the :meth:`apply_bound` method:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere
|
|
|
|
|
|
In [10]: v0 = rc[0]
|
|
|
|
|
|
In [12]: v0.apply_bound(lambda : b)
|
|
|
Out[12]: 5
|
|
|
|
|
|
In [13]: v0.apply(lambda : b)
|
|
|
---------------------------------------------------------------------------
|
|
|
RemoteError Traceback (most recent call last)
|
|
|
/home/you/<ipython-input-34-21a468eb10f0> in <module>()
|
|
|
----> 1 v0.apply(lambda : b)
|
|
|
...
|
|
|
RemoteError: NameError(global name 'b' is not defined)
|
|
|
Traceback (most recent call last):
|
|
|
File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
|
|
|
exec code in working, working
|
|
|
File "<string>", line 1, in <module>
|
|
|
File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
|
|
|
NameError: global name 'b' is not defined
|
|
|
|
|
|
|
|
|
Specifically, `bound=True` specifies that the engine's namespace is to be used
|
|
|
for execution, and `bound=False` specifies that the engine's namespace is not
|
|
|
to be used (hence, 'b' is undefined during unbound execution, since the
|
|
|
function is called in an empty namespace). Unbound execution is often useful
|
|
|
for large numbers of atomic tasks, because it avoids bloating the engine's
|
|
|
memory, while bound execution lets you build on your previous work.
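The difference between the two modes can be imitated locally by choosing which
dictionary a function uses as its global namespace. This is a rough sketch
written for Python 3, not the engine's actual mechanism: ``call_in_namespace``
and ``engine_namespace`` are hypothetical names, and rebuilding the function
with ``types.FunctionType`` is an assumption made for illustration.

```python
import types


def call_in_namespace(f, namespace):
    """Call f with `namespace` as its global namespace (sketch only)."""
    rebound = types.FunctionType(f.__code__, namespace, f.__name__, f.__defaults__)
    return rebound()


# Stands in for an engine's persistent user namespace.
engine_namespace = {"b": 5}

# "Bound": the function looks up b in the engine's namespace.
bound_result = call_in_namespace(lambda: b, engine_namespace)

# "Unbound": the function runs against a fresh, empty namespace,
# so the lookup of b fails.
try:
    call_in_namespace(lambda: b, {})
    unbound_error = None
except NameError as err:
    unbound_error = err

print(bound_result)
print(type(unbound_error).__name__)
```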
|
|
|
|
|
|
|
|
|
Non-blocking execution
|
|
|
----------------------
|
|
|
|
|
|
In non-blocking mode, :meth:`apply` submits the command to be executed and
|
|
|
then returns an :class:`AsyncResult` object immediately. The
|
|
|
:class:`AsyncResult` object gives you a way of getting a result at a later
|
|
|
time through its :meth:`get` method.
|
|
|
|
|
|
.. Note::
|
|
|
|
|
|
The :class:`AsyncResult` object provides a superset of the interface in
|
|
|
:py:class:`multiprocessing.pool.AsyncResult`. See the
|
|
|
`official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
|
|
|
for more.
|
|
|
|
|
|
|
|
|
This allows you to quickly submit long running commands without blocking your
|
|
|
local Python/IPython session:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
# define our function
|
|
|
In [6]: def wait(t):
|
|
|
...: import time
|
|
|
...: tic = time.time()
|
|
|
...: time.sleep(t)
|
|
|
...: return time.time()-tic
|
|
|
|
|
|
# In non-blocking mode
|
|
|
In [7]: pr = rc[:].apply_async(wait, 2)
|
|
|
|
|
|
# Now block for the result
|
|
|
In [8]: pr.get()
|
|
|
Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
|
|
|
|
|
|
# Again in non-blocking mode
|
|
|
In [9]: pr = rc[:].apply_async(wait, 10)
|
|
|
|
|
|
# Poll to see if the result is ready
|
|
|
In [10]: pr.ready()
|
|
|
Out[10]: False
|
|
|
|
|
|
# ask for the result, but wait a maximum of 1 second:
|
|
|
In [45]: pr.get(1)
|
|
|
---------------------------------------------------------------------------
|
|
|
TimeoutError Traceback (most recent call last)
|
|
|
/home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
|
|
|
----> 1 pr.get(1)
|
|
|
|
|
|
/path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
|
|
|
62 raise self._exception
|
|
|
63 else:
|
|
|
---> 64 raise error.TimeoutError("Result not ready.")
|
|
|
65
|
|
|
66 def ready(self):
|
|
|
|
|
|
TimeoutError: Result not ready.
|
|
|
|
|
|
.. Note::
|
|
|
|
|
|
Note the import inside the function. This is a common pattern that ensures
|
|
|
that the appropriate modules are imported where the task is run.
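Because the :class:`AsyncResult` interface is described above as a superset of
:py:class:`multiprocessing.pool.AsyncResult`, the polling and timeout pattern
can be tried locally with the standard library. The sketch below assumes
Python 3 and uses ``multiprocessing.pool.ThreadPool`` in place of a view. The
standard library raises ``multiprocessing.TimeoutError``, which is not the
same class as the IPython ``TimeoutError`` in the traceback above.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def wait(t):
    tic = time.time()
    time.sleep(t)
    return time.time() - tic


pool = ThreadPool(2)

# Submit the call; this returns an AsyncResult immediately.
ar = pool.apply_async(wait, (0.5,))

# Poll without blocking (False unless the task has already finished).
was_ready = ar.ready()

# Ask for the result, but wait at most 10 milliseconds.
try:
    ar.get(timeout=0.01)
    timed_out = False
except multiprocessing.TimeoutError:
    timed_out = True

# Now block until the result is available.
elapsed = ar.get()

pool.close()
pool.join()
print(timed_out, ar.ready())
```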
|
|
|
|
|
|
Often, it is desirable to wait until a set of :class:`AsyncResult` objects
|
|
|
are done. For this, there is the :meth:`barrier` method. This method takes a
|
|
|
tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
|
|
|
associated results are ready:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [72]: rc.block=False
|
|
|
|
|
|
# A trivial list of AsyncResult objects
|
|
|
In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)]
|
|
|
|
|
|
# Wait until all of them are done
|
|
|
In [74]: rc.barrier(pr_list)
|
|
|
|
|
|
# Then, their results are ready using get() or the `.r` attribute
|
|
|
In [75]: pr_list[0].get()
|
|
|
Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
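Conceptually, a barrier over a collection of results is just a loop that waits
on each one in turn. The helper below is a hypothetical sketch for Python 3
built on ``multiprocessing.pool.ThreadPool``. It is not the implementation of
:meth:`barrier`, and it does not handle `msg_ids`.

```python
import time
from multiprocessing.pool import ThreadPool


def barrier(results):
    """Block until every AsyncResult in `results` is ready (sketch only)."""
    for r in results:
        r.wait()


pool = ThreadPool(4)

# A trivial list of AsyncResult objects, each sleeping briefly.
pr_list = [pool.apply_async(time.sleep, (0.1,)) for i in range(10)]

# Wait until all of them are done.
barrier(pr_list)
all_ready = all(r.ready() for r in pr_list)

pool.close()
pool.join()
print(all_ready)
```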
|
|
|
|
|
|
|
|
|
|
|
|
The ``block`` and ``targets`` keyword arguments and attributes
|
|
|
--------------------------------------------------------------
|
|
|
|
|
|
.. warning::
|
|
|
|
|
|
This is different now, I haven't updated this section.
|
|
|
-MinRK
|
|
|
|
|
|
Most methods (like :meth:`apply`) accept
|
|
|
``block`` and ``targets`` as keyword arguments. As we have seen above, these
|
|
|
keyword arguments control the blocking mode and which engines the command is
|
|
|
applied to. The :class:`Client` class also has :attr:`block` and
|
|
|
:attr:`targets` attributes that control the default behavior when the keyword
|
|
|
arguments are not provided. Thus the following logic is used for :attr:`block`
|
|
|
and :attr:`targets`:
|
|
|
|
|
|
* If no keyword argument is provided, the instance attributes are used.
|
|
|
* Keyword arguments, if provided, override the instance attributes.
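The two rules above amount to a simple fallback. A minimal sketch, assuming
that ``None`` is the marker for "not provided", with ``SketchClient`` and
``resolve`` as hypothetical names that are not part of the :class:`Client`
API:

```python
class SketchClient(object):
    """Toy object showing only how defaults and keyword arguments combine."""

    def __init__(self, block=False, targets="all"):
        self.block = block
        self.targets = targets

    def resolve(self, block=None, targets=None):
        # None means "not provided": fall back to the instance attribute.
        block = self.block if block is None else block
        targets = self.targets if targets is None else targets
        return block, targets


c = SketchClient()
c.targets = [0, 2]

# No keyword arguments: the instance attributes are used.
print(c.resolve())

# A keyword argument overrides the matching attribute for this call only.
print(c.resolve(block=True))
```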
|
|
|
|
|
|
The following examples demonstrate how to use the instance attributes:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [16]: rc.targets = [0,2]
|
|
|
|
|
|
In [17]: rc.block = False
|
|
|
|
|
|
In [18]: pr = rc.execute('a=5')
|
|
|
|
|
|
In [19]: pr.r
|
|
|
Out[19]:
|
|
|
<Results List>
|
|
|
[0] In [6]: a=5
|
|
|
[2] In [6]: a=5
|
|
|
|
|
|
# Note targets='all' means all engines
|
|
|
In [20]: rc.targets = 'all'
|
|
|
|
|
|
In [21]: rc.block = True
|
|
|
|
|
|
In [22]: rc.execute('b=10; print b')
|
|
|
Out[22]:
|
|
|
<Results List>
|
|
|
[0] In [7]: b=10; print b
|
|
|
[0] Out[7]: 10
|
|
|
|
|
|
[1] In [6]: b=10; print b
|
|
|
[1] Out[6]: 10
|
|
|
|
|
|
[2] In [7]: b=10; print b
|
|
|
[2] Out[7]: 10
|
|
|
|
|
|
[3] In [6]: b=10; print b
|
|
|
[3] Out[6]: 10
|
|
|
|
|
|
The :attr:`block` and :attr:`targets` instance attributes also determine the
|
|
|
behavior of the parallel magic commands.
|
|
|
|
|
|
|
|
|
Parallel magic commands
|
|
|
-----------------------
|
|
|
|
|
|
.. warning::
|
|
|
|
|
|
The magics have not been changed to work with the zeromq system. ``%px``
|
|
|
and ``%autopx`` do work, but ``%result`` does not. ``%px`` and ``%autopx``
*do not* print stdin/out.
|
|
|
|
|
|
We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
|
|
|
that make it more pleasant to execute Python commands on the engines
|
|
|
interactively. These are simply shortcuts to :meth:`execute` and
|
|
|
:meth:`get_result`. The ``%px`` magic executes a single Python command on the
|
|
|
engines specified by the :attr:`targets` attribute of the
|
|
|
:class:`MultiEngineClient` instance (by default this is ``'all'``):
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
# Create a DirectView for all targets
|
|
|
In [22]: dv = rc[:]
|
|
|
|
|
|
# Make this DirectView active for parallel magic commands
|
|
|
In [23]: dv.activate()
|
|
|
|
|
|
In [24]: dv.block=True
|
|
|
|
|
|
In [25]: import numpy
|
|
|
|
|
|
In [26]: %px import numpy
|
|
|
Parallel execution on engines: [0, 1, 2, 3]
|
|
|
Out[26]:[None,None,None,None]
|
|
|
|
|
|
In [27]: %px a = numpy.random.rand(2,2)
|
|
|
Parallel execution on engines: [0, 1, 2, 3]
|
|
|
|
|
|
In [28]: %px ev = numpy.linalg.eigvals(a)
|
|
|
Parallel execution on engines: [0, 1, 2, 3]
|
|
|
|
|
|
In [28]: dv['ev']
|
|
|
Out[44]: [ array([ 1.09522024, -0.09645227]),
|
|
|
array([ 1.21435496, -0.35546712]),
|
|
|
array([ 0.72180653, 0.07133042]),
|
|
|
array([ 1.46384341e+00, 1.04353244e-04])
|
|
|
]
|
|
|
|
|
|
.. Note::
|
|
|
|
|
|
``%result`` doesn't work
|
|
|
|
|
|
The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
|
|
|
command executed on each engine. It is simply a shortcut to the
|
|
|
:meth:`get_result` method:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [29]: %result
|
|
|
Out[29]:
|
|
|
<Results List>
|
|
|
[0] In [10]: print numpy.linalg.eigvals(a)
|
|
|
[0] Out[10]: [ 1.28167017 0.14197338]
|
|
|
|
|
|
[1] In [9]: print numpy.linalg.eigvals(a)
|
|
|
[1] Out[9]: [-0.14093616 1.27877273]
|
|
|
|
|
|
[2] In [10]: print numpy.linalg.eigvals(a)
|
|
|
[2] Out[10]: [-0.37023573 1.06779409]
|
|
|
|
|
|
[3] In [9]: print numpy.linalg.eigvals(a)
|
|
|
[3] Out[9]: [ 0.83664764 -0.25602658]
|
|
|
|
|
|
The ``%autopx`` magic switches to a mode where everything you type is executed
|
|
|
on the engines given by the :attr:`targets` attribute:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [30]: dv.block=False
|
|
|
|
|
|
In [31]: %autopx
|
|
|
Auto Parallel Enabled
|
|
|
Type %autopx to disable
|
|
|
|
|
|
In [32]: max_evals = []
|
|
|
<IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>
|
|
|
|
|
|
In [33]: for i in range(100):
|
|
|
....: a = numpy.random.rand(10,10)
|
|
|
....: a = a+a.transpose()
|
|
|
....: evals = numpy.linalg.eigvals(a)
|
|
|
....: max_evals.append(evals[0].real)
|
|
|
....:
|
|
|
....:
|
|
|
<IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>
|
|
|
|
|
|
In [34]: %autopx
|
|
|
Auto Parallel Disabled
|
|
|
|
|
|
In [35]: dv.block=True
|
|
|
|
|
|
In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
|
|
|
Parallel execution on engines: [0, 1, 2, 3]
|
|
|
|
|
|
In [37]: dv['ans']
|
|
|
Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
|
|
|
'Average max eigenvalue is: 10.2076902286',
|
|
|
'Average max eigenvalue is: 10.1891484655',
|
|
|
'Average max eigenvalue is: 10.1158837784',]
|
|
|
|
|
|
|
|
|
.. Note::
|
|
|
|
|
|
Multiline ``%autopx`` gets fouled up by NameErrors, because IPython
|
|
|
currently introspects too much.
|
|
|
|
|
|
|
|
|
Moving Python objects around
|
|
|
============================
|
|
|
|
|
|
In addition to calling functions and executing code on engines, you can
|
|
|
transfer Python objects to and from your IPython session and the engines. In
|
|
|
IPython, these operations are called :meth:`push` (sending an object to the
|
|
|
engines) and :meth:`pull` (getting an object from the engines).
|
|
|
|
|
|
Basic push and pull
|
|
|
-------------------
|
|
|
|
|
|
Here are some examples of how you use :meth:`push` and :meth:`pull`:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [38]: rc.push(dict(a=1.03234,b=3453))
|
|
|
Out[38]: [None,None,None,None]
|
|
|
|
|
|
In [39]: rc.pull('a')
|
|
|
Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
|
|
|
|
|
|
In [40]: rc.pull('b',targets=0)
|
|
|
Out[40]: 3453
|
|
|
|
|
|
In [41]: rc.pull(('a','b'))
|
|
|
Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
|
|
|
|
|
|
# zmq client does not have zip_pull
|
|
|
In [42]: rc.zip_pull(('a','b'))
|
|
|
Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
|
|
|
|
|
|
In [43]: rc.push(dict(c='speed'))
|
|
|
Out[43]: [None,None,None,None]
|
|
|
|
|
|
In non-blocking mode :meth:`push` and :meth:`pull` also return
|
|
|
:class:`AsyncResult` objects:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [47]: rc.block=False
|
|
|
|
|
|
In [48]: pr = rc.pull('a')
|
|
|
|
|
|
In [49]: pr.get()
|
|
|
Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Dictionary interface
|
|
|
--------------------
|
|
|
|
|
|
Since a namespace is just a :class:`dict`, :class:`DirectView` objects provide
|
|
|
dictionary-style access by key and methods such as :meth:`get` and
|
|
|
:meth:`update` for convenience. This makes the remote namespaces of the engines
|
|
|
appear as a local dictionary. Underneath, this uses :meth:`push` and
|
|
|
:meth:`pull`:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [50]: rc.block=True
|
|
|
|
|
|
In [51]: rc[:]['a']=['foo','bar']
|
|
|
|
|
|
In [52]: rc[:]['a']
|
|
|
Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
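The relationship between item access and :meth:`push`/:meth:`pull` can be
sketched with ordinary dictionaries standing in for the engines' namespaces.
``SketchDirectView`` is a hypothetical class written for illustration. It keeps
everything in one process, so it says nothing about how the real
:class:`DirectView` transports objects.

```python
class SketchDirectView(object):
    """Toy view over a list of dicts, one dict per pretend engine."""

    def __init__(self, namespaces):
        self.namespaces = namespaces

    def push(self, ns):
        # A real push would send the values to each engine; this toy
        # version simply stores the same objects in every dict.
        for engine_ns in self.namespaces:
            engine_ns.update(ns)

    def pull(self, key):
        return [engine_ns[key] for engine_ns in self.namespaces]

    # Dictionary-style access is thin sugar over push and pull.
    def __setitem__(self, key, value):
        self.push({key: value})

    def __getitem__(self, key):
        return self.pull(key)


dview = SketchDirectView([{} for _ in range(4)])
dview["a"] = ["foo", "bar"]
print(dview["a"])
```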
|
|
|
|
|
|
Scatter and gather
|
|
|
------------------
|
|
|
|
|
|
Sometimes it is useful to partition a sequence and push the partitions to
|
|
|
different engines. In MPI language, this is known as scatter/gather and we
|
|
|
follow that terminology. However, it is important to remember that in
|
|
|
IPython's :class:`Client` class, :meth:`scatter` is from the
|
|
|
interactive IPython session to the engines and :meth:`gather` is from the
|
|
|
engines back to the interactive IPython session. For scatter/gather operations
|
|
|
between engines, MPI should be used:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [58]: rc.scatter('a',range(16))
|
|
|
Out[58]: [None,None,None,None]
|
|
|
|
|
|
In [59]: rc[:]['a']
|
|
|
Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
|
|
|
|
|
|
In [60]: rc.gather('a')
|
|
|
Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
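The partitioning itself is easy to picture in plain Python. The functions below
are a local sketch, not the client's methods: they reproduce the contiguous,
equal-sized chunks seen in the output above, and the handling of sequences that
do not divide evenly (the first chunks get one extra element) is an assumption.

```python
def scatter(seq, n):
    """Split seq into n contiguous chunks whose lengths differ by at most one."""
    seq = list(seq)
    base, extra = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        # The first `extra` chunks receive one additional element.
        stop = start + base + (1 if i < extra else 0)
        chunks.append(seq[start:stop])
        start = stop
    return chunks


def gather(chunks):
    """Concatenate the chunks back into a single list, in engine order."""
    return [item for chunk in chunks for item in chunk]


parts = scatter(range(16), 4)
print(parts)
print(gather(parts))
```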
|
|
|
|
|
|
Other things to look at
|
|
|
=======================
|
|
|
|
|
|
How to do parallel list comprehensions
|
|
|
--------------------------------------
|
|
|
|
|
|
In many cases list comprehensions are nicer than using the map function. While
|
|
|
we don't have fully parallel list comprehensions, it is simple to get the
|
|
|
basic effect using :meth:`scatter` and :meth:`gather`:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [66]: rc.scatter('x',range(64))
|
|
|
Out[66]: [None,None,None,None]
|
|
|
|
|
|
In [67]: px y = [i**10 for i in x]
|
|
|
Parallel execution on engines: [0, 1, 2, 3]
|
|
|
Out[67]:
|
|
|
|
|
|
In [68]: y = rc.gather('y')
|
|
|
|
|
|
In [69]: print y
|
|
|
[0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
|
|
|
|
|
|
Parallel exceptions
|
|
|
-------------------
|
|
|
|
|
|
In the multiengine interface, parallel commands can raise Python exceptions,
|
|
|
just like serial commands. But it is a little subtle, because a single
|
|
|
parallel command can actually raise multiple exceptions (one for each engine
|
|
|
the command was run on). To express this idea, the MultiEngine interface has a
|
|
|
:exc:`CompositeError` exception class that will be raised in most cases. The
|
|
|
:exc:`CompositeError` class is a special type of exception that wraps one or
|
|
|
more other types of exceptions. Here is how it works:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [76]: rc.block=True
|
|
|
|
|
|
In [77]: rc.execute('1/0')
|
|
|
---------------------------------------------------------------------------
|
|
|
CompositeError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
|
|
|
432 targets, block = self._findTargetsAndBlock(targets, block)
|
|
|
433 result = blockingCallFromThread(self.smultiengine.execute, lines,
|
|
|
--> 434 targets=targets, block=block)
|
|
|
435 if block:
|
|
|
436 result = ResultList(result)
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
|
|
|
72 result.raiseException()
|
|
|
73 except Exception, e:
|
|
|
---> 74 raise e
|
|
|
75 return result
|
|
|
76
|
|
|
|
|
|
CompositeError: one or more exceptions from call to method: execute
|
|
|
[0:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[1:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[2:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[3:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
Notice how the error message printed when :exc:`CompositeError` is raised has
|
|
|
information about the individual exceptions that were raised on each engine.
|
|
|
If you want, you can even raise one of these original exceptions:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [80]: try:
|
|
|
....: rc.execute('1/0')
|
|
|
....: except client.CompositeError, e:
|
|
|
....: e.raise_exception()
|
|
|
....:
|
|
|
....:
|
|
|
---------------------------------------------------------------------------
|
|
|
ZeroDivisionError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
|
|
|
156 raise IndexError("an exception with index %i does not exist"%excid)
|
|
|
157 else:
|
|
|
--> 158 raise et, ev, etb
|
|
|
159
|
|
|
160 def collect_exceptions(rlist, method):
|
|
|
|
|
|
ZeroDivisionError: integer division or modulo by zero
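The idea of one exception wrapping several others can be sketched in a few
lines. The class below is a toy written for Python 3 and is not IPython's
:exc:`CompositeError`: ``SketchCompositeError`` and ``run_everywhere`` are
hypothetical names, and only the ``elist`` attribute and ``raise_exception``
method names are borrowed from the debugger listing in this section.

```python
class SketchCompositeError(Exception):
    """Toy exception wrapping one exception per engine."""

    def __init__(self, message, elist):
        super(SketchCompositeError, self).__init__(message)
        self.elist = elist  # list of (engine_id, exception) pairs

    def __str__(self):
        lines = [self.args[0]]
        for engine_id, exc in self.elist:
            lines.append("[%i]: %s: %s" % (engine_id, type(exc).__name__, exc))
        return "\n".join(lines)

    def raise_exception(self, excid=0):
        # Re-raise one of the original exceptions, chosen by position.
        raise self.elist[excid][1]


def run_everywhere(f, engine_ids):
    """Call f once per pretend engine and collect any failures."""
    results, failures = [], []
    for engine_id in engine_ids:
        try:
            results.append(f())
        except Exception as exc:
            failures.append((engine_id, exc))
    if failures:
        raise SketchCompositeError("one or more exceptions from call", failures)
    return results


try:
    run_everywhere(lambda: 1 / 0, [0, 1, 2, 3])
except SketchCompositeError as e:
    caught = e
    print(e)

# Re-raise the first wrapped exception and catch it as its original type.
try:
    caught.raise_exception()
except ZeroDivisionError as original:
    reraised = original
```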
|
|
|
|
|
|
If you are working in IPython, you can simply type ``%debug`` after one of
|
|
|
these :exc:`CompositeError` exceptions is raised, and inspect the exception
|
|
|
instance:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [81]: rc.execute('1/0')
|
|
|
---------------------------------------------------------------------------
|
|
|
CompositeError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
|
|
|
432 targets, block = self._findTargetsAndBlock(targets, block)
|
|
|
433 result = blockingCallFromThread(self.smultiengine.execute, lines,
|
|
|
--> 434 targets=targets, block=block)
|
|
|
435 if block:
|
|
|
436 result = ResultList(result)
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
|
|
|
72 result.raiseException()
|
|
|
73 except Exception, e:
|
|
|
---> 74 raise e
|
|
|
75 return result
|
|
|
76
|
|
|
|
|
|
CompositeError: one or more exceptions from call to method: execute
|
|
|
[0:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[1:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[2:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[3:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
In [82]: %debug
|
|
|
>
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
|
|
|
73 except Exception, e:
|
|
|
---> 74 raise e
|
|
|
75 return result
|
|
|
|
|
|
# With the debugger running, e is the exception instance. We can tab complete
|
|
|
# on it and see the extra methods that are available.
|
|
|
ipdb> e.
|
|
|
e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
|
|
|
e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
|
|
|
e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
|
|
|
e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
|
|
|
e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
|
|
|
ipdb> e.print_tracebacks()
|
|
|
[0:execute]:
|
|
|
---------------------------------------------------------------------------
|
|
|
ZeroDivisionError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<string> in <module>()
|
|
|
|
|
|
ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
[1:execute]:
|
|
|
---------------------------------------------------------------------------
|
|
|
ZeroDivisionError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<string> in <module>()
|
|
|
|
|
|
ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
[2:execute]:
|
|
|
---------------------------------------------------------------------------
|
|
|
ZeroDivisionError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<string> in <module>()
|
|
|
|
|
|
ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
[3:execute]:
|
|
|
---------------------------------------------------------------------------
|
|
|
ZeroDivisionError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<string> in <module>()
|
|
|
|
|
|
ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
|
|
|
All of this same error handling magic even works in non-blocking mode:
|
|
|
|
|
|
.. sourcecode:: ipython
|
|
|
|
|
|
In [83]: rc.block=False
|
|
|
|
|
|
In [84]: pr = rc.execute('1/0')
|
|
|
|
|
|
In [85]: pr.get()
|
|
|
---------------------------------------------------------------------------
|
|
|
CompositeError Traceback (most recent call last)
|
|
|
|
|
|
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
|
|
|
170
|
|
|
171 def _get_r(self):
|
|
|
--> 172 return self.get_result(block=True)
|
|
|
173
|
|
|
174 r = property(_get_r)
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
|
|
|
131 return self.result
|
|
|
132 try:
|
|
|
--> 133 result = self.client.get_pending_deferred(self.result_id, block)
|
|
|
134 except error.ResultNotCompleted:
|
|
|
135 return default
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
|
|
|
385
|
|
|
386 def get_pending_deferred(self, deferredID, block):
|
|
|
--> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
|
|
|
388
|
|
|
389 def barrier(self, pendingResults):
|
|
|
|
|
|
/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
|
|
|
72 result.raiseException()
|
|
|
73 except Exception, e:
|
|
|
---> 74 raise e
|
|
|
75 return result
|
|
|
76
|
|
|
|
|
|
CompositeError: one or more exceptions from call to method: execute
|
|
|
[0:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[1:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[2:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
[3:execute]: ZeroDivisionError: integer division or modulo by zero
|
|
|
|
|
|
|
|
|
|