.. _parallelmultiengine:

===============================
IPython's multiengine interface
===============================

.. contents::

The multiengine interface represents one possible way of working with a set of
IPython engines. The basic idea behind the multiengine interface is that the
capabilities of each engine are directly and explicitly exposed to the user.
Thus, in the multiengine interface, each engine is given an id that is used to
identify the engine and give it work to do. This interface is very intuitive,
is designed with interactive usage in mind, and is thus the best place for new
users of IPython to begin.


Starting the IPython controller and engines
===========================================

To follow along with this tutorial, you will need to start the IPython
controller and four IPython engines. The simplest way of doing this is to use
the :command:`ipcluster` command::

    $ ipcluster -n 4

For more detailed information about starting the controller and engines, see
our :ref:`introduction <ip1par>` to using IPython for parallel computing.

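
If you prefer to start the processes by hand (for example, to run the engines
on other hosts), the controller and the engines can also be started
separately. The following is only a sketch; it assumes the engines can read
the controller's FURL files from the default :file:`~/.ipython/security`
directory::

    # Start the controller first, then one ipengine per desired engine
    # (in separate terminals or in the background on each host).
    $ ipcontroller &
    $ ipengine &
    $ ipengine &
    $ ipengine &
    $ ipengine &
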

Creating a ``MultiEngineClient`` instance
=========================================

The first step is to import the IPython :mod:`IPython.kernel.client` module
and then create a :class:`MultiEngineClient` instance::

    In [1]: from IPython.kernel import client

    In [2]: mec = client.MultiEngineClient()

This form assumes that the :file:`ipcontroller-mec.furl` file is in the
:file:`~/.ipython/security` directory on the client's host. If not, the
location of the ``.furl`` file must be given as an argument to the
constructor::

    In [2]: mec = client.MultiEngineClient('/path/to/my/ipcontroller-mec.furl')

To make sure there are engines connected to the controller, you can get a list
of engine ids::

    In [3]: mec.get_ids()
    Out[3]: [0, 1, 2, 3]

Here we see that there are four engines ready to do work for us.


Quick and easy parallelism
==========================

In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. The multiengine interface provides two simple ways
of accomplishing this: a parallel version of :func:`map` and the ``@parallel``
function decorator.


Parallel map
------------

Python's builtin :func:`map` function allows a function to be applied to a
sequence element-by-element. This type of code is typically trivial to
parallelize. In fact, the multiengine interface in IPython already has a
parallel version of :meth:`map` that works just like its serial counterpart::

    In [63]: serial_result = map(lambda x:x**10, range(32))

    In [64]: parallel_result = mec.map(lambda x:x**10, range(32))

    In [65]: serial_result==parallel_result
    Out[65]: True

.. note::

    The multiengine interface version of :meth:`map` does not do any load
    balancing. For a load balanced version, see the task interface.

.. seealso::

    The :meth:`map` method has a number of options that can be controlled by
    the :meth:`mapper` method. See its docstring for more information.

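
As an illustration only, the following sketch creates a mapper object
explicitly instead of calling :meth:`map` directly. It assumes that
:meth:`mapper` called with no arguments returns an object whose ``map`` method
behaves like ``mec.map``; check the docstring for the keyword options it
actually accepts::

    # Hypothetical sketch: an explicitly created mapper object
    In [66]: m = mec.mapper()

    In [67]: m.map(lambda x: x**10, range(32))
    Out[67]: [0, 1, 1024, 59049, ...]
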

Parallel function decorator
---------------------------

Parallel functions are just like normal functions, but they can be called on
sequences and *in parallel*. The multiengine interface provides a decorator
that turns any Python function into a parallel function::

    In [10]: @mec.parallel()
       ....: def f(x):
       ....:     return 10.0*x**4
       ....:

    In [11]: f(range(32)) # this is done in parallel
    Out[11]:
    [0.0,10.0,160.0,...]

See the docstring for the :meth:`parallel` decorator for options.


Running Python commands
=======================

The most basic type of operation that can be performed on the engines is to
execute Python code. Executing Python code can be done in blocking or
non-blocking mode (blocking is default) using the :meth:`execute` method.


Blocking execution
------------------

In blocking mode, the :class:`MultiEngineClient` object (called ``mec`` in
these examples) submits the command to the controller, which places the
command in the engines' queues for execution. The :meth:`execute` call then
blocks until the engines are done executing the command::

    # The default is to run on all engines
    In [4]: mec.execute('a=5')
    Out[4]:
    <Results List>
    [0] In [1]: a=5
    [1] In [1]: a=5
    [2] In [1]: a=5
    [3] In [1]: a=5

    In [5]: mec.execute('b=10')
    Out[5]:
    <Results List>
    [0] In [2]: b=10
    [1] In [2]: b=10
    [2] In [2]: b=10
    [3] In [2]: b=10

Python commands can be executed on specific engines by calling :meth:`execute`
with the ``targets`` keyword argument::

    In [6]: mec.execute('c=a+b',targets=[0,2])
    Out[6]:
    <Results List>
    [0] In [3]: c=a+b
    [2] In [3]: c=a+b


    In [7]: mec.execute('c=a-b',targets=[1,3])
    Out[7]:
    <Results List>
    [1] In [3]: c=a-b
    [3] In [3]: c=a-b


    In [8]: mec.execute('print c')
    Out[8]:
    <Results List>
    [0] In [4]: print c
    [0] Out[4]: 15

    [1] In [4]: print c
    [1] Out[4]: -5

    [2] In [4]: print c
    [2] Out[4]: 15

    [3] In [4]: print c
    [3] Out[4]: -5

This example also shows one of the most important things about the IPython
engines: they have persistent user namespaces. In blocking mode, the
:meth:`execute` method returns a list of Python ``dict`` objects, one per
engine, that contain useful information::

    In [9]: result_dict = mec.execute('d=10; print d')

    In [10]: for r in result_dict:
       ....:     print r
       ....:
       ....:
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 0, 'stdout': '10\n'}
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 1, 'stdout': '10\n'}
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 2, 'stdout': '10\n'}
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 3, 'stdout': '10\n'}

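
Since each entry in the result is a dictionary, these fields can also be used
programmatically. A minimal sketch, using only the keys shown above, that
collects the standard output produced on each engine::

    # Collect the 'stdout' field from each engine's result dictionary
    In [11]: [r['stdout'] for r in mec.execute('print d*2')]
    Out[11]: ['20\n', '20\n', '20\n', '20\n']
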

Non-blocking execution
----------------------

In non-blocking mode, :meth:`execute` submits the command to be executed and
then returns a :class:`PendingResult` object immediately. The
:class:`PendingResult` object gives you a way of getting a result at a later
time through its :meth:`get_result` method or :attr:`r` attribute. This allows
you to quickly submit long running commands without blocking your local
Python/IPython session::

    # In blocking mode
    In [6]: mec.execute('import time')
    Out[6]:
    <Results List>
    [0] In [1]: import time
    [1] In [1]: import time
    [2] In [1]: import time
    [3] In [1]: import time

    # In non-blocking mode
    In [7]: pr = mec.execute('time.sleep(10)',block=False)

    # Now block for the result
    In [8]: pr.get_result()
    Out[8]:
    <Results List>
    [0] In [2]: time.sleep(10)
    [1] In [2]: time.sleep(10)
    [2] In [2]: time.sleep(10)
    [3] In [2]: time.sleep(10)

    # Again in non-blocking mode
    In [9]: pr = mec.execute('time.sleep(10)',block=False)

    # Poll to see if the result is ready
    In [10]: pr.get_result(block=False)

    # A shorthand for get_result(block=True)
    In [11]: pr.r
    Out[11]:
    <Results List>
    [0] In [3]: time.sleep(10)
    [1] In [3]: time.sleep(10)
    [2] In [3]: time.sleep(10)
    [3] In [3]: time.sleep(10)

Often, it is desirable to wait until a set of :class:`PendingResult` objects
is done. For this, there is the :meth:`barrier` method. It takes a sequence of
:class:`PendingResult` objects and blocks until all of the associated results
are ready::

    In [72]: mec.block=False

    # A trivial list of PendingResult objects
    In [73]: pr_list = [mec.execute('time.sleep(3)') for i in range(10)]

    # Wait until all of them are done
    In [74]: mec.barrier(pr_list)

    # Then, their results are ready using get_result or the r attribute
    In [75]: pr_list[0].r
    Out[75]:
    <Results List>
    [0] In [20]: time.sleep(3)
    [1] In [19]: time.sleep(3)
    [2] In [20]: time.sleep(3)
    [3] In [19]: time.sleep(3)

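
If you want to keep working while periodically checking on a result, a simple
polling loop can be built on :meth:`get_result`. This is only a sketch; it
assumes :meth:`get_result` returns ``None`` while the result is not yet ready,
as the empty output of ``In [10]`` above suggests::

    In [12]: import time

    In [13]: pr = mec.execute('time.sleep(10)', block=False)

    # Poll once per second until the result is ready (do real work instead)
    In [14]: while pr.get_result(block=False) is None:
       ....:     time.sleep(1)
       ....:
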

The ``block`` and ``targets`` keyword arguments and attributes
--------------------------------------------------------------

Most methods in the multiengine interface (like :meth:`execute`) accept
``block`` and ``targets`` as keyword arguments. As we have seen above, these
keyword arguments control the blocking mode and which engines the command is
applied to. The :class:`MultiEngineClient` class also has :attr:`block` and
:attr:`targets` attributes that control the default behavior when the keyword
arguments are not provided. Thus the following logic is used for :attr:`block`
and :attr:`targets`:

* If no keyword argument is provided, the instance attributes are used.
* Keyword arguments, if provided, override the instance attributes (see the
  sketch below).

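
For example, with only the methods already shown above, the second call below
overrides the instance attributes for that single call, while the first call
uses whatever ``mec.block`` and ``mec.targets`` are currently set to. This is
just an illustrative sketch::

    # Uses the current instance attributes (mec.targets, mec.block)
    In [12]: mec.execute('a=5')

    # Keyword arguments override the attributes for this call only
    In [13]: mec.execute('a=5', targets=[0,2], block=True)
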

The following examples demonstrate how to use the instance attributes::

    In [16]: mec.targets = [0,2]

    In [17]: mec.block = False

    In [18]: pr = mec.execute('a=5')

    In [19]: pr.r
    Out[19]:
    <Results List>
    [0] In [6]: a=5
    [2] In [6]: a=5

    # Note targets='all' means all engines
    In [20]: mec.targets = 'all'

    In [21]: mec.block = True

    In [22]: mec.execute('b=10; print b')
    Out[22]:
    <Results List>
    [0] In [7]: b=10; print b
    [0] Out[7]: 10

    [1] In [6]: b=10; print b
    [1] Out[6]: 10

    [2] In [7]: b=10; print b
    [2] Out[7]: 10

    [3] In [6]: b=10; print b
    [3] Out[6]: 10

The :attr:`block` and :attr:`targets` instance attributes also determine the
behavior of the parallel magic commands.


Parallel magic commands
-----------------------

We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
that make it more pleasant to execute Python commands on the engines
interactively. These are simply shortcuts to :meth:`execute` and
:meth:`get_result`. The ``%px`` magic executes a single Python command on the
engines specified by the :attr:`targets` attribute of the
:class:`MultiEngineClient` instance (by default this is ``'all'``)::

    # Make this MultiEngineClient active for parallel magic commands
    In [23]: mec.activate()

    In [24]: mec.block=True

    In [25]: import numpy

    In [26]: %px import numpy
    Executing command on Controller
    Out[26]:
    <Results List>
    [0] In [8]: import numpy
    [1] In [7]: import numpy
    [2] In [8]: import numpy
    [3] In [7]: import numpy


    In [27]: %px a = numpy.random.rand(2,2)
    Executing command on Controller
    Out[27]:
    <Results List>
    [0] In [9]: a = numpy.random.rand(2,2)
    [1] In [8]: a = numpy.random.rand(2,2)
    [2] In [9]: a = numpy.random.rand(2,2)
    [3] In [8]: a = numpy.random.rand(2,2)


    In [28]: %px print numpy.linalg.eigvals(a)
    Executing command on Controller
    Out[28]:
    <Results List>
    [0] In [10]: print numpy.linalg.eigvals(a)
    [0] Out[10]: [ 1.28167017 0.14197338]

    [1] In [9]: print numpy.linalg.eigvals(a)
    [1] Out[9]: [-0.14093616 1.27877273]

    [2] In [10]: print numpy.linalg.eigvals(a)
    [2] Out[10]: [-0.37023573 1.06779409]

    [3] In [9]: print numpy.linalg.eigvals(a)
    [3] Out[9]: [ 0.83664764 -0.25602658]

The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
command executed on each engine. It is simply a shortcut to the
:meth:`get_result` method::

    In [29]: %result
    Out[29]:
    <Results List>
    [0] In [10]: print numpy.linalg.eigvals(a)
    [0] Out[10]: [ 1.28167017 0.14197338]

    [1] In [9]: print numpy.linalg.eigvals(a)
    [1] Out[9]: [-0.14093616 1.27877273]

    [2] In [10]: print numpy.linalg.eigvals(a)
    [2] Out[10]: [-0.37023573 1.06779409]

    [3] In [9]: print numpy.linalg.eigvals(a)
    [3] Out[9]: [ 0.83664764 -0.25602658]

The ``%autopx`` magic switches to a mode where everything you type is executed
on the engines given by the :attr:`targets` attribute::

    In [30]: mec.block=False

    In [31]: %autopx
    Auto Parallel Enabled
    Type %autopx to disable

    In [32]: max_evals = []
    <IPython.kernel.multiengineclient.PendingResult object at 0x17b8a70>

    In [33]: for i in range(100):
       ....:     a = numpy.random.rand(10,10)
       ....:     a = a+a.transpose()
       ....:     evals = numpy.linalg.eigvals(a)
       ....:     max_evals.append(evals[0].real)
       ....:
       ....:
    <IPython.kernel.multiengineclient.PendingResult object at 0x17af8f0>

    In [34]: %autopx
    Auto Parallel Disabled

    In [35]: mec.block=True

    In [36]: %px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    Executing command on Controller
    Out[36]:
    <Results List>
    [0] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [0] Out[13]: Average max eigenvalue is: 10.1387247332

    [1] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [1] Out[12]: Average max eigenvalue is: 10.2076902286

    [2] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [2] Out[13]: Average max eigenvalue is: 10.1891484655

    [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [3] Out[12]: Average max eigenvalue is: 10.1158837784


Moving Python objects around
============================

In addition to executing code on engines, you can transfer Python objects
between your IPython session and the engines. In IPython, these operations
are called :meth:`push` (sending an object to the engines) and :meth:`pull`
(getting an object from the engines).


Basic push and pull
-------------------

Here are some examples of how you use :meth:`push` and :meth:`pull`::

    In [38]: mec.push(dict(a=1.03234,b=3453))
    Out[38]: [None, None, None, None]

    In [39]: mec.pull('a')
    Out[39]: [1.03234, 1.03234, 1.03234, 1.03234]

    In [40]: mec.pull('b',targets=0)
    Out[40]: [3453]

    In [41]: mec.pull(('a','b'))
    Out[41]: [[1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453]]

    In [42]: mec.zip_pull(('a','b'))
    Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]

    In [43]: mec.push(dict(c='speed'))
    Out[43]: [None, None, None, None]

    In [44]: %px print c
    Executing command on Controller
    Out[44]:
    <Results List>
    [0] In [14]: print c
    [0] Out[14]: speed

    [1] In [13]: print c
    [1] Out[13]: speed

    [2] In [14]: print c
    [2] Out[14]: speed

    [3] In [13]: print c
    [3] Out[13]: speed

In non-blocking mode :meth:`push` and :meth:`pull` also return
:class:`PendingResult` objects::

    In [47]: mec.block=False

    In [48]: pr = mec.pull('a')

    In [49]: pr.r
    Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]


Push and pull for functions
---------------------------

Functions can also be pushed and pulled using :meth:`push_function` and
:meth:`pull_function`::

    In [52]: mec.block=True

    In [53]: def f(x):
       ....:     return 2.0*x**4
       ....:

    In [54]: mec.push_function(dict(f=f))
    Out[54]: [None, None, None, None]

    In [55]: mec.execute('y = f(4.0)')
    Out[55]:
    <Results List>
    [0] In [15]: y = f(4.0)
    [1] In [14]: y = f(4.0)
    [2] In [15]: y = f(4.0)
    [3] In [14]: y = f(4.0)


    In [56]: %px print y
    Executing command on Controller
    Out[56]:
    <Results List>
    [0] In [16]: print y
    [0] Out[16]: 512.0

    [1] In [15]: print y
    [1] Out[15]: 512.0

    [2] In [16]: print y
    [2] Out[16]: 512.0

    [3] In [15]: print y
    [3] Out[15]: 512.0

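
Going in the other direction, :meth:`pull_function` retrieves a function from
the engines. The following is only a sketch; it assumes that, like
:meth:`pull` with a single target, it returns a one-element list containing
the function::

    # Hypothetical sketch: pull f back from engine 0 and call it locally
    In [57]: g = mec.pull_function('f', targets=0)[0]

    In [58]: g(4.0)
    Out[58]: 512.0
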

Dictionary interface
--------------------

As a shorthand to :meth:`push` and :meth:`pull`, the
:class:`MultiEngineClient` class implements some of the Python dictionary
interface. This makes the remote namespaces of the engines appear as a local
dictionary. Underneath, this uses :meth:`push` and :meth:`pull`::

    In [50]: mec.block=True

    In [51]: mec['a']=['foo','bar']

    In [52]: mec['a']
    Out[52]: [['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar']]


Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`MultiEngineClient` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI should be used::

    In [58]: mec.scatter('a',range(16))
    Out[58]: [None, None, None, None]

    In [59]: %px print a
    Executing command on Controller
    Out[59]:
    <Results List>
    [0] In [17]: print a
    [0] Out[17]: [0, 1, 2, 3]

    [1] In [16]: print a
    [1] Out[16]: [4, 5, 6, 7]

    [2] In [17]: print a
    [2] Out[17]: [8, 9, 10, 11]

    [3] In [16]: print a
    [3] Out[16]: [12, 13, 14, 15]


    In [60]: mec.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

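
Scatter also combines naturally with :meth:`execute` and :meth:`pull`. As a
small worked example, using only the methods shown above, each engine can sum
its own partition of the data and the partial sums can then be combined
locally::

    In [61]: mec.scatter('x', range(100))
    Out[61]: [None, None, None, None]

    # Each engine sums its own partition (result list output omitted)
    In [62]: mec.execute('total = sum(x)')

    # Pull the four partial sums and combine them locally
    In [63]: sum(mec.pull('total'))
    Out[63]: 4950
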

Other things to look at
=======================

How to do parallel list comprehensions
--------------------------------------

In many cases list comprehensions are nicer than using the map function. While
we don't have fully parallel list comprehensions, it is simple to get the
basic effect using :meth:`scatter` and :meth:`gather`::

    In [66]: mec.scatter('x',range(64))
    Out[66]: [None, None, None, None]

    In [67]: %px y = [i**10 for i in x]
    Executing command on Controller
    Out[67]:
    <Results List>
    [0] In [19]: y = [i**10 for i in x]
    [1] In [18]: y = [i**10 for i in x]
    [2] In [19]: y = [i**10 for i in x]
    [3] In [18]: y = [i**10 for i in x]


    In [68]: y = mec.gather('y')

    In [69]: print y
    [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]


Parallel exceptions
-------------------

In the multiengine interface, parallel commands can raise Python exceptions,
just like serial commands. But it is a little subtle, because a single
parallel command can actually raise multiple exceptions (one for each engine
the command was run on). To express this idea, the multiengine interface has a
:exc:`CompositeError` exception class that will be raised in most cases. The
:exc:`CompositeError` class is a special type of exception that wraps one or
more other types of exceptions. Here is how it works::

    In [76]: mec.block=True

    In [77]: mec.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
        432         targets, block = self._findTargetsAndBlock(targets, block)
        433         result = blockingCallFromThread(self.smultiengine.execute, lines,
    --> 434             targets=targets, block=block)
        435         if block:
        436             result = ResultList(result)

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75     return result
         76

    CompositeError: one or more exceptions from call to method: execute
    [0:execute]: ZeroDivisionError: integer division or modulo by zero
    [1:execute]: ZeroDivisionError: integer division or modulo by zero
    [2:execute]: ZeroDivisionError: integer division or modulo by zero
    [3:execute]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when :exc:`CompositeError` is raised has
information about the individual exceptions that were raised on each engine.
If you want, you can even raise one of these original exceptions::

    In [80]: try:
       ....:     mec.execute('1/0')
       ....: except client.CompositeError, e:
       ....:     e.raise_exception()
       ....:
       ....:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
        156             raise IndexError("an exception with index %i does not exist"%excid)
        157         else:
    --> 158             raise et, ev, etb
        159
        160 def collect_exceptions(rlist, method):

    ZeroDivisionError: integer division or modulo by zero

If you are working in IPython, you can simply type ``%debug`` after one of
these :exc:`CompositeError` exceptions is raised, and inspect the exception
instance::

    In [81]: mec.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
        432         targets, block = self._findTargetsAndBlock(targets, block)
        433         result = blockingCallFromThread(self.smultiengine.execute, lines,
    --> 434             targets=targets, block=block)
        435         if block:
        436             result = ResultList(result)

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75     return result
         76

    CompositeError: one or more exceptions from call to method: execute
    [0:execute]: ZeroDivisionError: integer division or modulo by zero
    [1:execute]: ZeroDivisionError: integer division or modulo by zero
    [2:execute]: ZeroDivisionError: integer division or modulo by zero
    [3:execute]: ZeroDivisionError: integer division or modulo by zero

    In [82]: %debug
    > /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
         73         except Exception, e:
    ---> 74             raise e
         75     return result

    # With the debugger running, e is the exception instance. We can tab complete
    # on it and see the extra methods that are available.
    ipdb> e.
    e.__class__         e.__getitem__       e.__new__           e.__setstate__      e.args
    e.__delattr__       e.__getslice__      e.__reduce__        e.__str__           e.elist
    e.__dict__          e.__hash__          e.__reduce_ex__     e.__weakref__       e.message
    e.__doc__           e.__init__          e.__repr__          e._get_engine_str   e.print_tracebacks
    e.__getattribute__  e.__module__        e.__setattr__       e._get_traceback    e.raise_exception

    ipdb> e.print_tracebacks()
    [0:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [1:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [2:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [3:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

.. note::

    The above example appears to be broken right now because of a change in
    how we are using Twisted.

All of this same error handling magic even works in non-blocking mode::

    In [83]: mec.block=False

    In [84]: pr = mec.execute('1/0')

    In [85]: pr.r
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
        170
        171     def _get_r(self):
    --> 172         return self.get_result(block=True)
        173
        174     r = property(_get_r)

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
        131             return self.result
        132         try:
    --> 133             result = self.client.get_pending_deferred(self.result_id, block)
        134         except error.ResultNotCompleted:
        135             return default

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
        385
        386     def get_pending_deferred(self, deferredID, block):
    --> 387         return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
        388
        389     def barrier(self, pendingResults):

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75     return result
         76

    CompositeError: one or more exceptions from call to method: execute
    [0:execute]: ZeroDivisionError: integer division or modulo by zero
    [1:execute]: ZeroDivisionError: integer division or modulo by zero
    [2:execute]: ZeroDivisionError: integer division or modulo by zero
    [3:execute]: ZeroDivisionError: integer division or modulo by zero