customresults.py
61 lines
| 1.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r5169 | """An example for handling results in a way that AsyncMapResult doesn't provide | ||
Specifically, out-of-order results with some special handing of metadata. | ||||
This just submits a bunch of jobs, waits on the results, and prints the stdout | ||||
and results of each as they finish. | ||||
Authors | ||||
------- | ||||
* MinRK | ||||
""" | ||||
import time | ||||
import random | ||||
from IPython import parallel | ||||
# create client & views | ||||
rc = parallel.Client() | ||||
dv = rc[:] | ||||
v = rc.load_balanced_view() | ||||
# scatter 'id', so id=0,1,2 on engines 0,1,2 | ||||
dv.scatter('id', rc.ids, flatten=True) | ||||
Thomas Kluyver
|
r6455 | print(dv['id']) | ||
MinRK
|
r5169 | |||
def sleep_here(count, t): | ||||
"""simple function that takes args, prints a short message, sleeps for a time, and returns the same args""" | ||||
import time,sys | ||||
Thomas Kluyver
|
r6455 | print("hi from engine %i" % id) | ||
MinRK
|
r5169 | sys.stdout.flush() | ||
time.sleep(t) | ||||
return count,t | ||||
amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2) | ||||
pending = set(amr.msg_ids) | ||||
while pending: | ||||
try: | ||||
rc.wait(pending, 1e-3) | ||||
except parallel.TimeoutError: | ||||
# ignore timeouterrors, since they only mean that at least one isn't done | ||||
pass | ||||
# finished is the set of msg_ids that are complete | ||||
finished = pending.difference(rc.outstanding) | ||||
# update pending to exclude those that just finished | ||||
pending = pending.difference(finished) | ||||
for msg_id in finished: | ||||
# we know these are done, so don't worry about blocking | ||||
ar = rc.get_result(msg_id) | ||||
Thomas Kluyver
|
r6455 | print("job id %s finished on engine %i" % (msg_id, ar.engine_id)) | ||
print("with stdout:") | ||||
print(' ' + ar.stdout.replace('\n', '\n ').rstrip()) | ||||
print("and results:") | ||||
MinRK
|
r5169 | |||
# note that each job in a map always returns a list of length chunksize | ||||
# even if chunksize == 1 | ||||
for (count,t) in ar.result: | ||||
Thomas Kluyver
|
r6455 | print(" item %i: slept for %.2fs" % (count, t)) | ||
MinRK
|
r5169 | |||