itermapresult.py
67 lines
| 2.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r5217 | """Example of iteration through AsyncMapResults, without waiting for all results | ||
When you call view.map(func, sequence), you will receive a special AsyncMapResult | ||||
object. These objects are used to reconstruct the results of the split call. | ||||
One feature AsyncResults provide is that they are iterable *immediately*, so | ||||
you can iterate through the actual results as they complete. | ||||
This is useful if you submit a large number of tasks that may take some time, | ||||
but want to perform logic on elements in the result, or even abort subsequent | ||||
tasks in cases where you are searching for the first affirmative result. | ||||
By default, the results will match the ordering of the submitted sequence, but | ||||
if you call `map(...ordered=False)`, then results will be provided to the iterator | ||||
on a first come first serve basis. | ||||
MinRK
|
r5169 | |||
Authors | ||||
------- | ||||
* MinRK | ||||
""" | ||||
Thomas Kluyver
|
r6455 | from __future__ import print_function | ||
MinRK
|
r5169 | import time | ||
from IPython import parallel | ||||
# create client & view | ||||
rc = parallel.Client() | ||||
dv = rc[:] | ||||
v = rc.load_balanced_view() | ||||
# scatter 'id', so id=0,1,2 on engines 0,1,2 | ||||
dv.scatter('id', rc.ids, flatten=True) | ||||
Thomas Kluyver
|
r6455 | print("Engine IDs: ", dv['id']) | ||
MinRK
|
r5169 | |||
# create a Reference to `id`. This will be a different value on each engine | ||||
ref = parallel.Reference('id') | ||||
Thomas Kluyver
|
r6455 | print("sleeping for `id` seconds on each engine") | ||
MinRK
|
r5169 | tic = time.time() | ||
ar = dv.apply(time.sleep, ref) | ||||
for i,r in enumerate(ar): | ||||
Thomas Kluyver
|
r6455 | print("%i: %.3f"%(i, time.time()-tic)) | ||
MinRK
|
r5169 | |||
def sleep_here(t): | ||||
import time | ||||
time.sleep(t) | ||||
return id,t | ||||
# one call per task | ||||
Thomas Kluyver
|
r6455 | print("running with one call per task") | ||
MinRK
|
r5169 | amr = v.map(sleep_here, [.01*t for t in range(100)]) | ||
tic = time.time() | ||||
for i,r in enumerate(amr): | ||||
Thomas Kluyver
|
r6455 | print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)) | ||
MinRK
|
r5169 | |||
Thomas Kluyver
|
r6455 | print("running with four calls per task") | ||
MinRK
|
r5169 | # with chunksize, we can have four calls per task | ||
amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4) | ||||
tic = time.time() | ||||
for i,r in enumerate(amr): | ||||
Thomas Kluyver
|
r6455 | print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)) | ||
MinRK
|
r5169 | |||
Thomas Kluyver
|
r6455 | print("running with two calls per task, with unordered results") | ||
MinRK
|
r5169 | # We can even iterate through faster results first, with ordered=False | ||
amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2) | ||||
tic = time.time() | ||||
for i,r in enumerate(amr): | ||||
Thomas Kluyver
|
r6455 | print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)) | ||