Merge pull request #2808 from minrk/parallel_wait improve patience for slow Hub in client tests adds a first step in `_wait_for_idle`, where it waits for all tasks to arrive before waiting for no tasks to be running. On a super slow machine, it was possible for `_wait_for_idle` to return prematurely, before tasks had even started. closes #2807

publish_data.ipynb

IPython's Data Publication API

IPython has an API that allows IPython Engines to publish data back to the Client. This example shows how this API works.

Setup

We begin by enabling pylab mode and creating a Client object to work with an IPython cluster.

In [48]:
%pylab inline
Welcome to pylab, a matplotlib-based Python environment [backend: module://IPython.zmq.pylab.backend_inline].
For more information, type 'help(pylab)'.
In [12]:
from IPython.parallel import Client
In [13]:
c = Client()
dv = c[:]
dv.block = False

Simple publication

Here is a simple Python function we are going to run on the Engines. This function uses publish_data to publish a simple Python dictionary when it is run.

In [14]:
def publish_it():
    from IPython.zmq.datapub import publish_data
    publish_data(dict(a='hi'))

We run the function on the Engines using apply_async and save the returned AsyncResult object:

In [15]:
ar = dv.apply_async(publish_it)

The published data from each engine is then available under the .data attribute of the AsyncResult object.

In [16]:
ar.data
Out[16]:
[{'a': 'hi'}, {'a': 'hi'}, {'a': 'hi'}, {'a': 'hi'}]

Each time publish_data is called, the .data attribute is updated with the most recently published data.
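
Because .data holds one dictionary per engine, it is often handy to pull a single key out across all engines. The helper below is a minimal sketch, not part of IPython: `collect_key` is a hypothetical name, and it assumes that an engine which has not published anything yet shows up as an empty dictionary (or None), so a missing key is replaced by a default instead of raising KeyError.

```python
def collect_key(data, key, default=None):
    """Return one value per engine for `key`.

    `data` is expected to look like ar.data: a list with one dict per engine.
    Assumption: an engine that has not published yet contributes an empty
    dict (or None), in which case `default` is returned for that engine.
    """
    return [(d or {}).get(key, default) for d in data]
```

With the result shown above, collect_key(ar.data, 'a') would give one 'hi' per engine.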

Simulation loop

In many cases, the Engines will be running a simulation loop and we will want to publish data at each time step of the simulation. To show how this works, we create a mock simulation function that iterates over a loop and publishes a NumPy array and loop variable at each time step. By inserting a call to time.sleep(1), we ensure that new data will be published every second.

In [57]:
def simulation_loop():
    from IPython.zmq.datapub import publish_data
    import time
    import numpy as np
    for i in range(10):
        publish_data(dict(a=np.random.rand(20), i=i))
        time.sleep(1)

Again, we run the simulation_loop function in parallel using apply_async and save the returned AsyncResult object.

In [58]:
ar = dv.apply_async(simulation_loop)

The Engines publish new data every second. Any time we access ar.data, we get the most recently published data.

In [61]:
data = ar.data
for i, d in enumerate(data):
    plot(d['a'], label='engine: '+str(i))
title('Data published at time step: ' + str(data[0]['i']))
legend()
Out[61]:
<matplotlib.legend.Legend at 0x10a8ed8d0>
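
The cell above reads ar.data once. To follow the simulation while it runs, the same access can be repeated until the tasks finish. The following is a minimal sketch under stated assumptions: `poll_published` and its parameters are hypothetical names, not part of IPython, and it relies only on the AsyncResult exposing ready() and the .data attribute used above. It hands every snapshot to a callback rather than comparing snapshots, because the published dictionaries may hold NumPy arrays, which do not compare cleanly with ==.

```python
import time


def poll_published(ar, on_update, interval=1.0, max_polls=100):
    """Call on_update(ar.data) repeatedly until ar.ready() is true.

    `ar` can be any object with a ready() method and a .data attribute.
    Returns the number of times on_update was called. `max_polls` is a
    safety cap so the loop cannot run forever.
    """
    polls = 0
    while polls < max_polls:
        # Check completion first so the final published state is still reported.
        finished = ar.ready()
        on_update(ar.data)
        polls += 1
        if finished:
            break
        time.sleep(interval)
    return polls
```

In the notebook, on_update could be a small function that clears the current figure and redraws the plot from the snapshot it receives. An interval close to the one-second sleep in simulation_loop avoids redrawing the same data many times.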