.. _parallelmultiengine:

===============================
IPython's multiengine interface
===============================

The multiengine interface represents one possible way of working with a set of
IPython engines. The basic idea behind the multiengine interface is that the
capabilities of each engine are directly and explicitly exposed to the user.
Thus, in the multiengine interface, each engine is given an id that is used to
identify the engine and give it work to do. This interface is very intuitive
and is designed with interactive usage in mind, and is thus the best place for
new users of IPython to begin.

Starting the IPython controller and engines
===========================================

To follow along with this tutorial, you will need to start the IPython
controller and four IPython engines. The simplest way of doing this is to use
the :command:`ipclusterz` command::

    $ ipclusterz -n 4

For more detailed information about starting the controller and engines, see
our :ref:`introduction <ip1par>` to using IPython for parallel computing.

Creating a ``Client`` instance
==============================

The first step is to import the :mod:`IPython.zmq.parallel.client`
module and then create a :class:`.Client` instance:

.. sourcecode:: ipython

    In [1]: from IPython.zmq.parallel import client

    In [2]: rc = client.Client()

This form assumes that the controller was started on localhost with default
configuration. If not, the location of the controller must be given as an
argument to the constructor:

.. sourcecode:: ipython

    # for a visible LAN controller listening on an external port:
    In [2]: rc = client.Client('tcp://192.168.1.16:10101')
    # for a remote controller at my.server.com listening on localhost:
    In [3]: rc = client.Client(sshserver='my.server.com')

To make sure there are engines connected to the controller, you can get a list
of engine ids:

.. sourcecode:: ipython

    In [3]: rc.ids
    Out[3]: set([0, 1, 2, 3])

Here we see that there are four engines ready to do work for us.

Quick and easy parallelism
==========================

In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. The client interface provides a simple way
of accomplishing this: using the builtin :func:`map` and the ``@remote``
function decorator.

Parallel map
------------

Python's builtin :func:`map` function allows a function to be applied to a
sequence element-by-element. This type of code is typically trivial to
parallelize. In fact, since IPython's interface is all about functions anyway,
you can just use the builtin :func:`map`, or a client's :meth:`map` method:

.. sourcecode:: ipython

    In [62]: serial_result = map(lambda x:x**10, range(32))

    In [66]: parallel_result = rc.map(lambda x: x**10, range(32))

    In [67]: serial_result==parallel_result
    Out[67]: True

.. note::

    The client's own version of :meth:`map` and that of :class:`.DirectView`
    do not do any load balancing. For a load-balanced version, use a
    :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
    ``targets=None``.

.. seealso::

    :meth:`map` is implemented via :class:`.ParallelFunction`.

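The serial/parallel comparison can be tried without a running cluster. The
following is a minimal sketch, not IPython's implementation: it uses the
standard library's ``multiprocessing.pool.ThreadPool`` as a stand-in for four
engines and checks that the pooled map returns the same list as the serial
one. The names ``power10`` and ``n_workers`` are illustrative only.

```python
from multiprocessing.pool import ThreadPool


def power10(x):
    """The function applied to every element of the sequence."""
    return x ** 10


n_workers = 4  # stand-in for the four engines used in this tutorial

# Serial version: list() makes the result comparable on Python 3 as well.
serial_result = list(map(power10, range(32)))

# Pooled version: ThreadPool.map preserves the order of the input sequence.
pool = ThreadPool(n_workers)
try:
    parallel_result = pool.map(power10, range(32))
finally:
    pool.close()
    pool.join()

same = (serial_result == parallel_result)
```

As with the client's :meth:`map`, the result comes back in input order, so the
two lists compare equal element by element.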
Remote function decorator
-------------------------

Remote functions are just like normal functions, but when they are called,
they execute on one or more engines, rather than locally. IPython provides
decorators that turn an ordinary function into a remote one:

.. sourcecode:: ipython

    In [10]: @rc.remote(block=True)
       ....: def f(x):
       ....:     return 10.0*x**4
       ....:

    In [11]: map(f, range(32))    # this is done in parallel
    Out[11]:
    [0.0,10.0,160.0,...]

See the docstring for the :func:`parallel` and :func:`remote` decorators for
options.

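To make the decorator idea concrete, here is a small sketch under stated
assumptions: the ``remote`` function below is a hypothetical stand-in written
for this example, not IPython's decorator, and a ``ThreadPool`` plays the role
of the engines. It only illustrates the shape of the pattern, where calling
the decorated function submits the work elsewhere and ``block`` decides
whether the caller waits for the value.

```python
import functools
from multiprocessing.pool import ThreadPool


def remote(pool, block=True):
    """Hypothetical decorator: run the wrapped function on a worker of `pool`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            async_result = pool.apply_async(func, args, kwargs)
            # block=True waits for the value; block=False returns the handle.
            return async_result.get() if block else async_result
        return wrapper
    return decorator


pool = ThreadPool(4)


@remote(pool, block=True)
def f(x):
    return 10.0 * x ** 4


results = [f(x) for x in range(4)]

pool.close()
pool.join()
```

The first values agree with the output shown in the example above
(``0.0``, ``10.0``, ``160.0``).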
Calling Python functions
========================

The most basic type of operation that can be performed on the engines is to
execute Python code or call Python functions. Executing Python code can be
done in blocking or non-blocking mode (non-blocking is default) using the
:meth:`execute` method, and calling functions can be done via the
:meth:`.View.apply` method.

Blocking execution
------------------

In blocking mode, the :class:`.DirectView` object (called ``dview`` in
these examples) submits the command to the controller, which places the
command in the engines' queues for execution. The :meth:`apply` call then
blocks until the engines are done executing the command:

.. sourcecode:: ipython

    In [2]: rc.block=True

    In [3]: dview = rc[:] # A DirectView of all engines

    In [4]: dview['a'] = 5

    In [5]: dview['b'] = 10

    In [6]: dview.apply_bound(lambda x: a+b+x, 27)
    Out[6]: {0: 42, 1: 42, 2: 42, 3: 42}

Python commands can be executed on specific engines by calling :meth:`execute`
with the ``targets`` keyword argument, or by creating a :class:`DirectView`
instance by index-access to the client:

.. sourcecode:: ipython

    In [6]: rc.execute('c=a+b',targets=[0,2])

    In [7]: rc.execute('c=a-b',targets=[1,3])

    In [8]: rc[:]['c']
    Out[8]: {0: 15, 1: -5, 2: 15, 3: -5}

.. note::

    Every call to ``rc.<meth>(...,targets=x)`` can be made via
    ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
    where this differs is in :meth:`apply`. The :class:`Client` takes many
    arguments to apply, so it requires ``args`` and ``kwargs`` to be passed as
    individual arguments. Extended options such as ``bound``, ``targets``, and
    ``block`` are controlled by the attributes of the :class:`View` objects, so
    they can provide the much more convenient
    :meth:`View.apply(f,*args,**kwargs)`, which simply calls
    ``f(*args,**kwargs)`` remotely.

This example also shows one of the most important things about the IPython
engines: they have persistent user namespaces. The :meth:`apply` method can
be run in either a bound or unbound way. The default for a View is to be
unbound, unless called by the :meth:`apply_bound` method:

.. sourcecode:: ipython

    In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere

    In [10]: v0 = rc[0]

    In [12]: v0.apply_bound(lambda : b)
    Out[12]: 5

    In [13]: v0.apply(lambda : b)
    ---------------------------------------------------------------------------
    RemoteError                               Traceback (most recent call last)
    /home/you/<ipython-input-34-21a468eb10f0> in <module>()
    ----> 1 v0.apply(lambda : b)
    ...
    RemoteError: NameError(global name 'b' is not defined)
    Traceback (most recent call last):
      File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
        exec code in working, working
      File "<string>", line 1, in <module>
      File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
    NameError: global name 'b' is not defined

Specifically, ``bound=True`` specifies that the engine's namespace is to be
used for execution, and ``bound=False`` specifies that the engine's namespace
is not to be used (hence, ``b`` is undefined during unbound execution, since
the function is called in an empty namespace). Unbound execution is often
useful for large numbers of atomic tasks, because it avoids bloating the
engine's memory, while bound execution lets you build on your previous work.

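The difference between bound and unbound calls can be illustrated locally. The
sketch below is an assumption-laden model, not the engine's actual code: the
helper ``run_in_namespace`` (a name invented here) rebuilds a function so that
its global names are looked up in a dictionary of our choosing. A populated
dictionary stands in for an engine's persistent namespace; an empty one stands
in for unbound execution.

```python
import types


def run_in_namespace(func, namespace, *args, **kwargs):
    """Call `func` with `namespace` acting as its global namespace."""
    rebound = types.FunctionType(
        func.__code__, namespace, func.__name__,
        func.__defaults__, func.__closure__)
    return rebound(*args, **kwargs)


# Stand-in for one engine's persistent user namespace.
engine_namespace = {"b": 5}

# "Bound": the function sees the names stored on the engine.
bound_result = run_in_namespace(lambda: b, engine_namespace)

# "Unbound": the function runs against an empty namespace, so b is missing.
try:
    run_in_namespace(lambda: b, {})
    unbound_error = None
except NameError as exc:
    unbound_error = exc
```

The bound call finds ``b``, while the unbound call fails with a ``NameError``,
which mirrors the traceback in the session above.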
Non-blocking execution
----------------------

In non-blocking mode, :meth:`apply` submits the command to be executed and
then returns an :class:`AsyncResult` object immediately. The
:class:`AsyncResult` object gives you a way of getting a result at a later
time through its :meth:`get` method.

.. Note::

    The :class:`AsyncResult` object provides the exact same interface as
    :py:class:`multiprocessing.pool.AsyncResult`. See the
    `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
    for more.

This allows you to quickly submit long running commands without blocking your
local Python/IPython session:

.. sourcecode:: ipython

    # define our function
    In [35]: def wait(t):
       ....:     import time
       ....:     tic = time.time()
       ....:     time.sleep(t)
       ....:     return time.time()-tic

    # In blocking mode
    In [6]: rc.execute('import time')

    # In non-blocking mode
    In [7]: pr = rc[:].apply_async(wait, 2)

    # Now block for the result
    In [8]: pr.get()
    Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]

    # Again in non-blocking mode
    In [9]: pr = rc[:].apply_async(wait, 10)

    # Poll to see if the result is ready
    In [10]: pr.ready()
    Out[10]: False

    # ask for the result, but wait a maximum of 1 second:
    In [45]: pr.get(1)
    ---------------------------------------------------------------------------
    TimeoutError                              Traceback (most recent call last)
    /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
    ----> 1 pr.get(1)

    /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
         62                 raise self._exception
         63         else:
    ---> 64             raise error.TimeoutError("Result not ready.")
         65
         66     def ready(self):

    TimeoutError: Result not ready.

.. Note::

    Note the import inside the function. This is a common model, to ensure
    that the appropriate modules are imported where the task is run.

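Because :class:`AsyncResult` mirrors ``multiprocessing.pool.AsyncResult``, the
same submit, poll, and get-with-timeout pattern can be rehearsed with the
standard library alone. This is only a sketch: a ``ThreadPool`` replaces the
engines, the sleep times are shortened, and the timeout raises
``multiprocessing.TimeoutError`` rather than IPython's own error class.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def wait(t):
    """Sleep for t seconds and report how long the call really took."""
    tic = time.time()
    time.sleep(t)
    return time.time() - tic


pool = ThreadPool(1)

# Submit without blocking; an AsyncResult handle comes back immediately.
ar = pool.apply_async(wait, (0.5,))

# Poll: the task has only just started, so it is not ready yet.
ready_immediately = ar.ready()

# Ask for the result, but wait at most 0.05 seconds.
try:
    ar.get(timeout=0.05)
    timed_out = False
except multiprocessing.TimeoutError:
    timed_out = True

# Now block until the result is actually available.
elapsed = ar.get()

pool.close()
pool.join()
```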
Often, it is desirable to wait until a set of :class:`AsyncResult` objects
are done. For this, there is the :meth:`barrier` method. This method takes a
tuple of :class:`AsyncResult` objects (or ``msg_ids``) and blocks until all of
the associated results are ready:

.. sourcecode:: ipython

    In [72]: rc.block=False

    # A trivial list of AsyncResult objects
    In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)]

    # Wait until all of them are done
    In [74]: rc.barrier(pr_list)

    # Then, their results are ready to be fetched with get()
    In [75]: pr_list[0].get()
    Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]

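The barrier idea reduces to waiting on each pending result in turn. The
following sketch assumes nothing about IPython's implementation: ``barrier``
here is a hypothetical helper built on the ``wait`` method of
``multiprocessing.pool.AsyncResult``, with a ``ThreadPool`` standing in for
the engines.

```python
import time
from multiprocessing.pool import ThreadPool


def wait(t):
    tic = time.time()
    time.sleep(t)
    return time.time() - tic


def barrier(async_results):
    """Hypothetical helper: block until every AsyncResult has finished."""
    for ar in async_results:
        ar.wait()


pool = ThreadPool(4)

# A trivial list of pending results.
pr_list = [pool.apply_async(wait, (0.1,)) for i in range(8)]

# Wait until all of them are done.
barrier(pr_list)

# After the barrier, every result is ready and get() returns immediately.
all_ready = all(ar.ready() for ar in pr_list)

pool.close()
pool.join()
```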
The ``block`` and ``targets`` keyword arguments and attributes
--------------------------------------------------------------

.. warning::

    This is different now, I haven't updated this section.
    -MinRK

Most methods (like :meth:`apply`) accept
``block`` and ``targets`` as keyword arguments. As we have seen above, these
keyword arguments control the blocking mode and which engines the command is
applied to. The :class:`Client` class also has :attr:`block` and
:attr:`targets` attributes that control the default behavior when the keyword
arguments are not provided. Thus the following logic is used for :attr:`block`
and :attr:`targets`:

* If no keyword argument is provided, the instance attributes are used.
* Keyword arguments, if provided, override the instance attributes.

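The two rules above can be written down in a few lines. This is a sketch of
the fallback logic only; ``FakeClient`` and its ``resolve`` method are
hypothetical names, and using ``None`` to mean "keyword not given" is an
assumption of the example rather than something the text specifies.

```python
class FakeClient(object):
    """Hypothetical stand-in that only models block/targets resolution."""

    def __init__(self, block=False, targets='all'):
        self.block = block
        self.targets = targets

    def resolve(self, block=None, targets=None):
        # None means "keyword not given": fall back to the instance attribute.
        if block is None:
            block = self.block
        if targets is None:
            targets = self.targets
        return block, targets


rc = FakeClient()
rc.targets = [0, 2]

# No keywords: the instance attributes are used.
defaults = rc.resolve()

# Keywords given: they override the instance attributes for this call only.
overridden = rc.resolve(block=True, targets=[1, 3])
```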
The following examples demonstrate how to use the instance attributes:

.. sourcecode:: ipython

    In [16]: rc.targets = [0,2]

    In [17]: rc.block = False

    In [18]: pr = rc.execute('a=5')

    In [19]: pr.r
    Out[19]:
    <Results List>
    [0] In [6]: a=5
    [2] In [6]: a=5

    # Note targets='all' means all engines
    In [20]: rc.targets = 'all'

    In [21]: rc.block = True

    In [22]: rc.execute('b=10; print b')
    Out[22]:
    <Results List>
    [0] In [7]: b=10; print b
    [0] Out[7]: 10

    [1] In [6]: b=10; print b
    [1] Out[6]: 10

    [2] In [7]: b=10; print b
    [2] Out[7]: 10

    [3] In [6]: b=10; print b
    [3] Out[6]: 10

The :attr:`block` and :attr:`targets` instance attributes also determine the
behavior of the parallel magic commands.

Parallel magic commands
-----------------------

.. warning::

    The magics have not been changed to work with the zeromq system. ``%px``
    and ``%autopx`` do work, but ``%result`` does not. ``%px`` and ``%autopx``
    *do not* print stdin/out.

We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
that make it more pleasant to execute Python commands on the engines
interactively. These are simply shortcuts to :meth:`execute` and
:meth:`get_result`. The ``%px`` magic executes a single Python command on the
engines specified by the :attr:`targets` attribute of the
:class:`MultiEngineClient` instance (by default this is ``'all'``):

.. sourcecode:: ipython

    # Create a DirectView for all targets
    In [22]: dv = rc[:]

    # Make this DirectView active for parallel magic commands
    In [23]: dv.activate()

    In [24]: dv.block=True

    In [25]: import numpy

    In [26]: %px import numpy
    Parallel execution on engines: [0, 1, 2, 3]
    Out[26]:{0: None, 1: None, 2: None, 3: None}

    In [27]: %px a = numpy.random.rand(2,2)
    Parallel execution on engines: [0, 1, 2, 3]

    In [28]: %px ev = numpy.linalg.eigvals(a)
    Parallel execution on engines: [0, 1, 2, 3]

    In [28]: dv['ev']
    Out[44]: {0: array([ 1.09522024, -0.09645227]),
              1: array([ 1.21435496, -0.35546712]),
              2: array([ 0.72180653, 0.07133042]),
              3: array([ 1.46384341e+00, 1.04353244e-04])}

.. Note::

    ``%result`` doesn't work

The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
command executed on each engine. It is simply a shortcut to the
:meth:`get_result` method:

.. sourcecode:: ipython

    In [29]: %result
    Out[29]:
    <Results List>
    [0] In [10]: print numpy.linalg.eigvals(a)
    [0] Out[10]: [ 1.28167017 0.14197338]

    [1] In [9]: print numpy.linalg.eigvals(a)
    [1] Out[9]: [-0.14093616 1.27877273]

    [2] In [10]: print numpy.linalg.eigvals(a)
    [2] Out[10]: [-0.37023573 1.06779409]

    [3] In [9]: print numpy.linalg.eigvals(a)
    [3] Out[9]: [ 0.83664764 -0.25602658]

The ``%autopx`` magic switches to a mode where everything you type is executed
on the engines given by the :attr:`targets` attribute:

.. sourcecode:: ipython

    In [30]: dv.block=False

    In [31]: %autopx
    Auto Parallel Enabled
    Type %autopx to disable

    In [32]: max_evals = []
    <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>

    In [33]: for i in range(100):
       ....:     a = numpy.random.rand(10,10)
       ....:     a = a+a.transpose()
       ....:     evals = numpy.linalg.eigvals(a)
       ....:     max_evals.append(evals[0].real)
       ....:
       ....:
    <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>

    In [34]: %autopx
    Auto Parallel Disabled

    In [35]: dv.block=True

    In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
    Parallel execution on engines: [0, 1, 2, 3]

    In [37]: dv['ans']
    Out[37]: {0 : 'Average max eigenvalue is: 10.1387247332',
              1 : 'Average max eigenvalue is: 10.2076902286',
              2 : 'Average max eigenvalue is: 10.1891484655',
              3 : 'Average max eigenvalue is: 10.1158837784',}

.. Note::

    Multiline ``%autopx`` gets fouled up by NameErrors, because IPython
    currently introspects too much.

Moving Python objects around
============================

In addition to calling functions and executing code on engines, you can
transfer Python objects to and from your IPython session and the engines. In
IPython, these operations are called :meth:`push` (sending an object to the
engines) and :meth:`pull` (getting an object from the engines).

Basic push and pull
-------------------

Here are some examples of how you use :meth:`push` and :meth:`pull`:

.. sourcecode:: ipython

    In [38]: rc.push(dict(a=1.03234,b=3453))
    Out[38]: {0: None, 1: None, 2: None, 3: None}

    In [39]: rc.pull('a')
    Out[39]: {0: 1.03234, 1: 1.03234, 2: 1.03234, 3: 1.03234}

    In [40]: rc.pull('b',targets=0)
    Out[40]: 3453

    In [41]: rc.pull(('a','b'))
    Out[41]: {0: [1.03234, 3453], 1: [1.03234, 3453], 2: [1.03234, 3453], 3:[1.03234, 3453]}

    # zmq client does not have zip_pull
    In [42]: rc.zip_pull(('a','b'))
    Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]

    In [43]: rc.push(dict(c='speed'))
    Out[43]: {0: None, 1: None, 2: None, 3: None}

In non-blocking mode, :meth:`push` and :meth:`pull` also return
:class:`AsyncResult` objects:

.. sourcecode:: ipython

    In [47]: rc.block=False

    In [48]: pr = rc.pull('a')

    In [49]: pr.get()
    Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]

Dictionary interface
--------------------

Since a namespace is just a :class:`dict`, :class:`DirectView` objects provide
dictionary-style access by key and methods such as :meth:`get` and
:meth:`update` for convenience. This makes the remote namespaces of the
engines appear as a local dictionary. Underneath, this uses :meth:`push` and
:meth:`pull`:

.. sourcecode:: ipython

    In [50]: rc.block=True

    In [51]: rc[:]['a']=['foo','bar']

    In [52]: rc[:]['a']
    Out[52]: {0: ['foo', 'bar'], 1: ['foo', 'bar'], 2: ['foo', 'bar'], 3: ['foo', 'bar']}

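How item access can be layered on push and pull is easy to model locally. The
class below is a hypothetical illustration, not :class:`DirectView` itself:
plain dictionaries stand in for the engines' namespaces, and values are
deep-copied on push to imitate the fact that each engine receives its own copy
of an object.

```python
import copy


class FakeDirectView(object):
    """Hypothetical dict-style view over several engine namespaces."""

    def __init__(self, namespaces):
        # Maps engine id -> that engine's namespace (a plain dict here).
        self.namespaces = namespaces

    def push(self, ns):
        """Send every name in ns to every engine."""
        for engine_ns in self.namespaces.values():
            engine_ns.update(copy.deepcopy(ns))

    def pull(self, key):
        """Fetch one name from every engine, keyed by engine id."""
        return dict((eid, ns[key]) for eid, ns in self.namespaces.items())

    def update(self, ns):
        self.push(ns)

    def __setitem__(self, key, value):
        self.push({key: value})

    def __getitem__(self, key):
        return self.pull(key)


dview = FakeDirectView({0: {}, 1: {}, 2: {}, 3: {}})
dview['a'] = ['foo', 'bar']
pulled = dview['a']
```

``pulled`` has the same shape as ``Out[52]`` above: one entry per engine id.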
Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`Client` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI should be used:

.. sourcecode:: ipython

    In [58]: rc.scatter('a',range(16))
    Out[58]: {0: None, 1: None, 2: None, 3: None}

    In [59]: rc[:]['a']
    Out[59]: {0: [0, 1, 2, 3],
              1: [4, 5, 6, 7],
              2: [8, 9, 10, 11],
              3: [12, 13, 14, 15]}

    In [60]: rc.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

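The partitioning itself can be sketched in plain Python. The two functions
below are illustrative helpers, not the client's methods. They reproduce the
contiguous chunks shown above for a sequence that divides evenly; how leftover
elements are distributed when it does not divide evenly is an assumption made
for this example.

```python
def scatter(seq, n_engines):
    """Split seq into n_engines contiguous chunks, as evenly as possible."""
    seq = list(seq)
    base, extra = divmod(len(seq), n_engines)
    chunks = []
    start = 0
    for engine_id in range(n_engines):
        # Assumption: the first `extra` engines take one additional element.
        size = base + (1 if engine_id < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def gather(chunks):
    """Concatenate the per-engine chunks back together in engine order."""
    result = []
    for chunk in chunks:
        result.extend(chunk)
    return result


parts = scatter(range(16), 4)
whole = gather(parts)
```

Gathering the scattered chunks in engine order restores the original sequence.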
Other things to look at
=======================

How to do parallel list comprehensions
--------------------------------------

In many cases list comprehensions are nicer than using the map function. While
we don't have fully parallel list comprehensions, it is simple to get the
basic effect using :meth:`scatter` and :meth:`gather`:

.. sourcecode:: ipython

    In [66]: rc.scatter('x',range(64))
    Out[66]: {0: None, 1: None, 2: None, 3: None}

    In [67]: px y = [i**10 for i in x]
    Executing command on Controller
    Out[67]:

    In [68]: y = rc.gather('y')

    In [69]: print y
    [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]

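The same three steps (scatter, per-engine comprehension, gather) can be
followed by hand in a single process. This sketch makes no claim about how the
engines do it; it assumes four engines and a sequence whose length divides
evenly, and it runs the per-engine comprehensions one after another instead of
in parallel.

```python
n_engines = 4
x = list(range(64))
chunk = len(x) // n_engines  # 64 divides evenly by 4

# Scatter: one contiguous slice of x per engine.
x_parts = [x[i * chunk:(i + 1) * chunk] for i in range(n_engines)]

# The comprehension each engine would run on its own slice.
y_parts = [[i ** 10 for i in part] for part in x_parts]

# Gather: concatenate the partial results in engine order.
y = [value for part in y_parts for value in part]
```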
Parallel exceptions
-------------------

In the multiengine interface, parallel commands can raise Python exceptions,
just like serial commands. But, it is a little subtle, because a single
parallel command can actually raise multiple exceptions (one for each engine
the command was run on). To express this idea, the MultiEngine interface has a
:exc:`CompositeError` exception class that will be raised in most cases. The
:exc:`CompositeError` class is a special type of exception that wraps one or
more other types of exceptions. Here is how it works:

.. sourcecode:: ipython

    In [76]: rc.block=True

    In [77]: rc.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
        432         targets, block = self._findTargetsAndBlock(targets, block)
        433         result = blockingCallFromThread(self.smultiengine.execute, lines,
    --> 434             targets=targets, block=block)
        435         if block:
        436             result = ResultList(result)

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75         return result
         76

    CompositeError: one or more exceptions from call to method: execute
        [0:execute]: ZeroDivisionError: integer division or modulo by zero
        [1:execute]: ZeroDivisionError: integer division or modulo by zero
        [2:execute]: ZeroDivisionError: integer division or modulo by zero
        [3:execute]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when :exc:`CompositeError` is raised has
information about the individual exceptions that were raised on each engine.
If you want, you can even raise one of these original exceptions:

.. sourcecode:: ipython

    In [80]: try:
       ....:     rc.execute('1/0')
       ....: except client.CompositeError, e:
       ....:     e.raise_exception()
       ....:
       ....:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
        156             raise IndexError("an exception with index %i does not exist"%excid)
        157         else:
    --> 158             raise et, ev, etb
        159
        160 def collect_exceptions(rlist, method):

    ZeroDivisionError: integer division or modulo by zero

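The wrapping idea can be modelled in a few lines. The class below is a
simplified, hypothetical stand-in written for this example; it borrows the
names ``elist`` and ``raise_exception`` from the sessions in this section but
is not IPython's implementation, and ``execute_everywhere`` is an invented
helper that runs one callable once per engine id and collects every failure.

```python
class CompositeError(Exception):
    """Simplified, hypothetical exception that wraps several others."""

    def __init__(self, message, elist):
        Exception.__init__(self, message)
        # elist holds (engine_id, exception) pairs, one per failed engine.
        self.elist = elist

    def __str__(self):
        lines = [self.args[0]]
        for engine_id, exc in self.elist:
            lines.append("[%i:execute]: %s: %s"
                         % (engine_id, type(exc).__name__, exc))
        return "\n".join(lines)

    def raise_exception(self, excid=0):
        """Re-raise one of the wrapped exceptions, chosen by index."""
        try:
            engine_id, exc = self.elist[excid]
        except IndexError:
            raise IndexError(
                "an exception with index %i does not exist" % excid)
        raise exc


def execute_everywhere(func, engine_ids):
    """Run func once per engine id and collect every failure."""
    failures = []
    for engine_id in engine_ids:
        try:
            func()
        except Exception as exc:
            failures.append((engine_id, exc))
    if failures:
        raise CompositeError(
            "one or more exceptions from call to method: execute", failures)


try:
    execute_everywhere(lambda: 1 / 0, range(4))
except CompositeError as e:
    composite = e

try:
    composite.raise_exception()
except ZeroDivisionError as e:
    original = e
```

One composite exception carries all four failures, and ``raise_exception``
surfaces a single original ``ZeroDivisionError`` from it.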
If you are working in IPython, you can simply type ``%debug`` after one of
these :exc:`CompositeError` exceptions is raised, and inspect the exception
instance:

.. sourcecode:: ipython

    In [81]: rc.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
        432         targets, block = self._findTargetsAndBlock(targets, block)
        433         result = blockingCallFromThread(self.smultiengine.execute, lines,
    --> 434             targets=targets, block=block)
        435         if block:
        436             result = ResultList(result)

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75         return result
         76

    CompositeError: one or more exceptions from call to method: execute
        [0:execute]: ZeroDivisionError: integer division or modulo by zero
        [1:execute]: ZeroDivisionError: integer division or modulo by zero
        [2:execute]: ZeroDivisionError: integer division or modulo by zero
        [3:execute]: ZeroDivisionError: integer division or modulo by zero

    In [82]: %debug
    >
    /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
         73         except Exception, e:
    ---> 74             raise e
         75         return result

    # With the debugger running, e is the exception instance. We can tab complete
    # on it and see the extra methods that are available.
    ipdb> e.
    e.__class__         e.__getitem__       e.__new__           e.__setstate__      e.args
    e.__delattr__       e.__getslice__      e.__reduce__        e.__str__           e.elist
    e.__dict__          e.__hash__          e.__reduce_ex__     e.__weakref__       e.message
    e.__doc__           e.__init__          e.__repr__          e._get_engine_str   e.print_tracebacks
    e.__getattribute__  e.__module__        e.__setattr__       e._get_traceback    e.raise_exception
    ipdb> e.print_tracebacks()
    [0:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [1:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [2:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [3:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

.. note::

    The above example appears to be broken right now because of a change in
    how we are using Twisted.

All of this same error handling magic even works in non-blocking mode:

.. sourcecode:: ipython

    In [83]: rc.block=False

    In [84]: pr = rc.execute('1/0')

    In [85]: pr.get()
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
        170
        171     def _get_r(self):
    --> 172         return self.get_result(block=True)
        173
        174     r = property(_get_r)

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
        131             return self.result
        132         try:
    --> 133             result = self.client.get_pending_deferred(self.result_id, block)
        134         except error.ResultNotCompleted:
        135             return default

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
        385
        386     def get_pending_deferred(self, deferredID, block):
    --> 387         return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
        388
        389     def barrier(self, pendingResults):

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75         return result
         76

    CompositeError: one or more exceptions from call to method: execute
        [0:execute]: ZeroDivisionError: integer division or modulo by zero
        [1:execute]: ZeroDivisionError: integer division or modulo by zero
        [2:execute]: ZeroDivisionError: integer division or modulo by zero
        [3:execute]: ZeroDivisionError: integer division or modulo by zero