.. _parallelmultiengine:

===============================
IPython's multiengine interface
===============================

The multiengine interface represents one possible way of working with a set of
IPython engines. The basic idea behind the multiengine interface is that the
capabilities of each engine are directly and explicitly exposed to the user.
Thus, in the multiengine interface, each engine is given an id that is used to
identify the engine and give it work to do. This interface is very intuitive
and is designed with interactive usage in mind, and is thus the best place for
new users of IPython to begin.

Starting the IPython controller and engines
===========================================

To follow along with this tutorial, you will need to start the IPython
controller and four IPython engines. The simplest way of doing this is to use
the :command:`ipcluster` command::

    $ ipcluster local -n 4

For more detailed information about starting the controller and engines, see
our :ref:`introduction <ip1par>` to using IPython for parallel computing.

Creating a ``MultiEngineClient`` instance
=========================================

The first step is to import the IPython :mod:`IPython.kernel.client` module
and then create a :class:`MultiEngineClient` instance:

.. sourcecode:: ipython

    In [1]: from IPython.kernel import client

    In [2]: mec = client.MultiEngineClient()

This form assumes that :file:`ipcontroller-mec.furl` is in the
:file:`~/.ipython/security` directory on the client's host. If not, the
location of the FURL file must be given as an argument to the
constructor:

.. sourcecode:: ipython

    In [2]: mec = client.MultiEngineClient('/path/to/my/ipcontroller-mec.furl')

To make sure there are engines connected to the controller, you can get a list
of engine ids:

.. sourcecode:: ipython

    In [3]: mec.get_ids()
    Out[3]: [0, 1, 2, 3]

Here we see that there are four engines ready to do work for us.

Quick and easy parallelism
==========================

In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. The multiengine interface provides two simple ways
of accomplishing this: a parallel version of :func:`map` and a ``@parallel``
function decorator.

Parallel map
------------

Python's builtin :func:`map` function allows a function to be applied to a
sequence element-by-element. This type of code is typically trivial to
parallelize. In fact, the multiengine interface in IPython already has a
parallel version of :meth:`map` that works just like its serial counterpart:

.. sourcecode:: ipython

    In [63]: serial_result = map(lambda x:x**10, range(32))

    In [64]: parallel_result = mec.map(lambda x:x**10, range(32))

    In [65]: serial_result==parallel_result
    Out[65]: True

.. note::

    The multiengine interface version of :meth:`map` does not do any load
    balancing. For a load balanced version, see the task interface.

.. seealso::

    The :meth:`map` method has a number of options that can be controlled by
    the :meth:`mapper` method. See its docstring for more information.
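
One way to picture a map without load balancing is a static partition: the
sequence is cut into contiguous chunks, one per engine, each chunk is mapped
independently, and the pieces are concatenated in order. The sketch below
imitates that idea in plain Python with no engines involved. The names
``chunk`` and ``static_map`` are hypothetical helpers, and the partitioning
that :meth:`map` actually uses may differ:

.. sourcecode:: python

    def chunk(seq, n_engines):
        """Split seq into n_engines contiguous pieces of near-equal size."""
        seq = list(seq)
        size, extra = divmod(len(seq), n_engines)
        pieces, start = [], 0
        for i in range(n_engines):
            # The first `extra` pieces receive one additional element.
            stop = start + size + (1 if i < extra else 0)
            pieces.append(seq[start:stop])
            start = stop
        return pieces


    def static_map(func, seq, n_engines=4):
        """Map func over seq chunk by chunk, preserving the original order."""
        result = []
        for piece in chunk(seq, n_engines):
            # Each piece stands in for the work handed to one engine.
            result.extend(func(x) for x in piece)
        return result


    serial_result = [x**10 for x in range(32)]
    parallel_style_result = static_map(lambda x: x**10, range(32))

Because the chunks are fixed before any work starts, a single slow chunk
delays the whole result. That is the situation in which a load balanced
interface is the better fit.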

Parallel function decorator
---------------------------

Parallel functions are just like normal functions, but they can be called on
sequences and *in parallel*. The multiengine interface provides a decorator
that turns any Python function into a parallel function:

.. sourcecode:: ipython

    In [10]: @mec.parallel()
       ....: def f(x):
       ....:     return 10.0*x**4
       ....:

    In [11]: f(range(32))    # this is done in parallel
    Out[11]:
    [0.0,10.0,160.0,...]

See the docstring for the :meth:`parallel` decorator for options.
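
To make the calling convention concrete, here is a minimal serial sketch of a
decorator with the same shape: the decorated function is written for a single
element but is called with a sequence. The name ``elementwise`` is
hypothetical, and nothing here runs on engines; the real :meth:`parallel`
decorator distributes the calls instead of looping locally:

.. sourcecode:: python

    import functools


    def elementwise():
        """Return a decorator that applies a one-argument function per item."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(sequence):
                # A parallel version would spread these calls across engines.
                return [func(x) for x in sequence]
            return wrapper
        return decorator


    @elementwise()
    def f(x):
        return 10.0 * x**4


    first_three = f(range(32))[:3]

The first three values agree with the start of the output shown above.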

Running Python commands
=======================

The most basic type of operation that can be performed on the engines is to
execute Python code. Executing Python code can be done in blocking or
non-blocking mode (blocking is the default) using the :meth:`execute` method.

Blocking execution
------------------

In blocking mode, the :class:`MultiEngineClient` object (called ``mec`` in
these examples) submits the command to the controller, which places the
command in the engines' queues for execution. The :meth:`execute` call then
blocks until the engines are done executing the command:

.. sourcecode:: ipython

    # The default is to run on all engines
    In [4]: mec.execute('a=5')
    Out[4]:
    <Results List>
    [0] In [1]: a=5
    [1] In [1]: a=5
    [2] In [1]: a=5
    [3] In [1]: a=5

    In [5]: mec.execute('b=10')
    Out[5]:
    <Results List>
    [0] In [2]: b=10
    [1] In [2]: b=10
    [2] In [2]: b=10
    [3] In [2]: b=10

Python commands can be executed on specific engines by calling
:meth:`execute` with the ``targets`` keyword argument:

.. sourcecode:: ipython

    In [6]: mec.execute('c=a+b',targets=[0,2])
    Out[6]:
    <Results List>
    [0] In [3]: c=a+b
    [2] In [3]: c=a+b

    In [7]: mec.execute('c=a-b',targets=[1,3])
    Out[7]:
    <Results List>
    [1] In [3]: c=a-b
    [3] In [3]: c=a-b

    In [8]: mec.execute('print c')
    Out[8]:
    <Results List>
    [0] In [4]: print c
    [0] Out[4]: 15

    [1] In [4]: print c
    [1] Out[4]: -5

    [2] In [4]: print c
    [2] Out[4]: 15

    [3] In [4]: print c
    [3] Out[4]: -5

This example also shows one of the most important things about the IPython
engines: they have persistent user namespaces. The :meth:`execute` method
returns a result list that holds one Python ``dict`` per engine, each
containing useful information:

.. sourcecode:: ipython

    In [9]: result_dict = mec.execute('d=10; print d')

    In [10]: for r in result_dict:
       ....:     print r
       ....:
       ....:
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 0, 'stdout': '10\n'}
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 1, 'stdout': '10\n'}
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 2, 'stdout': '10\n'}
    {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 3, 'stdout': '10\n'}

Non-blocking execution
----------------------

In non-blocking mode, :meth:`execute` submits the command to be executed and
then returns a :class:`PendingResult` object immediately. The
:class:`PendingResult` object gives you a way of getting a result at a later
time through its :meth:`get_result` method or :attr:`r` attribute. This allows
you to quickly submit long-running commands without blocking your local
Python/IPython session:

.. sourcecode:: ipython

    # In blocking mode
    In [6]: mec.execute('import time')
    Out[6]:
    <Results List>
    [0] In [1]: import time
    [1] In [1]: import time
    [2] In [1]: import time
    [3] In [1]: import time

    # In non-blocking mode
    In [7]: pr = mec.execute('time.sleep(10)',block=False)

    # Now block for the result
    In [8]: pr.get_result()
    Out[8]:
    <Results List>
    [0] In [2]: time.sleep(10)
    [1] In [2]: time.sleep(10)
    [2] In [2]: time.sleep(10)
    [3] In [2]: time.sleep(10)

    # Again in non-blocking mode
    In [9]: pr = mec.execute('time.sleep(10)',block=False)

    # Poll to see if the result is ready
    In [10]: pr.get_result(block=False)

    # A shorthand for get_result(block=True)
    In [11]: pr.r
    Out[11]:
    <Results List>
    [0] In [3]: time.sleep(10)
    [1] In [3]: time.sleep(10)
    [2] In [3]: time.sleep(10)
    [3] In [3]: time.sleep(10)

Often, it is desirable to wait until a set of :class:`PendingResult` objects
are done. For this, there is the :meth:`barrier` method. This method takes a
sequence of :class:`PendingResult` objects and blocks until all of the
associated results are ready:

.. sourcecode:: ipython

    In [72]: mec.block=False

    # A trivial list of PendingResult objects
    In [73]: pr_list = [mec.execute('time.sleep(3)') for i in range(10)]

    # Wait until all of them are done
    In [74]: mec.barrier(pr_list)

    # Then, their results are ready using get_result or the r attribute
    In [75]: pr_list[0].r
    Out[75]:
    <Results List>
    [0] In [20]: time.sleep(3)
    [1] In [19]: time.sleep(3)
    [2] In [20]: time.sleep(3)
    [3] In [19]: time.sleep(3)
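
For readers who know the standard library, this submit-then-wait pattern
resembles handing work to an executor and waiting on the returned futures.
The sketch below is only an analogy and assumes Python 3's
:mod:`concurrent.futures`: a ``Future`` plays the role of a
:class:`PendingResult`, :func:`concurrent.futures.wait` plays the role of
:meth:`barrier`, and ``slow_square`` is a hypothetical stand-in for a
long-running command:

.. sourcecode:: python

    import time
    from concurrent.futures import ThreadPoolExecutor, wait


    def slow_square(x):
        time.sleep(0.05)  # stand-in for a long-running command
        return x * x


    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns immediately, much like execute(..., block=False).
        pending = [pool.submit(slow_square, i) for i in range(10)]

        # Block until every pending result is ready, as barrier() does.
        done, not_done = wait(pending)

        # The results can now be read without further waiting.
        results = [p.result() for p in pending]

The analogy covers only the control flow. Threads in one process share memory,
whereas each IPython engine has its own namespace.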

The ``block`` and ``targets`` keyword arguments and attributes
--------------------------------------------------------------

Most methods in the multiengine interface (like :meth:`execute`) accept
``block`` and ``targets`` as keyword arguments. As we have seen above, these
keyword arguments control the blocking mode and which engines the command is
applied to. The :class:`MultiEngineClient` class also has :attr:`block` and
:attr:`targets` attributes that control the default behavior when the keyword
arguments are not provided. Thus the following logic is used for :attr:`block`
and :attr:`targets`:

* If no keyword argument is provided, the instance attributes are used.
* Keyword arguments, if provided, override the instance attributes.
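
The two rules above amount to a simple fallback. The following sketch shows
one way such logic can be written. ``ToyClient`` and ``_resolve`` are
hypothetical names used for illustration only; this is not the
:class:`MultiEngineClient` implementation:

.. sourcecode:: python

    class ToyClient(object):
        """Toy object with ``block`` and ``targets`` defaults."""

        def __init__(self):
            self.block = True
            self.targets = 'all'

        def _resolve(self, targets=None, block=None):
            # A keyword argument left as None falls back to the attribute.
            if targets is None:
                targets = self.targets
            if block is None:
                block = self.block
            return targets, block

        def execute(self, lines, targets=None, block=None):
            targets, block = self._resolve(targets, block)
            # A real client would now send `lines` to the chosen engines.
            return {'lines': lines, 'targets': targets, 'block': block}


    toy = ToyClient()
    default_call = toy.execute('a=5')
    override_call = toy.execute('a=5', targets=[0, 2], block=False)

Using ``None`` as the "not provided" marker lets ``block=False`` be passed
explicitly without being mistaken for a missing argument.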

The following examples demonstrate how to use the instance attributes:

.. sourcecode:: ipython

    In [16]: mec.targets = [0,2]

    In [17]: mec.block = False

    In [18]: pr = mec.execute('a=5')

    In [19]: pr.r
    Out[19]:
    <Results List>
    [0] In [6]: a=5
    [2] In [6]: a=5

    # Note targets='all' means all engines
    In [20]: mec.targets = 'all'

    In [21]: mec.block = True

    In [22]: mec.execute('b=10; print b')
    Out[22]:
    <Results List>
    [0] In [7]: b=10; print b
    [0] Out[7]: 10

    [1] In [6]: b=10; print b
    [1] Out[6]: 10

    [2] In [7]: b=10; print b
    [2] Out[7]: 10

    [3] In [6]: b=10; print b
    [3] Out[6]: 10

The :attr:`block` and :attr:`targets` instance attributes also determine the
behavior of the parallel magic commands.

Parallel magic commands
-----------------------

We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
that make it more pleasant to execute Python commands on the engines
interactively. These are simply shortcuts to :meth:`execute` and
:meth:`get_result`. The ``%px`` magic executes a single Python command on the
engines specified by the :attr:`targets` attribute of the
:class:`MultiEngineClient` instance (by default this is ``'all'``):

.. sourcecode:: ipython

    # Make this MultiEngineClient active for parallel magic commands
    In [23]: mec.activate()

    In [24]: mec.block=True

    In [25]: import numpy

    In [26]: %px import numpy
    Executing command on Controller
    Out[26]:
    <Results List>
    [0] In [8]: import numpy
    [1] In [7]: import numpy
    [2] In [8]: import numpy
    [3] In [7]: import numpy

    In [27]: %px a = numpy.random.rand(2,2)
    Executing command on Controller
    Out[27]:
    <Results List>
    [0] In [9]: a = numpy.random.rand(2,2)
    [1] In [8]: a = numpy.random.rand(2,2)
    [2] In [9]: a = numpy.random.rand(2,2)
    [3] In [8]: a = numpy.random.rand(2,2)

    In [28]: %px print numpy.linalg.eigvals(a)
    Executing command on Controller
    Out[28]:
    <Results List>
    [0] In [10]: print numpy.linalg.eigvals(a)
    [0] Out[10]: [ 1.28167017 0.14197338]

    [1] In [9]: print numpy.linalg.eigvals(a)
    [1] Out[9]: [-0.14093616 1.27877273]

    [2] In [10]: print numpy.linalg.eigvals(a)
    [2] Out[10]: [-0.37023573 1.06779409]

    [3] In [9]: print numpy.linalg.eigvals(a)
    [3] Out[9]: [ 0.83664764 -0.25602658]

The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
command executed on each engine. It is simply a shortcut to the
:meth:`get_result` method:

.. sourcecode:: ipython

    In [29]: %result
    Out[29]:
    <Results List>
    [0] In [10]: print numpy.linalg.eigvals(a)
    [0] Out[10]: [ 1.28167017 0.14197338]

    [1] In [9]: print numpy.linalg.eigvals(a)
    [1] Out[9]: [-0.14093616 1.27877273]

    [2] In [10]: print numpy.linalg.eigvals(a)
    [2] Out[10]: [-0.37023573 1.06779409]

    [3] In [9]: print numpy.linalg.eigvals(a)
    [3] Out[9]: [ 0.83664764 -0.25602658]

The ``%autopx`` magic switches to a mode where everything you type is executed
on the engines given by the :attr:`targets` attribute:

.. sourcecode:: ipython

    In [30]: mec.block=False

    In [31]: %autopx
    Auto Parallel Enabled
    Type %autopx to disable

    In [32]: max_evals = []
    <IPython.kernel.multiengineclient.PendingResult object at 0x17b8a70>

    In [33]: for i in range(100):
       ....:     a = numpy.random.rand(10,10)
       ....:     a = a+a.transpose()
       ....:     evals = numpy.linalg.eigvals(a)
       ....:     max_evals.append(evals[0].real)
       ....:
       ....:
    <IPython.kernel.multiengineclient.PendingResult object at 0x17af8f0>

    In [34]: %autopx
    Auto Parallel Disabled

    In [35]: mec.block=True

    In [36]: %px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    Executing command on Controller
    Out[36]:
    <Results List>
    [0] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [0] Out[13]: Average max eigenvalue is: 10.1387247332

    [1] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [1] Out[12]: Average max eigenvalue is: 10.2076902286

    [2] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [2] Out[13]: Average max eigenvalue is: 10.1891484655

    [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
    [3] Out[12]: Average max eigenvalue is: 10.1158837784

Moving Python objects around
============================

In addition to executing code on engines, you can transfer Python objects
between your IPython session and the engines. In IPython, these operations
are called :meth:`push` (sending an object to the engines) and :meth:`pull`
(getting an object from the engines).

Basic push and pull
-------------------

Here are some examples of how you use :meth:`push` and :meth:`pull`:

.. sourcecode:: ipython

    In [38]: mec.push(dict(a=1.03234,b=3453))
    Out[38]: [None, None, None, None]

    In [39]: mec.pull('a')
    Out[39]: [1.03234, 1.03234, 1.03234, 1.03234]

    In [40]: mec.pull('b',targets=0)
    Out[40]: [3453]

    In [41]: mec.pull(('a','b'))
    Out[41]: [[1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453]]

    In [42]: mec.zip_pull(('a','b'))
    Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]

    In [43]: mec.push(dict(c='speed'))
    Out[43]: [None, None, None, None]

    In [44]: %px print c
    Executing command on Controller
    Out[44]:
    <Results List>
    [0] In [14]: print c
    [0] Out[14]: speed

    [1] In [13]: print c
    [1] Out[13]: speed

    [2] In [14]: print c
    [2] Out[14]: speed

    [3] In [13]: print c
    [3] Out[13]: speed
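
The outputs of ``pull(('a','b'))`` and ``zip_pull(('a','b'))`` above contain
the same values grouped differently: the first is grouped by engine, the
second by variable name. The relationship is a transpose, which plain Python
expresses with :func:`zip`. The snippet below only rearranges the literal
values shown above and does not contact any engine:

.. sourcecode:: python

    # Values as returned by mec.pull(('a', 'b')): one [a, b] pair per engine.
    by_engine = [[1.03234, 3453], [1.03234, 3453],
                 [1.03234, 3453], [1.03234, 3453]]

    # Transposing groups the values by variable name instead,
    # which matches the shape of the zip_pull output.
    by_name = list(zip(*by_engine))

The grouping by name is convenient when each variable is post-processed
separately, for example to average ``a`` across engines.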

In non-blocking mode :meth:`push` and :meth:`pull` also return
:class:`PendingResult` objects:

.. sourcecode:: ipython

    In [47]: mec.block=False

    In [48]: pr = mec.pull('a')

    In [49]: pr.r
    Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]

Push and pull for functions
---------------------------

Functions can also be pushed and pulled using :meth:`push_function` and
:meth:`pull_function`:

.. sourcecode:: ipython

    In [52]: mec.block=True

    In [53]: def f(x):
       ....:     return 2.0*x**4
       ....:

    In [54]: mec.push_function(dict(f=f))
    Out[54]: [None, None, None, None]

    In [55]: mec.execute('y = f(4.0)')
    Out[55]:
    <Results List>
    [0] In [15]: y = f(4.0)
    [1] In [14]: y = f(4.0)
    [2] In [15]: y = f(4.0)
    [3] In [14]: y = f(4.0)

    In [56]: %px print y
    Executing command on Controller
    Out[56]:
    <Results List>
    [0] In [16]: print y
    [0] Out[16]: 512.0

    [1] In [15]: print y
    [1] Out[15]: 512.0

    [2] In [16]: print y
    [2] Out[16]: 512.0

    [3] In [15]: print y
    [3] Out[15]: 512.0

Dictionary interface
--------------------

As a shorthand for :meth:`push` and :meth:`pull`, the
:class:`MultiEngineClient` class implements some of the Python dictionary
interface. This makes the remote namespaces of the engines appear as a local
dictionary. Underneath, this uses :meth:`push` and :meth:`pull`:

.. sourcecode:: ipython

    In [50]: mec.block=True

    In [51]: mec['a']=['foo','bar']

    In [52]: mec['a']
    Out[52]: [['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar']]
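
A minimal sketch of how item access can be layered on top of push and pull is
shown below. ``FakeCluster`` is a hypothetical class that keeps one local
``dict`` per pretend engine. It illustrates the delegation only and says
nothing about how :class:`MultiEngineClient` talks to real engines:

.. sourcecode:: python

    class FakeCluster(object):
        """Four in-process namespaces standing in for four engines."""

        def __init__(self, n_engines=4):
            self.namespaces = [{} for _ in range(n_engines)]

        def push(self, mapping):
            # Copy every key/value pair into each engine namespace.
            for ns in self.namespaces:
                ns.update(mapping)
            return [None] * len(self.namespaces)

        def pull(self, key):
            # Collect the value of `key` from each engine, in engine order.
            return [ns[key] for ns in self.namespaces]

        def __setitem__(self, key, value):
            self.push({key: value})

        def __getitem__(self, key):
            return self.pull(key)


    cluster = FakeCluster()
    cluster['a'] = ['foo', 'bar']
    pulled = cluster['a']

As in the transcript above, reading a key returns one value per engine rather
than a single value.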

Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`MultiEngineClient` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI should be used. The following example scatters a list to
the engines and gathers it back:

.. sourcecode:: ipython

    In [58]: mec.scatter('a',range(16))
    Out[58]: [None, None, None, None]

    In [59]: %px print a
    Executing command on Controller
    Out[59]:
    <Results List>
    [0] In [17]: print a
    [0] Out[17]: [0, 1, 2, 3]

    [1] In [16]: print a
    [1] Out[16]: [4, 5, 6, 7]

    [2] In [17]: print a
    [2] Out[17]: [8, 9, 10, 11]

    [3] In [16]: print a
    [3] Out[16]: [12, 13, 14, 15]

    In [60]: mec.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
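
The data movement in this example can be imitated locally. In the sketch
below, each pretend engine is a plain ``dict``, and the free functions
``scatter`` and ``gather`` are hypothetical stand-ins that reproduce the
contiguous split seen in the output above. How the real methods treat lengths
that do not divide evenly is not covered by this document, so the handling
here is an assumption:

.. sourcecode:: python

    def scatter(namespaces, key, seq):
        """Give each namespace a contiguous slice of seq under `key`."""
        seq = list(seq)
        size, extra = divmod(len(seq), len(namespaces))
        start = 0
        for i, ns in enumerate(namespaces):
            # Assumption: leftover elements go to the first namespaces.
            stop = start + size + (1 if i < extra else 0)
            ns[key] = seq[start:stop]
            start = stop


    def gather(namespaces, key):
        """Concatenate the per-namespace pieces back into one list."""
        collected = []
        for ns in namespaces:
            collected.extend(ns[key])
        return collected


    engines = [{} for _ in range(4)]  # four pretend engine namespaces
    scatter(engines, 'a', range(16))
    pieces = [ns['a'] for ns in engines]
    whole = gather(engines, 'a')

Gathering in engine order is what makes the round trip return the original
sequence.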

Other things to look at
=======================

How to do parallel list comprehensions
--------------------------------------

In many cases list comprehensions are nicer than using the :func:`map`
function. While we don't have fully parallel list comprehensions, it is simple
to get the basic effect using :meth:`scatter` and :meth:`gather`:

.. sourcecode:: ipython

    In [66]: mec.scatter('x',range(64))
    Out[66]: [None, None, None, None]

    In [67]: %px y = [i**10 for i in x]
    Executing command on Controller
    Out[67]:
    <Results List>
    [0] In [19]: y = [i**10 for i in x]
    [1] In [18]: y = [i**10 for i in x]
    [2] In [19]: y = [i**10 for i in x]
    [3] In [18]: y = [i**10 for i in x]

    In [68]: y = mec.gather('y')

    In [69]: print y
    [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]

Parallel exceptions
-------------------

In the multiengine interface, parallel commands can raise Python exceptions,
just like serial commands. This is a little subtle, because a single
parallel command can actually raise multiple exceptions (one for each engine
the command was run on). To express this idea, the multiengine interface has a
:exc:`CompositeError` exception class that will be raised in most cases. The
:exc:`CompositeError` class is a special type of exception that wraps one or
more other types of exceptions. Here is how it works:

.. sourcecode:: ipython

    In [76]: mec.block=True

    In [77]: mec.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
        432         targets, block = self._findTargetsAndBlock(targets, block)
        433         result = blockingCallFromThread(self.smultiengine.execute, lines,
    --> 434             targets=targets, block=block)
        435         if block:
        436             result = ResultList(result)

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75     return result
         76

    CompositeError: one or more exceptions from call to method: execute
    [0:execute]: ZeroDivisionError: integer division or modulo by zero
    [1:execute]: ZeroDivisionError: integer division or modulo by zero
    [2:execute]: ZeroDivisionError: integer division or modulo by zero
    [3:execute]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when :exc:`CompositeError` is raised has
information about the individual exceptions that were raised on each engine.
You can also re-raise one of these original exceptions:

.. sourcecode:: ipython

    In [80]: try:
       ....:     mec.execute('1/0')
       ....: except client.CompositeError, e:
       ....:     e.raise_exception()
       ....:
       ....:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
        156             raise IndexError("an exception with index %i does not exist"%excid)
        157         else:
    --> 158             raise et, ev, etb
        159
        160 def collect_exceptions(rlist, method):

    ZeroDivisionError: integer division or modulo by zero
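
The wrapping idea can be sketched in a few lines of plain Python.
``ToyCompositeError`` and ``run_everywhere`` below are hypothetical names. The
class mimics only the two behaviours described above (a combined message and
re-raising one of the original exceptions) and is not the
:exc:`CompositeError` implementation:

.. sourcecode:: python

    class ToyCompositeError(Exception):
        """Wrap the exceptions raised by several engines for one command."""

        def __init__(self, method, elist):
            # elist holds (engine_id, exception) pairs.
            self.method = method
            self.elist = elist
            lines = ['one or more exceptions from call to method: %s' % method]
            for engine_id, exc in elist:
                lines.append('[%d:%s]: %s: %s'
                             % (engine_id, method, type(exc).__name__, exc))
            Exception.__init__(self, '\n'.join(lines))

        def raise_exception(self, excid=0):
            # Re-raise one of the wrapped exceptions, chosen by position.
            raise self.elist[excid][1]


    def run_everywhere(code, n_engines=4):
        """Evaluate `code` once per pretend engine and collect any failures."""
        errors = []
        for engine_id in range(n_engines):
            try:
                eval(code, {})
            except Exception as exc:
                errors.append((engine_id, exc))
        if errors:
            raise ToyCompositeError('execute', errors)


    try:
        run_everywhere('1/0')
    except ToyCompositeError as e:
        composite = e

    try:
        composite.raise_exception()
    except ZeroDivisionError as original:
        reraised = original

Keeping the engine id next to each wrapped exception is what allows the
combined message to say which engine failed and how.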

If you are working in IPython, you can simply type ``%debug`` after one of
these :exc:`CompositeError` exceptions is raised, and inspect the exception
instance:

.. sourcecode:: ipython

    In [81]: mec.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
        432         targets, block = self._findTargetsAndBlock(targets, block)
        433         result = blockingCallFromThread(self.smultiengine.execute, lines,
    --> 434             targets=targets, block=block)
        435         if block:
        436             result = ResultList(result)

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75     return result
         76

    CompositeError: one or more exceptions from call to method: execute
    [0:execute]: ZeroDivisionError: integer division or modulo by zero
    [1:execute]: ZeroDivisionError: integer division or modulo by zero
    [2:execute]: ZeroDivisionError: integer division or modulo by zero
    [3:execute]: ZeroDivisionError: integer division or modulo by zero

    In [82]: %debug
    > /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
         73         except Exception, e:
    ---> 74             raise e
         75     return result

    # With the debugger running, e is the exception instance. We can tab complete
    # on it and see the extra methods that are available.
    ipdb> e.
    e.__class__         e.__getitem__       e.__new__           e.__setstate__      e.args
    e.__delattr__       e.__getslice__      e.__reduce__        e.__str__           e.elist
    e.__dict__          e.__hash__          e.__reduce_ex__     e.__weakref__       e.message
    e.__doc__           e.__init__          e.__repr__          e._get_engine_str   e.print_tracebacks
    e.__getattribute__  e.__module__        e.__setattr__       e._get_traceback    e.raise_exception

    ipdb> e.print_tracebacks()
    [0:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [1:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [2:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

    [3:execute]:
    ---------------------------------------------------------------------------
    ZeroDivisionError                         Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<string> in <module>()

    ZeroDivisionError: integer division or modulo by zero

.. note::

    The above example appears to be broken right now because of a change in
    how we are using Twisted.

All of this error handling also works in non-blocking mode:

.. sourcecode:: ipython

    In [83]: mec.block=False

    In [84]: pr = mec.execute('1/0')

    In [85]: pr.r
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)

    /ipython1-client-r3021/docs/examples/<ipython console> in <module>()

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
        170
        171     def _get_r(self):
    --> 172         return self.get_result(block=True)
        173
        174     r = property(_get_r)

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
        131             return self.result
        132         try:
    --> 133             result = self.client.get_pending_deferred(self.result_id, block)
        134         except error.ResultNotCompleted:
        135             return default

    /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
        385
        386     def get_pending_deferred(self, deferredID, block):
    --> 387         return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
        388
        389     def barrier(self, pendingResults):

    /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
         72             result.raiseException()
         73         except Exception, e:
    ---> 74             raise e
         75     return result
         76

    CompositeError: one or more exceptions from call to method: execute
    [0:execute]: ZeroDivisionError: integer division or modulo by zero
    [1:execute]: ZeroDivisionError: integer division or modulo by zero
    [2:execute]: ZeroDivisionError: integer division or modulo by zero
    [3:execute]: ZeroDivisionError: integer division or modulo by zero