.. _parallel_multiengine:

==========================
IPython's Direct interface
==========================

The direct, or multiengine, interface represents one possible way of working with a set of
IPython engines. The basic idea behind the multiengine interface is that the
capabilities of each engine are directly and explicitly exposed to the user.
Thus, in the multiengine interface, each engine is given an id that is used to
identify the engine and give it work to do. This interface is very intuitive
and is designed with interactive usage in mind, and is the best place for
new users of IPython to begin.

Starting the IPython controller and engines
===========================================

To follow along with this tutorial, you will need to start the IPython
controller and four IPython engines. The simplest way of doing this is to use
the :command:`ipcluster` command::

    $ ipcluster start -n 4

For more detailed information about starting the controller and engines, see
our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.

Creating a ``DirectView`` instance
==================================

The first step is to import the :mod:`IPython.parallel`
module and then create a :class:`.Client` instance:

.. sourcecode:: ipython

    In [1]: from IPython.parallel import Client

    In [2]: rc = Client()

This form assumes that the default connection information (stored in
:file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/profile_default/security`) is
accurate. If the controller was started on a remote machine, you must copy that connection
file to the client machine, or enter its contents as arguments to the Client constructor:

.. sourcecode:: ipython

    # If you have copied the json connector file from the controller:
    In [2]: rc = Client('/path/to/ipcontroller-client.json')
    # or to connect with a specific profile you have set up:
    In [3]: rc = Client(profile='mpi')

To make sure there are engines connected to the controller, users can get a list
of engine ids:

.. sourcecode:: ipython

    In [3]: rc.ids
    Out[3]: [0, 1, 2, 3]

Here we see that there are four engines ready to do work for us.

For direct execution, we will make use of a :class:`DirectView` object, which can be
constructed via list-access to the client:

.. sourcecode:: ipython

    In [4]: dview = rc[:] # use all engines

.. seealso::

    For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.

Quick and easy parallelism
==========================

In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. The client interface provides a simple way
of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.

Parallel map
------------

Python's builtin :func:`map` function allows a function to be applied to a
sequence element-by-element. This type of code is typically trivial to
parallelize. In fact, since IPython's interface is all about functions anyway,
you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
DirectView's :meth:`map` method:

.. sourcecode:: ipython

    In [62]: serial_result = map(lambda x:x**10, range(32))

    In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))

    In [67]: serial_result==parallel_result
    Out[67]: True

.. note::

    The :class:`DirectView`'s version of :meth:`map` does
    not do dynamic load balancing. For a load balanced version, use a
    :class:`LoadBalancedView`.

.. seealso::

    :meth:`map` is implemented via :class:`ParallelFunction`.

Remote function decorators
--------------------------

Remote functions are just like normal functions, but when they are called,
they execute on one or more engines, rather than locally. IPython provides
two decorators:

.. sourcecode:: ipython

    In [10]: @dview.remote(block=True)
       ....: def getpid():
       ....:     import os
       ....:     return os.getpid()
       ....:

    In [11]: getpid()
    Out[11]: [12345, 12346, 12347, 12348]

The ``@parallel`` decorator creates parallel functions that break up element-wise
operations and distribute them, reconstructing the result.

.. sourcecode:: ipython

    In [12]: import numpy as np

    In [13]: A = np.random.random((64,48))

    In [14]: @dview.parallel(block=True)
       ....: def pmul(A,B):
       ....:     return A*B

    In [15]: C_local = A*A

    In [16]: C_remote = pmul(A,A)

    In [17]: (C_local == C_remote).all()
    Out[17]: True

Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
element-wise operations that operate on a sequence or array. For ``map`` behavior,
parallel functions do have a map method.

====================  ============================  =============================
call                  pfunc(seq)                    pfunc.map(seq)
====================  ============================  =============================
# of tasks            # of engines (1 per engine)   # of engines (1 per engine)
# of remote calls     # of engines (1 per engine)   ``len(seq)``
argument to remote    ``seq[i:j]`` (sub-sequence)   ``seq[i]`` (single element)
====================  ============================  =============================

A quick example to illustrate the difference in arguments for the two modes:

.. sourcecode:: ipython

    In [16]: @dview.parallel(block=True)
       ....: def echo(x):
       ....:     return str(x)
       ....:

    In [17]: echo(range(5))
    Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']

    In [18]: echo.map(range(5))
    Out[18]: ['0', '1', '2', '3', '4']

.. seealso::

    See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
    decorators for options.

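
The difference between calling a parallel function and using its ``map`` method can be
imitated locally, without a cluster. The following is only a sketch: ``partition``,
``call_style`` and ``map_style`` are hypothetical helpers, and the contiguous chunking
shown here is an assumption chosen to match the ``echo`` output above, not IPython's
actual implementation.

```python
def partition(seq, n):
    """Split seq into n contiguous chunks whose sizes differ by at most one."""
    seq = list(seq)
    q, r = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        stop = start + q + (1 if i < r else 0)
        chunks.append(seq[start:stop])
        start = stop
    return chunks


def call_style(func, seq, n_engines=4):
    """Imitate pfunc(seq): one call per engine, each given a sub-sequence."""
    return [func(chunk) for chunk in partition(seq, n_engines)]


def map_style(func, seq, n_engines=4):
    """Imitate pfunc.map(seq): one chunk per engine, one call per element."""
    results = []
    for chunk in partition(seq, n_engines):
        results.extend(func(x) for x in chunk)
    return results


def echo(x):
    return str(x)


call_result = call_style(echo, range(5))  # each call sees a list
map_result = map_style(echo, range(5))    # each call sees one element
```

With four pretend engines, ``call_result`` holds one string per chunk while ``map_result``
holds one string per element, mirroring the two rows of the table.
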
Calling Python functions
========================

The most basic type of operation that can be performed on the engines is to
execute Python code or call Python functions. Executing Python code can be
done in blocking or non-blocking mode (non-blocking is default) using the
:meth:`.View.execute` method, and calling functions can be done via the
:meth:`.View.apply` method.

apply
-----

The main method for doing remote execution (in fact, all methods that
communicate with the engines are built on top of it), is :meth:`View.apply`.

We strive to provide the cleanest interface we can, so `apply` has the following
signature:

.. sourcecode:: python

    view.apply(f, *args, **kwargs)

There are various ways to call functions with IPython, and these flags are set as
attributes of the View. The ``DirectView`` has just two of these flags, ``block`` and
``track``, plus a ``targets`` attribute:

dv.block : bool
    whether to wait for the result, or return an :class:`AsyncResult` object
    immediately
dv.track : bool
    whether to instruct pyzmq to track when zeromq is done sending the message.
    This is primarily useful for non-copying sends of numpy arrays that you plan to
    edit in-place. You need to know when it becomes safe to edit the buffer
    without corrupting the message.
dv.targets : int, list of ints
    which targets this view is associated with.

Creating a view is simple: index-access on a client creates a :class:`.DirectView`.

.. sourcecode:: ipython

    In [4]: view = rc[1:3]
    Out[4]: <DirectView [1, 2]>

    In [5]: view.apply<tab>
    view.apply  view.apply_async  view.apply_sync

For convenience, you can set block temporarily for a single call with the extra sync/async methods.

Blocking execution
------------------

In blocking mode, the :class:`.DirectView` object (called ``dview`` in
these examples) submits the command to the controller, which places the
command in the engines' queues for execution. The :meth:`apply` call then
blocks until the engines are done executing the command:

.. sourcecode:: ipython

    In [2]: dview = rc[:] # A DirectView of all engines
    In [3]: dview.block=True
    In [4]: dview['a'] = 5

    In [5]: dview['b'] = 10

    In [6]: dview.apply(lambda x: a+b+x, 27)
    Out[6]: [42, 42, 42, 42]

You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
method:

.. sourcecode:: ipython

    In [7]: dview.block=False

    In [8]: dview.apply_sync(lambda x: a+b+x, 27)
    Out[8]: [42, 42, 42, 42]

Python commands can be executed as strings on specific engines by using a View's ``execute``
method:

.. sourcecode:: ipython

    In [6]: rc[::2].execute('c=a+b')

    In [7]: rc[1::2].execute('c=a-b')

    In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
    Out[8]: [15, -5, 15, -5]

Non-blocking execution
----------------------

In non-blocking mode, :meth:`apply` submits the command to be executed and
then returns an :class:`AsyncResult` object immediately. The
:class:`AsyncResult` object gives you a way of getting a result at a later
time through its :meth:`get` method.

.. Note::

    The :class:`AsyncResult` object provides a superset of the interface in
    :py:class:`multiprocessing.pool.AsyncResult`. See the
    `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
    for more.

This allows you to quickly submit long running commands without blocking your
local Python/IPython session:

.. sourcecode:: ipython

    # define our function
    In [6]: def wait(t):
       ....:     import time
       ....:     tic = time.time()
       ....:     time.sleep(t)
       ....:     return time.time()-tic

    # In non-blocking mode
    In [7]: ar = dview.apply_async(wait, 2)

    # Now block for the result
    In [8]: ar.get()
    Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]

    # Again in non-blocking mode
    In [9]: ar = dview.apply_async(wait, 10)

    # Poll to see if the result is ready
    In [10]: ar.ready()
    Out[10]: False

    # ask for the result, but wait a maximum of 1 second:
    In [45]: ar.get(1)
    ---------------------------------------------------------------------------
    TimeoutError                              Traceback (most recent call last)
    /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
    ----> 1 ar.get(1)

    /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
         62                 raise self._exception
         63         else:
    ---> 64             raise error.TimeoutError("Result not ready.")
         65
         66     def ready(self):

    TimeoutError: Result not ready.

.. Note::

    Note the import inside the function. This is a common model, to ensure
    that the appropriate modules are imported where the task is run. You can
    also manually import modules into the engine(s) namespace(s) via
    ``view.execute('import numpy')``.

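
Because the :class:`AsyncResult` interface is described above as a superset of
:py:class:`multiprocessing.pool.AsyncResult`, the same submit, poll and get pattern can be
tried with the standard library alone. The sketch below uses a ``ThreadPool`` as a stand-in
for the engines, which is an assumption made for illustration. Note that the standard
library raises ``multiprocessing.TimeoutError``, whereas the traceback above shows IPython
raising its own ``error.TimeoutError``.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def wait(t):
    tic = time.time()
    time.sleep(t)
    return time.time() - tic


pool = ThreadPool(processes=1)

# Submit without blocking; an AsyncResult comes back immediately.
ar = pool.apply_async(wait, (0.5,))
ready_before = ar.ready()  # the worker is still sleeping at this point

# Ask for the result, but give up after a short timeout.
try:
    ar.get(timeout=0.05)
    timed_out = False
except multiprocessing.TimeoutError:
    timed_out = True

# Now block until the result really is available.
elapsed = ar.get()
ready_after = ar.ready()

pool.close()
pool.join()
```
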
Often, it is desirable to wait until a set of :class:`AsyncResult` objects
are done. For this, there is the method :meth:`wait`. This method takes a
tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
and blocks until all of the associated results are ready:

.. sourcecode:: ipython

    In [72]: dview.block=False

    # A trivial list of AsyncResult objects
    In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]

    # Wait until all of them are done
    In [74]: dview.wait(pr_list)

    # Then, their results are ready using get() or the `.r` attribute
    In [75]: pr_list[0].get()
    Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]

The ``block`` and ``targets`` keyword arguments and attributes
--------------------------------------------------------------

Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
blocking mode and which engines the command is applied to. The :class:`View` class also has
:attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:

* If no keyword argument is provided, the instance attributes are used.
* Keyword arguments, if provided, override the instance attributes for
  the duration of a single call.

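
The two rules above amount to a simple fallback. The following sketch illustrates them
with a hypothetical ``ViewSketch`` class and ``resolve`` method; neither name comes from
IPython, and the real view classes may implement the lookup differently.

```python
class ViewSketch(object):
    """Hypothetical stand-in for a view; not IPython's implementation."""

    def __init__(self, targets, block=False):
        self.targets = targets
        self.block = block

    def resolve(self, block=None, targets=None):
        """Return the (block, targets) pair that a single call would use."""
        if block is None:       # no keyword given: fall back to the attribute
            block = self.block
        if targets is None:
            targets = self.targets
        return block, targets


view = ViewSketch(targets=[0, 1, 2, 3], block=False)

defaults = view.resolve()                         # instance attributes apply
overridden = view.resolve(block=True, targets=0)  # keywords win for this call
after_call = view.resolve()                       # attributes are unchanged
```
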
The following examples demonstrate how to use the instance attributes:

.. sourcecode:: ipython

    In [16]: dview.targets = [0,2]

    In [17]: dview.block = False

    In [18]: ar = dview.apply(lambda : 10)

    In [19]: ar.get()
    Out[19]: [10, 10]

    In [20]: dview.targets = dview.client.ids # all engines (4)

    In [21]: dview.block = True

    In [22]: dview.apply(lambda : 42)
    Out[22]: [42, 42, 42, 42]

The :attr:`block` and :attr:`targets` instance attributes of the
:class:`.DirectView` also determine the behavior of the parallel magic commands.

Parallel magic commands
-----------------------

We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
that make it more pleasant to execute Python commands on the engines
interactively. These are simply shortcuts to :meth:`execute` and
:meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
Python command on the engines specified by the :attr:`targets` attribute of the
:class:`DirectView` instance:

.. sourcecode:: ipython

    # Create a DirectView for all targets
    In [22]: dv = rc[:]

    # Make this DirectView active for parallel magic commands
    In [23]: dv.activate()

    In [24]: dv.block=True

    # import numpy here and everywhere
    In [25]: with dv.sync_imports():
       ....:     import numpy
    importing numpy on engine(s)

    In [27]: %px a = numpy.random.rand(2,2)
    Parallel execution on engines: [0, 1, 2, 3]

    In [28]: %px ev = numpy.linalg.eigvals(a)
    Parallel execution on engines: [0, 1, 2, 3]

    In [28]: dv['ev']
    Out[28]: [ array([ 1.09522024, -0.09645227]),
       ....: array([ 1.21435496, -0.35546712]),
       ....: array([ 0.72180653,  0.07133042]),
       ....: array([ 1.46384341,  1.04353244e-04])
       ....: ]

The ``%result`` magic gets the most recent result, or takes an argument
specifying the index of the result to be requested. It is simply a shortcut to the
:meth:`get_result` method:

.. sourcecode:: ipython

    In [29]: dv.apply_async(lambda : ev)

    In [30]: %result
    Out[30]: [ [ 1.28167017  0.14197338],
       ....: [-0.14093616  1.27877273],
       ....: [-0.37023573  1.06779409],
       ....: [ 0.83664764 -0.25602658] ]

The ``%autopx`` magic switches to a mode where everything you type is executed
on the engines given by the :attr:`targets` attribute:

.. sourcecode:: ipython

    In [30]: dv.block=False

    In [31]: %autopx
    Auto Parallel Enabled
    Type %autopx to disable

    In [32]: max_evals = []
    <IPython.parallel.AsyncResult object at 0x17b8a70>

    In [33]: for i in range(100):
       ....:     a = numpy.random.rand(10,10)
       ....:     a = a+a.transpose()
       ....:     evals = numpy.linalg.eigvals(a)
       ....:     max_evals.append(evals[0].real)
       ....:
       ....:
    <IPython.parallel.AsyncResult object at 0x17af8f0>

    In [34]: %autopx
    Auto Parallel Disabled

    In [35]: dv.block=True

    In [36]: %px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
    Parallel execution on engines: [0, 1, 2, 3]

    In [37]: dv['ans']
    Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
       ....: 'Average max eigenvalue is: 10.2076902286',
       ....: 'Average max eigenvalue is: 10.1891484655',
       ....: 'Average max eigenvalue is: 10.1158837784',]

Moving Python objects around
============================

In addition to calling functions and executing code on engines, you can
transfer Python objects to and from your IPython session and the engines. In
IPython, these operations are called :meth:`push` (sending an object to the
engines) and :meth:`pull` (getting an object from the engines).

Basic push and pull
-------------------

Here are some examples of how you use :meth:`push` and :meth:`pull`:

.. sourcecode:: ipython

    In [38]: dview.push(dict(a=1.03234,b=3453))
    Out[38]: [None,None,None,None]

    In [39]: dview.pull('a')
    Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]

    In [40]: dview.pull('b', targets=0)
    Out[40]: 3453

    In [41]: dview.pull(('a','b'))
    Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]

    In [43]: dview.push(dict(c='speed'))
    Out[43]: [None,None,None,None]

In non-blocking mode :meth:`push` and :meth:`pull` also return
:class:`AsyncResult` objects:

.. sourcecode:: ipython

    In [48]: ar = dview.pull('a', block=False)

    In [49]: ar.get()
    Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]

Dictionary interface
--------------------

Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
dictionary-style access by key and methods such as :meth:`get` and
:meth:`update` for convenience. This makes the remote namespaces of the engines
appear as a local dictionary. Underneath, these methods call :meth:`apply`:

.. sourcecode:: ipython

    In [51]: dview['a']=['foo','bar']

    In [52]: dview['a']
    Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]

Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`DirectView` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI, pyzmq, or some other direct interconnect should be used.

.. sourcecode:: ipython

    In [58]: dview.scatter('a',range(16))
    Out[58]: [None,None,None,None]

    In [59]: dview['a']
    Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]

    In [60]: dview.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

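
The round trip above can be reproduced locally to see what each engine ends up holding.
This is a sketch under stated assumptions: the ``scatter`` and ``gather`` functions below
are hypothetical pure-Python stand-ins, plain dictionaries play the role of the engine
namespaces, and the even contiguous split is chosen to match the output shown above.

```python
def scatter(seq, n_engines):
    """Split seq into n_engines contiguous, near-equal partitions."""
    seq = list(seq)
    q, r = divmod(len(seq), n_engines)
    parts, start = [], 0
    for i in range(n_engines):
        stop = start + q + (1 if i < r else 0)
        parts.append(seq[start:stop])
        start = stop
    return parts


def gather(parts):
    """Concatenate the per-engine partitions back into one list."""
    return [item for part in parts for item in part]


# One dictionary per pretend engine stands in for its namespace.
engine_namespaces = [{} for _ in range(4)]
for namespace, part in zip(engine_namespaces, scatter(range(16), 4)):
    namespace['a'] = part

per_engine = [namespace['a'] for namespace in engine_namespaces]
gathered = gather(per_engine)
```
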
Other things to look at
=======================

How to do parallel list comprehensions
--------------------------------------

In many cases list comprehensions are nicer than using the map function. While
we don't have fully parallel list comprehensions, it is simple to get the
basic effect using :meth:`scatter` and :meth:`gather`:

.. sourcecode:: ipython

    In [66]: dview.scatter('x',range(64))

    In [67]: %px y = [i**10 for i in x]
    Parallel execution on engines: [0, 1, 2, 3]
    Out[67]:

    In [68]: y = dview.gather('y')

    In [69]: print y
    [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]

Remote imports
--------------

Sometimes you will want to import packages both in your interactive session
and on your remote engines. This can be done with the :class:`ContextManager`
created by a DirectView's :meth:`sync_imports` method:

.. sourcecode:: ipython

    In [69]: with dview.sync_imports():
       ....:     import numpy
    importing numpy on engine(s)

Any imports made inside the block will also be performed on the view's engines.
:meth:`sync_imports` also takes a `local` boolean flag that defaults to True, which specifies
whether the local imports should also be performed. However, support for `local=False`
has not been implemented, so only packages that can be imported locally will work
this way.

You can also specify imports via the ``@require`` decorator. This is a decorator
designed for use in Dependencies, but can be used to handle remote imports as well.
Modules or module names passed to ``@require`` will be imported before the decorated
function is called. If they cannot be imported, the decorated function will never
execute, and will fail with an :exc:`UnmetDependencyError`.

.. sourcecode:: ipython

    In [69]: from IPython.parallel import require

    In [70]: @require('re')
       ....: def findall(pat, x):
       ....:     # re is guaranteed to be available
       ....:     return re.findall(pat, x)

    # you can also pass modules themselves, that you already have locally:
    In [71]: @require(time)
       ....: def wait(t):
       ....:     time.sleep(t)
       ....:     return t

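
To make the "import before call, fail if unavailable" behavior concrete, here is a small
local imitation. It is a sketch only: ``require_sketch`` and ``UnmetDependencySketch`` are
hypothetical names, and injecting the module into the function's globals is an assumption
made for this illustration rather than a description of how ``@require`` works internally.

```python
import functools
import importlib


class UnmetDependencySketch(Exception):
    """Hypothetical error raised when a required module cannot be imported."""


def require_sketch(*names):
    """Import each named module before the decorated function runs."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for name in names:
                try:
                    # Make the module visible to func under its own name.
                    func.__globals__[name] = importlib.import_module(name)
                except ImportError:
                    raise UnmetDependencySketch(name)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@require_sketch('re')
def findall(pat, x):
    # re has been imported by the decorator before this body runs
    return re.findall(pat, x)


@require_sketch('no_such_module_for_this_sketch')
def never_runs():
    return 1


matches = findall(r'\d+', 'a1b22c333')
```

Calling ``never_runs()`` raises ``UnmetDependencySketch`` without executing the function body.
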
.. _parallel_exceptions:

Parallel exceptions
-------------------

In the multiengine interface, parallel commands can raise Python exceptions,
just like serial commands. But it is a little subtle, because a single
parallel command can actually raise multiple exceptions (one for each engine
the command was run on). To express this idea, we have a
:exc:`CompositeError` exception class that will be raised in most cases. The
:exc:`CompositeError` class is a special type of exception that wraps one or
more other types of exceptions. Here is how it works:

.. sourcecode:: ipython

    In [76]: dview.block=True

    In [77]: dview.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)
    /home/user/<ipython-input-10-5d56b303a66c> in <module>()
    ----> 1 dview.execute('1/0')

    /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
        591                 default: self.block
        592         """
    --> 593         return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
        594
        595     def run(self, filename, targets=None, block=None):

    /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
         55 def sync_results(f, self, *args, **kwargs):
         56     """sync relevant results from self.client to our results attribute."""
    ---> 57     ret = f(self, *args, **kwargs)
         58     delta = self.outstanding.difference(self.client.outstanding)
         59     completed = self.outstanding.intersection(delta)

    /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
         44     n_previous = len(self.client.history)
         45     try:
    ---> 46         ret = f(self, *args, **kwargs)
         47     finally:
         48         nmsgs = len(self.client.history) - n_previous

    /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
        529         if block:
        530             try:
    --> 531                 return ar.get()
        532             except KeyboardInterrupt:
        533                 pass

    /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
        101                 return self._result
        102             else:
    --> 103                 raise self._exception
        104         else:
        105             raise error.TimeoutError("Result not ready.")

    CompositeError: one or more exceptions from call to method: _execute
    [0:apply]: ZeroDivisionError: integer division or modulo by zero
    [1:apply]: ZeroDivisionError: integer division or modulo by zero
    [2:apply]: ZeroDivisionError: integer division or modulo by zero
    [3:apply]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when :exc:`CompositeError` is raised has
information about the individual exceptions that were raised on each engine.
If you want, you can even raise one of these original exceptions (the example
assumes that :exc:`CompositeError` has already been imported from
:mod:`IPython.parallel.error`):

.. sourcecode:: ipython

    In [80]: try:
       ....:     dview.execute('1/0')
       ....: except CompositeError as e:
       ....:     e.raise_exception()
       ....:
       ....:
    ---------------------------------------------------------------------------
    RemoteError                               Traceback (most recent call last)
    /home/user/<ipython-input-17-8597e7e39858> in <module>()
          2     dview.execute('1/0')
          3 except CompositeError as e:
    ----> 4     e.raise_exception()

    /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
        266             raise IndexError("an exception with index %i does not exist"%excid)
        267         else:
    --> 268             raise RemoteError(en, ev, etb, ei)
        269
        270

    RemoteError: ZeroDivisionError(integer division or modulo by zero)
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "<string>", line 1, in <module>
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "<string>", line 1, in <module>
    ZeroDivisionError: integer division or modulo by zero

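
The idea of one exception that wraps several others can be sketched in a few lines of
plain Python. Everything below is hypothetical: ``CompositeErrorSketch`` and
``run_on_engines`` are illustrative names, the engines are simulated by a loop, and the
sketch re-raises the original exception directly, whereas the traceback above shows
IPython wrapping it in a :exc:`RemoteError`.

```python
class CompositeErrorSketch(Exception):
    """Hypothetical illustration: wraps one exception per engine."""

    def __init__(self, message, elist):
        super(CompositeErrorSketch, self).__init__(message)
        self.elist = elist  # list of (engine_id, exception) pairs

    def __str__(self):
        lines = [self.args[0]]
        for engine_id, exc in self.elist:
            lines.append("[%i]: %s: %s" % (engine_id, type(exc).__name__, exc))
        return "\n".join(lines)

    def raise_exception(self, excid=0):
        """Re-raise one of the wrapped exceptions, selected by index."""
        try:
            _, exc = self.elist[excid]
        except IndexError:
            raise IndexError("an exception with index %i does not exist" % excid)
        raise exc


def run_on_engines(code, engine_ids):
    """Evaluate code once per pretend engine, collecting every failure."""
    results, errors = [], []
    for engine_id in engine_ids:
        try:
            results.append(eval(code))
        except Exception as exc:
            errors.append((engine_id, exc))
    if errors:
        raise CompositeErrorSketch("one or more exceptions from call", errors)
    return results


try:
    run_on_engines("1/0", [0, 1, 2, 3])
except CompositeErrorSketch as e:
    caught = e  # keep a reference for inspection after the except block
```

Printing ``caught`` lists one line per engine, and ``caught.raise_exception()`` re-raises
the first wrapped ``ZeroDivisionError``.
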
MinRK
|
r3586 | |||
If you are working in IPython, you can simply type ``%debug`` after one of
these :exc:`CompositeError` exceptions is raised and inspect the exception
instance:

.. sourcecode:: ipython

    In [81]: dview.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)
    /home/user/<ipython-input-10-5d56b303a66c> in <module>()
    ----> 1 dview.execute('1/0')

    /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
        591                 default: self.block
        592         """
    --> 593         return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
        594
        595     def run(self, filename, targets=None, block=None):

    /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
         55 def sync_results(f, self, *args, **kwargs):
         56     """sync relevant results from self.client to our results attribute."""
    ---> 57     ret = f(self, *args, **kwargs)
         58     delta = self.outstanding.difference(self.client.outstanding)
         59     completed = self.outstanding.intersection(delta)

    /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
         44     n_previous = len(self.client.history)
         45     try:
    ---> 46         ret = f(self, *args, **kwargs)
         47     finally:
         48         nmsgs = len(self.client.history) - n_previous

    /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
        529         if block:
        530             try:
    --> 531                 return ar.get()
        532             except KeyboardInterrupt:
        533                 pass

    /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
        101                 return self._result
        102             else:
    --> 103                 raise self._exception
        104         else:
        105             raise error.TimeoutError("Result not ready.")

    CompositeError: one or more exceptions from call to method: _execute
    [0:apply]: ZeroDivisionError: integer division or modulo by zero
    [1:apply]: ZeroDivisionError: integer division or modulo by zero
    [2:apply]: ZeroDivisionError: integer division or modulo by zero
    [3:apply]: ZeroDivisionError: integer division or modulo by zero

    In [82]: %debug
    > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
        102             else:
    --> 103                 raise self._exception
        104         else:

    # With the debugger running, self._exception is the exception instance. We can tab complete
    # on it and see the extra methods that are available.
    ipdb> self._exception.<tab>
    e.__class__         e.__getitem__   e.__new__        e.__setstate__     e.args
    e.__delattr__       e.__getslice__  e.__reduce__     e.__str__          e.elist
    e.__dict__          e.__hash__      e.__reduce_ex__  e.__weakref__      e.message
    e.__doc__           e.__init__      e.__repr__       e._get_engine_str  e.print_tracebacks
    e.__getattribute__  e.__module__    e.__setattr__    e._get_traceback   e.raise_exception
    ipdb> self._exception.print_tracebacks()
    [0:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "<string>", line 1, in <module>
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "<string>", line 1, in <module>
    ZeroDivisionError: integer division or modulo by zero

    [1:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "<string>", line 1, in <module>
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "<string>", line 1, in <module>
    ZeroDivisionError: integer division or modulo by zero

    [2:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "<string>", line 1, in <module>
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "<string>", line 1, in <module>
    ZeroDivisionError: integer division or modulo by zero

    [3:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "<string>", line 1, in <module>
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "<string>", line 1, in <module>
    ZeroDivisionError: integer division or modulo by zero

All of this error handling also works in non-blocking mode; the
:exc:`CompositeError` is raised when you call ``get()`` on the returned object:

.. sourcecode:: ipython

    In [83]: dview.block = False

    In [84]: ar = dview.execute('1/0')

    In [85]: ar.get()
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)
    /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
    ----> 1 ar.get()

    /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
        101                 return self._result
        102             else:
    --> 103                 raise self._exception
        104         else:
        105             raise error.TimeoutError("Result not ready.")

    CompositeError: one or more exceptions from call to method: _execute
    [0:apply]: ZeroDivisionError: integer division or modulo by zero
    [1:apply]: ZeroDivisionError: integer division or modulo by zero
    [2:apply]: ZeroDivisionError: integer division or modulo by zero
    [3:apply]: ZeroDivisionError: integer division or modulo by zero

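The deferred behavior shown above, where submitting the work succeeds and the failure only surfaces once the result is requested, is a common pattern for asynchronous results. As an analogy only, the sketch below reproduces it with the standard library's ``concurrent.futures``; it does not use IPython's ``AsyncResult``, and ``divide`` is a hypothetical helper introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(n):
    return 1 / n


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a Future immediately, even though each task will fail.
    futures = [pool.submit(divide, 0) for _ in range(4)]

# No exception has propagated yet; each one is stored on its Future
# and is re-raised only when result() is called.
caught = []
for task_id, fut in enumerate(futures):
    try:
        fut.result()
    except ZeroDivisionError as exc:
        caught.append((task_id, type(exc).__name__))
```

One difference worth keeping in mind: here each future raises its own exception, whereas the ``ar.get()`` call above reports all engine failures together in one :exc:`CompositeError`.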