Show More
@@ -1,79 +1,87 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """ |
|
2 | """ | |
3 | Script for setting up and using [all]reduce with a binary-tree engine interconnect. |
|
3 | Script for setting up and using [all]reduce with a binary-tree engine interconnect. | |
4 |
|
4 | |||
5 | Binary trees allow large data communications to be highly scalable, because even |
|
|||
6 | in a global communication, never more than two messages are occupant on a single node. |
|
|||
7 |
|
||||
8 | usage: `python bintree_script.py` |
|
5 | usage: `python bintree_script.py` | |
9 |
|
6 | |||
|
7 | This spanning tree strategy ensures that a single node node mailbox will never | |||
|
8 | receive more that 2 messages at once. This is very important to scale to large | |||
|
9 | clusters (e.g. 1000 nodes) since if you have many incoming messages of a couple | |||
|
10 | of megabytes you might saturate the network interface of a single node and | |||
|
11 | potentially its memory buffers if the messages are not consumed in a streamed | |||
|
12 | manner. | |||
|
13 | ||||
|
14 | Note that the AllReduce scheme implemented with the spanning tree strategy | |||
|
15 | impose the aggregation function to be commutative and distributive. It might | |||
|
16 | not be the case if you implement the naive gather / reduce / broadcast strategy | |||
|
17 | where you can reorder the partial data before performing the reduce. | |||
10 | """ |
|
18 | """ | |
11 |
|
19 | |||
12 | from IPython.parallel import Client, Reference |
|
20 | from IPython.parallel import Client, Reference | |
13 |
|
21 | |||
14 |
|
22 | |||
15 | # connect client and create views |
|
23 | # connect client and create views | |
16 | rc = Client() |
|
24 | rc = Client() | |
17 | rc.block=True |
|
25 | rc.block=True | |
18 | ids = rc.ids |
|
26 | ids = rc.ids | |
19 |
|
27 | |||
20 | root_id = ids[0] |
|
28 | root_id = ids[0] | |
21 | root = rc[root_id] |
|
29 | root = rc[root_id] | |
22 |
|
30 | |||
23 | view = rc[:] |
|
31 | view = rc[:] | |
24 |
|
32 | |||
25 | # run bintree.py script defining bintree functions, etc. |
|
33 | # run bintree.py script defining bintree functions, etc. | |
26 | execfile('bintree.py') |
|
34 | execfile('bintree.py') | |
27 |
|
35 | |||
28 | # generate binary tree of parents |
|
36 | # generate binary tree of parents | |
29 | btree = bintree(ids) |
|
37 | btree = bintree(ids) | |
30 |
|
38 | |||
31 | print "setting up binary tree interconnect:" |
|
39 | print "setting up binary tree interconnect:" | |
32 | print_bintree(btree) |
|
40 | print_bintree(btree) | |
33 |
|
41 | |||
34 | view.run('bintree.py') |
|
42 | view.run('bintree.py') | |
35 | view.scatter('id', ids, flatten=True) |
|
43 | view.scatter('id', ids, flatten=True) | |
36 | view['root_id'] = root_id |
|
44 | view['root_id'] = root_id | |
37 |
|
45 | |||
38 | # create the Communicator objects on the engines |
|
46 | # create the Communicator objects on the engines | |
39 | view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )') |
|
47 | view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )') | |
40 | pub_url = root.apply_sync(lambda : com.pub_url) |
|
48 | pub_url = root.apply_sync(lambda : com.pub_url) | |
41 |
|
49 | |||
42 | # gather the connection information into a dict |
|
50 | # gather the connection information into a dict | |
43 | ar = view.apply_async(lambda : com.info) |
|
51 | ar = view.apply_async(lambda : com.info) | |
44 | peers = ar.get_dict() |
|
52 | peers = ar.get_dict() | |
45 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators |
|
53 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | |
46 |
|
54 | |||
47 | # connect the engines to each other: |
|
55 | # connect the engines to each other: | |
48 | def connect(com, peers, tree, pub_url, root_id): |
|
56 | def connect(com, peers, tree, pub_url, root_id): | |
49 | """this function will be called on the engines""" |
|
57 | """this function will be called on the engines""" | |
50 | com.connect(peers, tree, pub_url, root_id) |
|
58 | com.connect(peers, tree, pub_url, root_id) | |
51 |
|
59 | |||
52 | view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id) |
|
60 | view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id) | |
53 |
|
61 | |||
54 | # functions that can be used for reductions |
|
62 | # functions that can be used for reductions | |
55 | # max and min builtins can be used as well |
|
63 | # max and min builtins can be used as well | |
56 | def add(a,b): |
|
64 | def add(a,b): | |
57 | """cumulative sum reduction""" |
|
65 | """cumulative sum reduction""" | |
58 | return a+b |
|
66 | return a+b | |
59 |
|
67 | |||
60 | def mul(a,b): |
|
68 | def mul(a,b): | |
61 | """cumulative product reduction""" |
|
69 | """cumulative product reduction""" | |
62 | return a*b |
|
70 | return a*b | |
63 |
|
71 | |||
64 | view['add'] = add |
|
72 | view['add'] = add | |
65 | view['mul'] = mul |
|
73 | view['mul'] = mul | |
66 |
|
74 | |||
67 | # scatter some data |
|
75 | # scatter some data | |
68 | data = range(1000) |
|
76 | data = range(1000) | |
69 | view.scatter('data', data) |
|
77 | view.scatter('data', data) | |
70 |
|
78 | |||
71 | # perform cumulative sum via allreduce |
|
79 | # perform cumulative sum via allreduce | |
72 | view.execute("data_sum = com.allreduce(add, data, flat=False)") |
|
80 | view.execute("data_sum = com.allreduce(add, data, flat=False)") | |
73 | print "allreduce sum of data on all engines:", view['data_sum'] |
|
81 | print "allreduce sum of data on all engines:", view['data_sum'] | |
74 |
|
82 | |||
75 | # perform cumulative sum *without* final broadcast |
|
83 | # perform cumulative sum *without* final broadcast | |
76 | # when not broadcasting with allreduce, the final result resides on the root node: |
|
84 | # when not broadcasting with allreduce, the final result resides on the root node: | |
77 | view.execute("ids_sum = com.reduce(add, id, flat=True)") |
|
85 | view.execute("ids_sum = com.reduce(add, id, flat=True)") | |
78 | print "reduce sum of engine ids (not broadcast):", root['ids_sum'] |
|
86 | print "reduce sum of engine ids (not broadcast):", root['ids_sum'] | |
79 | print "partial result on each engine:", view['ids_sum'] |
|
87 | print "partial result on each engine:", view['ids_sum'] |
General Comments 0
You need to be logged in to leave comments.
Login now