##// END OF EJS Templates
add bintree paragraphs by @ogrisel...
MinRK -
Show More
@@ -1,79 +1,87 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Script for setting up and using [all]reduce with a binary-tree engine interconnect.
3 Script for setting up and using [all]reduce with a binary-tree engine interconnect.
4
4
5 Binary trees allow large data communications to be highly scalable, because even
6 in a global communication, never more than two messages are occupant on a single node.
7
8 usage: `python bintree_script.py`
5 usage: `python bintree_script.py`
9
6
7 This spanning tree strategy ensures that a single node node mailbox will never
8 receive more that 2 messages at once. This is very important to scale to large
9 clusters (e.g. 1000 nodes) since if you have many incoming messages of a couple
10 of megabytes you might saturate the network interface of a single node and
11 potentially its memory buffers if the messages are not consumed in a streamed
12 manner.
13
14 Note that the AllReduce scheme implemented with the spanning tree strategy
15 impose the aggregation function to be commutative and distributive. It might
16 not be the case if you implement the naive gather / reduce / broadcast strategy
17 where you can reorder the partial data before performing the reduce.
10 """
18 """
11
19
12 from IPython.parallel import Client, Reference
20 from IPython.parallel import Client, Reference
13
21
14
22
15 # connect client and create views
23 # connect client and create views
16 rc = Client()
24 rc = Client()
17 rc.block=True
25 rc.block=True
18 ids = rc.ids
26 ids = rc.ids
19
27
20 root_id = ids[0]
28 root_id = ids[0]
21 root = rc[root_id]
29 root = rc[root_id]
22
30
23 view = rc[:]
31 view = rc[:]
24
32
25 # run bintree.py script defining bintree functions, etc.
33 # run bintree.py script defining bintree functions, etc.
26 execfile('bintree.py')
34 execfile('bintree.py')
27
35
28 # generate binary tree of parents
36 # generate binary tree of parents
29 btree = bintree(ids)
37 btree = bintree(ids)
30
38
31 print "setting up binary tree interconnect:"
39 print "setting up binary tree interconnect:"
32 print_bintree(btree)
40 print_bintree(btree)
33
41
34 view.run('bintree.py')
42 view.run('bintree.py')
35 view.scatter('id', ids, flatten=True)
43 view.scatter('id', ids, flatten=True)
36 view['root_id'] = root_id
44 view['root_id'] = root_id
37
45
38 # create the Communicator objects on the engines
46 # create the Communicator objects on the engines
39 view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )')
47 view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )')
40 pub_url = root.apply_sync(lambda : com.pub_url)
48 pub_url = root.apply_sync(lambda : com.pub_url)
41
49
42 # gather the connection information into a dict
50 # gather the connection information into a dict
43 ar = view.apply_async(lambda : com.info)
51 ar = view.apply_async(lambda : com.info)
44 peers = ar.get_dict()
52 peers = ar.get_dict()
45 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
53 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
46
54
47 # connect the engines to each other:
55 # connect the engines to each other:
48 def connect(com, peers, tree, pub_url, root_id):
56 def connect(com, peers, tree, pub_url, root_id):
49 """this function will be called on the engines"""
57 """this function will be called on the engines"""
50 com.connect(peers, tree, pub_url, root_id)
58 com.connect(peers, tree, pub_url, root_id)
51
59
52 view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id)
60 view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id)
53
61
54 # functions that can be used for reductions
62 # functions that can be used for reductions
55 # max and min builtins can be used as well
63 # max and min builtins can be used as well
56 def add(a,b):
64 def add(a,b):
57 """cumulative sum reduction"""
65 """cumulative sum reduction"""
58 return a+b
66 return a+b
59
67
60 def mul(a,b):
68 def mul(a,b):
61 """cumulative product reduction"""
69 """cumulative product reduction"""
62 return a*b
70 return a*b
63
71
64 view['add'] = add
72 view['add'] = add
65 view['mul'] = mul
73 view['mul'] = mul
66
74
67 # scatter some data
75 # scatter some data
68 data = range(1000)
76 data = range(1000)
69 view.scatter('data', data)
77 view.scatter('data', data)
70
78
71 # perform cumulative sum via allreduce
79 # perform cumulative sum via allreduce
72 view.execute("data_sum = com.allreduce(add, data, flat=False)")
80 view.execute("data_sum = com.allreduce(add, data, flat=False)")
73 print "allreduce sum of data on all engines:", view['data_sum']
81 print "allreduce sum of data on all engines:", view['data_sum']
74
82
75 # perform cumulative sum *without* final broadcast
83 # perform cumulative sum *without* final broadcast
76 # when not broadcasting with allreduce, the final result resides on the root node:
84 # when not broadcasting with allreduce, the final result resides on the root node:
77 view.execute("ids_sum = com.reduce(add, id, flat=True)")
85 view.execute("ids_sum = com.reduce(add, id, flat=True)")
78 print "reduce sum of engine ids (not broadcast):", root['ids_sum']
86 print "reduce sum of engine ids (not broadcast):", root['ids_sum']
79 print "partial result on each engine:", view['ids_sum']
87 print "partial result on each engine:", view['ids_sum']
General Comments 0
You need to be logged in to leave comments. Login now