"""Example of iteration through AsyncMapResult, without waiting for all results
|
|
|
|
|
|
Authors
|
|
|
-------
|
|
|
* MinRK
|
|
|
"""
|
|
|
import time
|
|
|
|
|
|
from IPython import parallel

# create client & view
rc = parallel.Client()
dv = rc[:]
v = rc.load_balanced_view()
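# dv is a DirectView over all engines (used for the scatter/apply setup below);
# v is a LoadBalancedView that farms map() tasks out to whichever engine is free.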

# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter('id', rc.ids, flatten=True)
print "Engine IDs: ", dv['id']

# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')
print "sleeping for `id` seconds on each engine"
tic = time.time()
ar = dv.apply(time.sleep, ref)
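# ar is an AsyncResult: it can be iterated immediately, yielding each engine's
# result as it arrives (engine 0 sleeps 0s, engine 1 sleeps 1s, ...).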
for i,r in enumerate(ar):
    print "%i: %.3f"%(i, time.time()-tic)
def sleep_here(t):
    import time
    time.sleep(t)
    return id,t

# one call per task
print "running with one call per task"
amr = v.map(sleep_here, [.01*t for t in range(100)])
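# v.map returns an AsyncMapResult immediately; iterating it yields results in
# submission order as they become available, without waiting for the full map.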
tic = time.time()
for i,r in enumerate(amr):
    print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)

print "running with four calls per task"
# with chunksize, we can have four calls per task
amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
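# chunksize=4 groups the 100 elements into 25 tasks of four calls each,
# so results become available four at a time.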
tic = time.time()
for i,r in enumerate(amr):
    print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)

print "running with two calls per task, with unordered results"
# We can even iterate through faster results first, with ordered=False
amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
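# The sleep times are submitted longest-first, so with ordered=False the
# iterator returns the quick late-submitted tasks as soon as they finish
# rather than blocking on the slow early ones.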
tic = time.time()
for i,r in enumerate(amr):
    print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)