bintree_script.py
87 lines
| 2.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r5925 | #!/usr/bin/env python | ||
MinRK
|
r5924 | """ | ||
Script for setting up and using [all]reduce with a binary-tree engine interconnect. | ||||
usage: `python bintree_script.py` | ||||
MinRK
|
r6656 | This spanning tree strategy ensures that a single node node mailbox will never | ||
receive more that 2 messages at once. This is very important to scale to large | ||||
clusters (e.g. 1000 nodes) since if you have many incoming messages of a couple | ||||
of megabytes you might saturate the network interface of a single node and | ||||
potentially its memory buffers if the messages are not consumed in a streamed | ||||
manner. | ||||
Note that the AllReduce scheme implemented with the spanning tree strategy | ||||
impose the aggregation function to be commutative and distributive. It might | ||||
not be the case if you implement the naive gather / reduce / broadcast strategy | ||||
where you can reorder the partial data before performing the reduce. | ||||
MinRK
|
r5924 | """ | ||
from IPython.parallel import Client, Reference | ||||
# connect client and create views | ||||
rc = Client() | ||||
rc.block=True | ||||
ids = rc.ids | ||||
root_id = ids[0] | ||||
root = rc[root_id] | ||||
view = rc[:] | ||||
# run bintree.py script defining bintree functions, etc. | ||||
execfile('bintree.py') | ||||
# generate binary tree of parents | ||||
btree = bintree(ids) | ||||
print "setting up binary tree interconnect:" | ||||
print_bintree(btree) | ||||
view.run('bintree.py') | ||||
view.scatter('id', ids, flatten=True) | ||||
view['root_id'] = root_id | ||||
# create the Communicator objects on the engines | ||||
view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )') | ||||
pub_url = root.apply_sync(lambda : com.pub_url) | ||||
# gather the connection information into a dict | ||||
ar = view.apply_async(lambda : com.info) | ||||
peers = ar.get_dict() | ||||
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | ||||
# connect the engines to each other: | ||||
def connect(com, peers, tree, pub_url, root_id): | ||||
"""this function will be called on the engines""" | ||||
com.connect(peers, tree, pub_url, root_id) | ||||
view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id) | ||||
# functions that can be used for reductions | ||||
# max and min builtins can be used as well | ||||
def add(a,b): | ||||
"""cumulative sum reduction""" | ||||
return a+b | ||||
def mul(a,b): | ||||
"""cumulative product reduction""" | ||||
return a*b | ||||
view['add'] = add | ||||
view['mul'] = mul | ||||
# scatter some data | ||||
data = range(1000) | ||||
view.scatter('data', data) | ||||
# perform cumulative sum via allreduce | ||||
view.execute("data_sum = com.allreduce(add, data, flat=False)") | ||||
print "allreduce sum of data on all engines:", view['data_sum'] | ||||
# perform cumulative sum *without* final broadcast | ||||
# when not broadcasting with allreduce, the final result resides on the root node: | ||||
view.execute("ids_sum = com.reduce(add, id, flat=True)") | ||||
print "reduce sum of engine ids (not broadcast):", root['ids_sum'] | ||||
print "partial result on each engine:", view['ids_sum'] | ||||