Show More
@@ -1,52 +1,65 b'' | |||
|
1 | """Example of iteration through AsyncMapResult, without waiting for all results | |
|
1 | """Example of iteration through AsyncMapResults, without waiting for all results | |
|
2 | ||
|
3 | When you call view.map(func, sequence), you will receive a special AsyncMapResult | |
|
4 | object. These objects are used to reconstruct the results of the split call. | |
|
5 | One feature AsyncResults provide is that they are iterable *immediately*, so | |
|
6 | you can iterate through the actual results as they complete. | |
|
7 | ||
|
8 | This is useful if you submit a large number of tasks that may take some time, | |
|
9 | but want to perform logic on elements in the result, or even abort subsequent | |
|
10 | tasks in cases where you are searching for the first affirmative result. | |
|
11 | ||
|
12 | By default, the results will match the ordering of the submitted sequence, but | |
|
13 | if you call `map(...ordered=False)`, then results will be provided to the iterator | |
|
14 | on a first come, first served basis. | |
|
2 | 15 | |
|
3 | 16 | Authors |
|
4 | 17 | ------- |
|
5 | 18 | * MinRK |
|
6 | 19 | """ |
|
7 | 20 | import time |
|
8 | 21 | |
|
9 | 22 | from IPython import parallel |
|
10 | 23 | |
|
11 | 24 | # create client & view |
|
12 | 25 | rc = parallel.Client() |
|
13 | 26 | dv = rc[:] |
|
14 | 27 | v = rc.load_balanced_view() |
|
15 | 28 | |
|
16 | 29 | # scatter 'id', so id=0,1,2 on engines 0,1,2 |
|
17 | 30 | dv.scatter('id', rc.ids, flatten=True) |
|
18 | 31 | print "Engine IDs: ", dv['id'] |
|
19 | 32 | |
|
20 | 33 | # create a Reference to `id`. This will be a different value on each engine |
|
21 | 34 | ref = parallel.Reference('id') |
|
22 | 35 | print "sleeping for `id` seconds on each engine" |
|
23 | 36 | tic = time.time() |
|
24 | 37 | ar = dv.apply(time.sleep, ref) |
|
25 | 38 | for i,r in enumerate(ar): |
|
26 | 39 | print "%i: %.3f"%(i, time.time()-tic) |
|
27 | 40 | |
|
def sleep_here(t):
    """Sleep for `t` seconds, then report (id, duration slept).

    NOTE(review): `id` is looked up in the function's global namespace. On an
    engine, the earlier scatter bound `id` to the engine's integer id; in a
    plain interpreter it falls back to the `id` builtin.
    """
    import time
    duration = t
    time.sleep(duration)
    return (id, duration)
|
32 | 45 | |
|
33 | 46 | # one call per task |
|
34 | 47 | print "running with one call per task" |
|
35 | 48 | amr = v.map(sleep_here, [.01*t for t in range(100)]) |
|
36 | 49 | tic = time.time() |
|
37 | 50 | for i,r in enumerate(amr): |
|
38 | 51 | print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic) |
|
39 | 52 | |
|
40 | 53 | print "running with four calls per task" |
|
41 | 54 | # with chunksize, we can have four calls per task |
|
42 | 55 | amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4) |
|
43 | 56 | tic = time.time() |
|
44 | 57 | for i,r in enumerate(amr): |
|
45 | 58 | print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic) |
|
46 | 59 | |
|
47 | 60 | print "running with two calls per task, with unordered results" |
|
48 | 61 | # We can even iterate through faster results first, with ordered=False |
|
49 | 62 | amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2) |
|
50 | 63 | tic = time.time() |
|
51 | 64 | for i,r in enumerate(amr): |
|
52 | 65 | print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic) |
General Comments 0
You need to be logged in to leave comments.
Login now