"""An example for handling results in a way that AsyncMapResult doesn't provide

Specifically, out-of-order results with some special handling of metadata.

This just submits a bunch of jobs, waits on the results, and prints the stdout
and results of each as they finish.

Authors
-------
* MinRK
"""
import time
import random

from IPython import parallel

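# NOTE: this example assumes an IPython parallel cluster is already running,
# e.g. one started locally with `ipcluster start -n 4` (any engine count works).
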
# create client & views
rc = parallel.Client()
dv = rc[:]
v = rc.load_balanced_view()

# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter('id', rc.ids, flatten=True)
print(dv['id'])


def sleep_here(count, t):
    """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
    import time, sys
    # `id` resolves in the engine's namespace, so it is the per-engine value
    # scattered above, not the builtin
    print("hi from engine %i" % id)
    sys.stdout.flush()
    time.sleep(t)
    return count, t


amr = v.map(sleep_here, range(100), [random.random() for i in range(100)], chunksize=2)

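# chunksize=2 groups the 100 calls into 50 tasks of 2 items each; rather than
# waiting on the AsyncMapResult itself, track its msg_ids so each task can be
# handled as soon as it finishes, in whatever order that happens.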
pending = set(amr.msg_ids)
while pending:
    try:
        rc.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeout errors, since they only mean that at least one task isn't done yet
        pass
    # finished is the set of msg_ids that are complete
    finished = pending.difference(rc.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = rc.get_result(msg_id)
        print("job id %s finished on engine %i" % (msg_id, ar.engine_id))
        print("with stdout:")
        print(' ' + ar.stdout.replace('\n', '\n ').rstrip())
        print("and results:")

        # note that each job in a map always returns a list of length chunksize,
        # even if chunksize == 1
        for (count, t) in ar.result:
            print(" item %i: slept for %.2fs" % (count, t))