#!/usr/bin/env python
"""
Script for setting up and using [all]reduce with a binary-tree engine interconnect.

usage: `python bintree_script.py`

This spanning tree strategy ensures that a single node's mailbox will never
receive more than 2 messages at once. This is very important for scaling to
large clusters (e.g. 1000 nodes), since many incoming messages of a couple of
megabytes each could saturate the network interface of a single node, and
potentially its memory buffers, if the messages are not consumed in a streamed
manner.

Note that the AllReduce scheme implemented with the spanning tree strategy
requires the aggregation function to be commutative and associative. These
constraints might not apply to the naive gather / reduce / broadcast strategy,
where the partial data can be reordered before the reduce is performed.
"""

from IPython.parallel import Client, Reference


# connect client and create views
rc = Client()
rc.block = True
ids = rc.ids

root_id = ids[0]
root = rc[root_id]

view = rc[:]

# run bintree.py script defining bintree functions, etc.
execfile('bintree.py')

# generate binary tree of parents
btree = bintree(ids)

print "setting up binary tree interconnect:"
print_bintree(btree)

view.run('bintree.py')
view.scatter('id', ids, flatten=True)
view['root_id'] = root_id
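# note: scatter with flatten=True gives each engine a single scalar id,
# rather than a one-element list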

# create the Communicator objects on the engines
view.execute('com = BinaryTreeCommunicator(id, root=(id == root_id))')
pub_url = root.apply_sync(lambda: com.pub_url)

# gather the connection information into a dict
ar = view.apply_async(lambda: com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

# connect the engines to each other:
def connect(com, peers, tree, pub_url, root_id):
    """this function will be called on the engines"""
    com.connect(peers, tree, pub_url, root_id)

view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id)
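# note: Reference('com') is resolved on each engine to that engine's own local
# `com` object, so each engine connects its own communicator to its peers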

# functions that can be used for reductions
# max and min builtins can be used as well
def add(a, b):
    """cumulative sum reduction"""
    return a + b

def mul(a, b):
    """cumulative product reduction"""
    return a * b

view['add'] = add
view['mul'] = mul
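
# Illustrative sketch (local, no cluster needed): the tree-based [all]reduce
# combines partial results in whatever order the spanning tree dictates, so the
# reducer must be commutative and associative (see the module docstring).
# `add` and `mul` qualify; an operation like subtraction does not, because
# different combination orders yield different answers:
assert (1 + 2) + (3 + 4) == ((1 + 2) + 3) + 4      # addition is associative
assert (1 - 2) - (3 - 4) != ((1 - 2) - 3) - 4      # subtraction is not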

# scatter some data
data = range(1000)
view.scatter('data', data)

# perform cumulative sum via allreduce
view.execute("data_sum = com.allreduce(add, data, flat=False)")
print "allreduce sum of data on all engines:", view['data_sum']

# perform a sum reduction (over the engine ids) *without* the final broadcast
# unlike allreduce, plain reduce leaves the final result only on the root node:
view.execute("ids_sum = com.reduce(add, id, flat=True)")
print "reduce sum of engine ids (not broadcast):", root['ids_sum']
print "partial result on each engine:", view['ids_sum']