##// END OF EJS Templates
Added "ctr-K" to delete current buffer.
Added "ctr-K" to delete current buffer.

File last commit:

r1338:72652d65
r1363:68247b0c
Show More
parallel_multiengine.txt
728 lines | 23.8 KiB | text/plain | TextLexer
/ docs / source / parallel / parallel_multiengine.txt
.. _parallelmultiengine:
=================================
IPython's MultiEngine interface
=================================
.. contents::
The MultiEngine interface represents one possible way of working with a
set of IPython engines. The basic idea behind the MultiEngine interface is
that the capabilities of each engine are explicitly exposed to the user.
Thus, in the MultiEngine interface, each engine is given an id that is
used to identify the engine and give it work to do. This interface is very
intuitive and is designed with interactive usage in mind, and is thus the
best place for new users of IPython to begin.
Starting the IPython controller and engines
===========================================
To follow along with this tutorial, you will need to start the IPython
controller and four IPython engines. The simplest way of doing this is to
use the ``ipcluster`` command::
$ ipcluster -n 4
For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
Creating a ``MultiEngineClient`` instance
=========================================
The first step is to import the IPython ``client`` module and then create a ``MultiEngineClient`` instance::
In [1]: from IPython.kernel import client
In [2]: mec = client.MultiEngineClient()
To make sure there are engines connected to the controller, use can get a list of engine ids::
In [3]: mec.get_ids()
Out[3]: [0, 1, 2, 3]
Here we see that there are four engines ready to do work for us.
Running Python commands
=======================
The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is default) using the ``execute`` method.
Blocking execution
------------------
In blocking mode, the ``MultiEngineClient`` object (called ``mec`` in
these examples) submits the command to the controller, which places the
command in the engines' queues for execution. The ``execute`` call then
blocks until the engines are done executing the command::
# The default is to run on all engines
In [4]: mec.execute('a=5')
Out[4]:
<Results List>
[0] In [1]: a=5
[1] In [1]: a=5
[2] In [1]: a=5
[3] In [1]: a=5
In [5]: mec.execute('b=10')
Out[5]:
<Results List>
[0] In [2]: b=10
[1] In [2]: b=10
[2] In [2]: b=10
[3] In [2]: b=10
Python commands can be executed on specific engines by calling execute using the ``targets`` keyword argument::
In [6]: mec.execute('c=a+b',targets=[0,2])
Out[6]:
<Results List>
[0] In [3]: c=a+b
[2] In [3]: c=a+b
In [7]: mec.execute('c=a-b',targets=[1,3])
Out[7]:
<Results List>
[1] In [3]: c=a-b
[3] In [3]: c=a-b
In [8]: mec.execute('print c')
Out[8]:
<Results List>
[0] In [4]: print c
[0] Out[4]: 15
[1] In [4]: print c
[1] Out[4]: -5
[2] In [4]: print c
[2] Out[4]: 15
[3] In [4]: print c
[3] Out[4]: -5
This example also shows one of the most important things about the IPython engines: they have a persistent user namespaces. The ``execute`` method returns a Python ``dict`` that contains useful information::
In [9]: result_dict = mec.execute('d=10; print d')
In [10]: for r in result_dict:
....: print r
....:
....:
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 0, 'stdout': '10\n'}
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 1, 'stdout': '10\n'}
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 2, 'stdout': '10\n'}
{'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 3, 'stdout': '10\n'}
Non-blocking execution
----------------------
In non-blocking mode, ``execute`` submits the command to be executed and then returns a
``PendingResult`` object immediately. The ``PendingResult`` object gives you a way of getting a
result at a later time through its ``get_result`` method or ``r`` attribute. This allows you to
quickly submit long running commands without blocking your local Python/IPython session::
# In blocking mode
In [6]: mec.execute('import time')
Out[6]:
<Results List>
[0] In [1]: import time
[1] In [1]: import time
[2] In [1]: import time
[3] In [1]: import time
# In non-blocking mode
In [7]: pr = mec.execute('time.sleep(10)',block=False)
# Now block for the result
In [8]: pr.get_result()
Out[8]:
<Results List>
[0] In [2]: time.sleep(10)
[1] In [2]: time.sleep(10)
[2] In [2]: time.sleep(10)
[3] In [2]: time.sleep(10)
# Again in non-blocking mode
In [9]: pr = mec.execute('time.sleep(10)',block=False)
# Poll to see if the result is ready
In [10]: pr.get_result(block=False)
# A shorthand for get_result(block=True)
In [11]: pr.r
Out[11]:
<Results List>
[0] In [3]: time.sleep(10)
[1] In [3]: time.sleep(10)
[2] In [3]: time.sleep(10)
[3] In [3]: time.sleep(10)
Often, it is desirable to wait until a set of ``PendingResult`` objects are done. For this, there is a the method ``barrier``. This method takes a tuple of ``PendingResult`` objects and blocks until all of the associated results are ready::
In [72]: mec.block=False
# A trivial list of PendingResults objects
In [73]: pr_list = [mec.execute('time.sleep(3)') for i in range(10)]
# Wait until all of them are done
In [74]: mec.barrier(pr_list)
# Then, their results are ready using get_result or the r attribute
In [75]: pr_list[0].r
Out[75]:
<Results List>
[0] In [20]: time.sleep(3)
[1] In [19]: time.sleep(3)
[2] In [20]: time.sleep(3)
[3] In [19]: time.sleep(3)
The ``block`` and ``targets`` keyword arguments and attributes
--------------------------------------------------------------
Most commands in the multiengine interface (like ``execute``) accept ``block`` and ``targets``
as keyword arguments. As we have seen above, these keyword arguments control the blocking mode
and which engines the command is applied to. The ``MultiEngineClient`` class also has ``block``
and ``targets`` attributes that control the default behavior when the keyword arguments are not
provided. Thus the following logic is used for ``block`` and ``targets``:
* If no keyword argument is provided, the instance attributes are used.
* Keyword argument, if provided override the instance attributes.
The following examples demonstrate how to use the instance attributes::
In [16]: mec.targets = [0,2]
In [17]: mec.block = False
In [18]: pr = mec.execute('a=5')
In [19]: pr.r
Out[19]:
<Results List>
[0] In [6]: a=5
[2] In [6]: a=5
# Note targets='all' means all engines
In [20]: mec.targets = 'all'
In [21]: mec.block = True
In [22]: mec.execute('b=10; print b')
Out[22]:
<Results List>
[0] In [7]: b=10; print b
[0] Out[7]: 10
[1] In [6]: b=10; print b
[1] Out[6]: 10
[2] In [7]: b=10; print b
[2] Out[7]: 10
[3] In [6]: b=10; print b
[3] Out[6]: 10
The ``block`` and ``targets`` instance attributes also determine the behavior of the parallel
magic commands...
Parallel magic commands
-----------------------
We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to ``execute`` and ``get_result``. The ``%px`` magic executes a single Python command on the engines specified by the `magicTargets``targets` attribute of the ``MultiEngineClient`` instance (by default this is 'all')::
# Make this MultiEngineClient active for parallel magic commands
In [23]: mec.activate()
In [24]: mec.block=True
In [25]: import numpy
In [26]: %px import numpy
Executing command on Controller
Out[26]:
<Results List>
[0] In [8]: import numpy
[1] In [7]: import numpy
[2] In [8]: import numpy
[3] In [7]: import numpy
In [27]: %px a = numpy.random.rand(2,2)
Executing command on Controller
Out[27]:
<Results List>
[0] In [9]: a = numpy.random.rand(2,2)
[1] In [8]: a = numpy.random.rand(2,2)
[2] In [9]: a = numpy.random.rand(2,2)
[3] In [8]: a = numpy.random.rand(2,2)
In [28]: %px print numpy.linalg.eigvals(a)
Executing command on Controller
Out[28]:
<Results List>
[0] In [10]: print numpy.linalg.eigvals(a)
[0] Out[10]: [ 1.28167017 0.14197338]
[1] In [9]: print numpy.linalg.eigvals(a)
[1] Out[9]: [-0.14093616 1.27877273]
[2] In [10]: print numpy.linalg.eigvals(a)
[2] Out[10]: [-0.37023573 1.06779409]
[3] In [9]: print numpy.linalg.eigvals(a)
[3] Out[9]: [ 0.83664764 -0.25602658]
The ``%result`` magic gets and prints the stdin/stdout/stderr of the last command executed on each engine. It is simply a shortcut to the ``get_result`` method::
In [29]: %result
Out[29]:
<Results List>
[0] In [10]: print numpy.linalg.eigvals(a)
[0] Out[10]: [ 1.28167017 0.14197338]
[1] In [9]: print numpy.linalg.eigvals(a)
[1] Out[9]: [-0.14093616 1.27877273]
[2] In [10]: print numpy.linalg.eigvals(a)
[2] Out[10]: [-0.37023573 1.06779409]
[3] In [9]: print numpy.linalg.eigvals(a)
[3] Out[9]: [ 0.83664764 -0.25602658]
The ``%autopx`` magic switches to a mode where everything you type is executed on the engines given by the ``targets`` attribute::
In [30]: mec.block=False
In [31]: %autopx
Auto Parallel Enabled
Type %autopx to disable
In [32]: max_evals = []
<IPython.kernel.multiengineclient.PendingResult object at 0x17b8a70>
In [33]: for i in range(100):
....: a = numpy.random.rand(10,10)
....: a = a+a.transpose()
....: evals = numpy.linalg.eigvals(a)
....: max_evals.append(evals[0].real)
....:
....:
<IPython.kernel.multiengineclient.PendingResult object at 0x17af8f0>
In [34]: %autopx
Auto Parallel Disabled
In [35]: mec.block=True
In [36]: px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
Executing command on Controller
Out[36]:
<Results List>
[0] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[0] Out[13]: Average max eigenvalue is: 10.1387247332
[1] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[1] Out[12]: Average max eigenvalue is: 10.2076902286
[2] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[2] Out[13]: Average max eigenvalue is: 10.1891484655
[3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
[3] Out[12]: Average max eigenvalue is: 10.1158837784
Using the ``with`` statement of Python 2.5
------------------------------------------
Python 2.5 introduced the ``with`` statement. The ``MultiEngineClient`` can be used with the ``with`` statement to execute a block of code on the engines indicated by the ``targets`` attribute::
In [3]: with mec:
...: client.remote() # Required so the following code is not run locally
...: a = 10
...: b = 30
...: c = a+b
...:
...:
In [4]: mec.get_result()
Out[4]:
<Results List>
[0] In [1]: a = 10
b = 30
c = a+b
[1] In [1]: a = 10
b = 30
c = a+b
[2] In [1]: a = 10
b = 30
c = a+b
[3] In [1]: a = 10
b = 30
c = a+b
This is basically another way of calling execute, but one with allows you to avoid writing code in strings. When used in this way, the attributes ``targets`` and ``block`` are used to control how the code is executed. For now, if you run code in non-blocking mode you won't have access to the ``PendingResult``.
Moving Python object around
===========================
In addition to executing code on engines, you can transfer Python objects to and from your
IPython session and the engines. In IPython, these operations are called ``push`` (sending an
object to the engines) and ``pull`` (getting an object from the engines).
Basic push and pull
-------------------
Here are some examples of how you use ``push`` and ``pull``::
In [38]: mec.push(dict(a=1.03234,b=3453))
Out[38]: [None, None, None, None]
In [39]: mec.pull('a')
Out[39]: [1.03234, 1.03234, 1.03234, 1.03234]
In [40]: mec.pull('b',targets=0)
Out[40]: [3453]
In [41]: mec.pull(('a','b'))
Out[41]: [[1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453]]
In [42]: mec.zip_pull(('a','b'))
Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
In [43]: mec.push(dict(c='speed'))
Out[43]: [None, None, None, None]
In [44]: %px print c
Executing command on Controller
Out[44]:
<Results List>
[0] In [14]: print c
[0] Out[14]: speed
[1] In [13]: print c
[1] Out[13]: speed
[2] In [14]: print c
[2] Out[14]: speed
[3] In [13]: print c
[3] Out[13]: speed
In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects::
In [47]: mec.block=False
In [48]: pr = mec.pull('a')
In [49]: pr.r
Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
Push and pull for functions
---------------------------
Functions can also be pushed and pulled using ``push_function`` and ``pull_function``::
In [53]: def f(x):
....: return 2.0*x**4
....:
In [54]: mec.push_function(dict(f=f))
Out[54]: [None, None, None, None]
In [55]: mec.execute('y = f(4.0)')
Out[55]:
<Results List>
[0] In [15]: y = f(4.0)
[1] In [14]: y = f(4.0)
[2] In [15]: y = f(4.0)
[3] In [14]: y = f(4.0)
In [56]: px print y
Executing command on Controller
Out[56]:
<Results List>
[0] In [16]: print y
[0] Out[16]: 512.0
[1] In [15]: print y
[1] Out[15]: 512.0
[2] In [16]: print y
[2] Out[16]: 512.0
[3] In [15]: print y
[3] Out[15]: 512.0
Dictionary interface
--------------------
As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class implements some of the Python dictionary interface. This make the remote namespaces of the engines appear as a local dictionary. Underneath, this uses ``push`` and ``pull``::
In [50]: mec.block=True
In [51]: mec['a']=['foo','bar']
In [52]: mec['a']
Out[52]: [['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar']]
Scatter and gather
------------------
Sometimes it is useful to partition a sequence and push the partitions to different engines. In
MPI language, this is know as scatter/gather and we follow that terminology. However, it is
important to remember that in IPython ``scatter`` is from the interactive IPython session to
the engines and ``gather`` is from the engines back to the interactive IPython session. For
scatter/gather operations between engines, MPI should be used::
In [58]: mec.scatter('a',range(16))
Out[58]: [None, None, None, None]
In [59]: px print a
Executing command on Controller
Out[59]:
<Results List>
[0] In [17]: print a
[0] Out[17]: [0, 1, 2, 3]
[1] In [16]: print a
[1] Out[16]: [4, 5, 6, 7]
[2] In [17]: print a
[2] Out[17]: [8, 9, 10, 11]
[3] In [16]: print a
[3] Out[16]: [12, 13, 14, 15]
In [60]: mec.gather('a')
Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
Other things to look at
=======================
Parallel map
------------
Python's builtin ``map`` functions allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the MultiEngine interface in IPython already has a parallel version of ``map`` that works just like its serial counterpart::
In [63]: serial_result = map(lambda x:x**10, range(32))
In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
In [65]: serial_result==parallel_result
Out[65]: True
As you would expect, the parallel version of ``map`` is also influenced by the ``block`` and ``targets`` keyword arguments and attributes.
How to do parallel list comprehensions
--------------------------------------
In many cases list comprehensions are nicer than using the map function. While we don't have fully parallel list comprehensions, it is simple to get the basic effect using ``scatter`` and ``gather``::
In [66]: mec.scatter('x',range(64))
Out[66]: [None, None, None, None]
In [67]: px y = [i**10 for i in x]
Executing command on Controller
Out[67]:
<Results List>
[0] In [19]: y = [i**10 for i in x]
[1] In [18]: y = [i**10 for i in x]
[2] In [19]: y = [i**10 for i in x]
[3] In [18]: y = [i**10 for i in x]
In [68]: y = mec.gather('y')
In [69]: print y
[0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
Parallel Exceptions
-------------------
In the MultiEngine interface, parallel commands can raise Python exceptions, just like serial commands. But, it is a little subtle, because a single parallel command can actually raise multiple exceptions (one for each engine the command was run on). To express this idea, the MultiEngine interface has a ``CompositeError`` exception class that will be raised in most cases. The ``CompositeError`` class is a special type of exception that wraps one or more other types of exceptions. Here is how it works::
In [76]: mec.block=True
In [77]: mec.execute('1/0')
---------------------------------------------------------------------------
CompositeError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
432 targets, block = self._findTargetsAndBlock(targets, block)
433 result = blockingCallFromThread(self.smultiengine.execute, lines,
--> 434 targets=targets, block=block)
435 if block:
436 result = ResultList(result)
/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
72 result.raiseException()
73 except Exception, e:
---> 74 raise e
75 return result
76
CompositeError: one or more exceptions from call to method: execute
[0:execute]: ZeroDivisionError: integer division or modulo by zero
[1:execute]: ZeroDivisionError: integer division or modulo by zero
[2:execute]: ZeroDivisionError: integer division or modulo by zero
[3:execute]: ZeroDivisionError: integer division or modulo by zero
Notice how the error message printed when ``CompositeError`` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
In [80]: try:
....: mec.execute('1/0')
....: except client.CompositeError, e:
....: e.raise_exception()
....:
....:
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
/ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
156 raise IndexError("an exception with index %i does not exist"%excid)
157 else:
--> 158 raise et, ev, etb
159
160 def collect_exceptions(rlist, method):
ZeroDivisionError: integer division or modulo by zero
If you are working in IPython, you can simple type ``%debug`` after one of these ``CompositeError`` is raised, and inspect the exception instance::
In [81]: mec.execute('1/0')
---------------------------------------------------------------------------
CompositeError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
432 targets, block = self._findTargetsAndBlock(targets, block)
433 result = blockingCallFromThread(self.smultiengine.execute, lines,
--> 434 targets=targets, block=block)
435 if block:
436 result = ResultList(result)
/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
72 result.raiseException()
73 except Exception, e:
---> 74 raise e
75 return result
76
CompositeError: one or more exceptions from call to method: execute
[0:execute]: ZeroDivisionError: integer division or modulo by zero
[1:execute]: ZeroDivisionError: integer division or modulo by zero
[2:execute]: ZeroDivisionError: integer division or modulo by zero
[3:execute]: ZeroDivisionError: integer division or modulo by zero
In [82]: %debug
>
/ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
73 except Exception, e:
---> 74 raise e
75 return result
# With the debugger running, e is the exceptions instance. We can tab complete
# on it and see the extra methods that are available.
ipdb> e.
e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
ipdb> e.print_tracebacks()
[0:execute]:
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<string> in <module>()
ZeroDivisionError: integer division or modulo by zero
[1:execute]:
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<string> in <module>()
ZeroDivisionError: integer division or modulo by zero
[2:execute]:
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<string> in <module>()
ZeroDivisionError: integer division or modulo by zero
[3:execute]:
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<string> in <module>()
ZeroDivisionError: integer division or modulo by zero
All of this same error handling magic even works in non-blocking mode::
In [83]: mec.block=False
In [84]: pr = mec.execute('1/0')
In [85]: pr.r
---------------------------------------------------------------------------
CompositeError Traceback (most recent call last)
/ipython1-client-r3021/docs/examples/<ipython console> in <module>()
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
170
171 def _get_r(self):
--> 172 return self.get_result(block=True)
173
174 r = property(_get_r)
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
131 return self.result
132 try:
--> 133 result = self.client.get_pending_deferred(self.result_id, block)
134 except error.ResultNotCompleted:
135 return default
/ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
385
386 def get_pending_deferred(self, deferredID, block):
--> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
388
389 def barrier(self, pendingResults):
/ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
72 result.raiseException()
73 except Exception, e:
---> 74 raise e
75 return result
76
CompositeError: one or more exceptions from call to method: execute
[0:execute]: ZeroDivisionError: integer division or modulo by zero
[1:execute]: ZeroDivisionError: integer division or modulo by zero
[2:execute]: ZeroDivisionError: integer division or modulo by zero
[3:execute]: ZeroDivisionError: integer division or modulo by zero