asyncresult.txt
150 lines
| 5.1 KiB
| text/plain
|
TextLexer
MinRK
|
r6472 | .. _parallel_asyncresult: | ||
====================== | ||||
The AsyncResult object | ||||
====================== | ||||
In non-blocking mode, :meth:`apply` submits the command to be executed and | ||||
then returns a :class:`~.AsyncResult` object immediately. The | ||||
AsyncResult object gives you a way of getting a result at a later | ||||
time through its :meth:`get` method, but it also collects metadata | ||||
on execution. | ||||
Beyond multiprocessing's AsyncResult | ||||
==================================== | ||||
.. Note:: | ||||
The :class:`~.AsyncResult` object provides a superset of the interface in | ||||
:py:class:`multiprocessing.pool.AsyncResult`. See the | ||||
`official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_ | ||||
for more on the basics of this interface. | ||||
Our AsyncResult objects add a number of convenient features for working with | ||||
parallel results, beyond what is provided by the original AsyncResult. | ||||
get_dict | ||||
-------- | ||||
First, is :meth:`.AsyncResult.get_dict`, which pulls results as a dictionary | ||||
keyed by engine_id, rather than a flat list. This is useful for quickly | ||||
coordinating or distributing information about all of the engines. | ||||
As an example, here is a quick call that gives every engine a dict showing | ||||
the PID of every other engine: | ||||
.. sourcecode:: ipython | ||||
In [10]: ar = rc[:].apply_async(os.getpid) | ||||
In [11]: pids = ar.get_dict() | ||||
In [12]: rc[:]['pid_map'] = pids | ||||
This trick is particularly useful when setting up inter-engine communication, | ||||
as in IPython's :file:`examples/parallel/interengine` examples. | ||||
Metadata | ||||
======== | ||||
IPython.parallel tracks some metadata about the tasks, which is stored | ||||
in the :attr:`.Client.metadata` dict. The AsyncResult object gives you an | ||||
interface for this information as well, including timestamps stdout/err, | ||||
and engine IDs. | ||||
Timing | ||||
------ | ||||
IPython tracks various timestamps as :py:class:`.datetime` objects, | ||||
and the AsyncResult object has a few properties that turn these into useful | ||||
times (in seconds as floats). | ||||
For use while the tasks are still pending: | ||||
* :attr:`ar.elapsed` is just the elapsed seconds since submission, for use | ||||
before the AsyncResult is complete. | ||||
* :attr:`ar.progress` is the number of tasks that have completed. Fractional progress | ||||
would be:: | ||||
1.0 * ar.progress / len(ar) | ||||
* :meth:`AsyncResult.wait_interactive` will wait for the result to finish, but | ||||
print out status updates on progress and elapsed time while it waits. | ||||
For use after the tasks are done: | ||||
* :attr:`ar.serial_time` is the sum of the computation time of all of the tasks | ||||
done in parallel. | ||||
* :attr:`ar.wall_time` is the time between the first task submitted and last result | ||||
received. This is the actual cost of computation, including IPython overhead. | ||||
.. note:: | ||||
wall_time is only precise if the Client is waiting for results when | ||||
the task finished, because the `received` timestamp is made when the result is | ||||
unpacked by the Client, triggered by the :meth:`~Client.spin` call. If you | ||||
are doing work in the Client, and not waiting/spinning, then `received` might | ||||
be artificially high. | ||||
An often interesting metric is the time it actually cost to do the work in parallel | ||||
relative to the serial computation, and this can be given simply with | ||||
.. sourcecode:: python | ||||
speedup = ar.serial_time / ar.wall_time | ||||
Map results are iterable! | ||||
========================= | ||||
When an AsyncResult object has multiple results (e.g. the :class:`~AsyncMapResult` | ||||
MinRK
|
r7480 | object), you can actually iterate through results themselves, and act on them as they arrive: | ||
MinRK
|
r6472 | |||
.. literalinclude:: ../../examples/parallel/itermapresult.py | ||||
:language: python | ||||
timo
|
r7071 | :lines: 20-67 | ||
MinRK
|
r7480 | |||
That is to say, if you treat an AsyncMapResult as if it were a list of your actual | ||||
results, it should behave as you would expect, with the only difference being | ||||
that you can start iterating through the results before they have even been computed. | ||||
This lets you do a dumb version of map/reduce with the builtin Python functions, | ||||
and the only difference between doing this locally and doing it remotely in parallel | ||||
is using the asynchronous view.map instead of the builtin map. | ||||
Here is a simple one-line RMS (root-mean-square) implemented with Python's builtin map/reduce. | ||||
.. sourcecode:: ipython | ||||
In [38]: X = np.linspace(0,100) | ||||
In [39]: from math import sqrt | ||||
In [40]: add = lambda a,b: a+b | ||||
In [41]: sq = lambda x: x*x | ||||
In [42]: sqrt(reduce(add, map(sq, X)) / len(X)) | ||||
Out[42]: 58.028845747399714 | ||||
In [43]: sqrt(reduce(add, view.map(sq, X)) / len(X)) | ||||
Out[43]: 58.028845747399714 | ||||
To break that down: | ||||
1. ``map(sq, X)`` Compute the square of each element in the list (locally, or in parallel) | ||||
2. ``reduce(add, sqX) / len(X)`` compute the mean by summing over the list (or AsyncMapResult) | ||||
and dividing by the size | ||||
3. take the square root of the resulting number | ||||
MinRK
|
r6472 | |||
.. seealso:: | ||||
When AsyncResult or the AsyncMapResult don't provide what you need (for instance, | ||||
handling individual results as they arrive, but with metadata), you can always | ||||
just split the original result's ``msg_ids`` attribute, and handle them as you like. | ||||
For an example of this, see :file:`docs/examples/parallel/customresult.py` | ||||