Show More
@@ -1,52 +1,65 b'' | |||||
1 | """Example of iteration through AsyncMapResult, without waiting for all results |
|
1 | """Example of iteration through AsyncMapResults, without waiting for all results | |
|
2 | ||||
|
3 | When you call view.map(func, sequence), you will receive a special AsyncMapResult | |||
|
4 | object. These objects are used to reconstruct the results of the split call. | |||
|
5 | One feature AsyncResults provide is that they are iterable *immediately*, so | |||
|
6 | you can iterate through the actual results as they complete. | |||
|
7 | ||||
|
8 | This is useful if you submit a large number of tasks that may take some time, | |||
|
9 | but want to perform logic on elements in the result, or even abort subsequent | |||
|
10 | tasks in cases where you are searching for the first affirmative result. | |||
|
11 | ||||
|
12 | By default, the results will match the ordering of the submitted sequence, but | |||
|
13 | if you call `map(...ordered=False)`, then results will be provided to the iterator | |||
|
14 | on a first come first serve basis. | |||
2 |
|
15 | |||
3 | Authors |
|
16 | Authors | |
4 | ------- |
|
17 | ------- | |
5 | * MinRK |
|
18 | * MinRK | |
6 | """ |
|
19 | """ | |
7 | import time |
|
20 | import time | |
8 |
|
21 | |||
9 | from IPython import parallel |
|
22 | from IPython import parallel | |
10 |
|
23 | |||
11 | # create client & view |
|
24 | # create client & view | |
12 | rc = parallel.Client() |
|
25 | rc = parallel.Client() | |
13 | dv = rc[:] |
|
26 | dv = rc[:] | |
14 | v = rc.load_balanced_view() |
|
27 | v = rc.load_balanced_view() | |
15 |
|
28 | |||
16 | # scatter 'id', so id=0,1,2 on engines 0,1,2 |
|
29 | # scatter 'id', so id=0,1,2 on engines 0,1,2 | |
17 | dv.scatter('id', rc.ids, flatten=True) |
|
30 | dv.scatter('id', rc.ids, flatten=True) | |
18 | print "Engine IDs: ", dv['id'] |
|
31 | print "Engine IDs: ", dv['id'] | |
19 |
|
32 | |||
20 | # create a Reference to `id`. This will be a different value on each engine |
|
33 | # create a Reference to `id`. This will be a different value on each engine | |
21 | ref = parallel.Reference('id') |
|
34 | ref = parallel.Reference('id') | |
22 | print "sleeping for `id` seconds on each engine" |
|
35 | print "sleeping for `id` seconds on each engine" | |
23 | tic = time.time() |
|
36 | tic = time.time() | |
24 | ar = dv.apply(time.sleep, ref) |
|
37 | ar = dv.apply(time.sleep, ref) | |
25 | for i,r in enumerate(ar): |
|
38 | for i,r in enumerate(ar): | |
26 | print "%i: %.3f"%(i, time.time()-tic) |
|
39 | print "%i: %.3f"%(i, time.time()-tic) | |
27 |
|
40 | |||
def sleep_here(t):
    """Sleep for `t` seconds, then return ``(id, t)``.

    Intended to run on an engine: `time` is imported inside the body so the
    function is self-contained when shipped remotely, and `id` resolves to
    the per-engine value scattered into each engine's namespace above
    (when run locally it is the `id` builtin instead).
    """
    import time
    time.sleep(t)
    return id, t
32 |
|
45 | |||
33 | # one call per task |
|
46 | # one call per task | |
34 | print "running with one call per task" |
|
47 | print "running with one call per task" | |
35 | amr = v.map(sleep_here, [.01*t for t in range(100)]) |
|
48 | amr = v.map(sleep_here, [.01*t for t in range(100)]) | |
36 | tic = time.time() |
|
49 | tic = time.time() | |
37 | for i,r in enumerate(amr): |
|
50 | for i,r in enumerate(amr): | |
38 | print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic) |
|
51 | print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic) | |
39 |
|
52 | |||
40 | print "running with four calls per task" |
|
53 | print "running with four calls per task" | |
41 | # with chunksize, we can have four calls per task |
|
54 | # with chunksize, we can have four calls per task | |
42 | amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4) |
|
55 | amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4) | |
43 | tic = time.time() |
|
56 | tic = time.time() | |
44 | for i,r in enumerate(amr): |
|
57 | for i,r in enumerate(amr): | |
45 | print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic) |
|
58 | print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic) | |
46 |
|
59 | |||
47 | print "running with two calls per task, with unordered results" |
|
60 | print "running with two calls per task, with unordered results" | |
48 | # We can even iterate through faster results first, with ordered=False |
|
61 | # We can even iterate through faster results first, with ordered=False | |
49 | amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2) |
|
62 | amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2) | |
50 | tic = time.time() |
|
63 | tic = time.time() | |
51 | for i,r in enumerate(amr): |
|
64 | for i,r in enumerate(amr): | |
52 | print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic) |
|
65 | print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic) |
General Comments 0
You need to be logged in to leave comments.
Login now