##// END OF EJS Templates
Merge pull request #1295 from minrk/btree...
Fernando Perez -
r6662:eada8294 merge
parent child Browse files
Show More
@@ -0,0 +1,245 b''
1 """
2 BinaryTree inter-engine communication class
3
4 use from bintree_script.py
5
6 Provides parallel [all]reduce functionality
7
8 """
9
10 import cPickle as pickle
11 import re
12 import socket
13 import uuid
14
15 import zmq
16
17 from IPython.parallel.util import disambiguate_url
18
19
20 #----------------------------------------------------------------------------
21 # bintree-related construction/printing helpers
22 #----------------------------------------------------------------------------
23
def bintree(ids, parent=None):
    """construct {child:parent} dict representation of a binary tree

    keys are the nodes in the tree, and values are the parent of each node.

    The first element of `ids` becomes the root of the (sub)tree; the
    remaining elements are split in half, and each half recursively forms
    one subtree hanging off that root.

    The root node has parent `parent`, default: None.

    Parameters
    ----------
    ids : sequence
        node identifiers; must support len(), indexing and slicing
    parent : object, optional
        value stored as the parent of the root node

    Returns
    -------
    dict
        {node: parent_of_node} for every element of `ids`
        (empty if `ids` is empty)

    >>> tree = bintree(range(7))
    >>> tree
    {0: None, 1: 0, 2: 1, 3: 1, 4: 0, 5: 4, 6: 4}
    >>> print_bintree(tree)
    0
     1
      2
      3
     4
      5
      6
    """
    parents = {}
    if len(ids) == 0:
        return parents

    root = ids[0]
    parents[root] = parent

    # Everything after the root is shared between the two subtrees.
    # Floor division keeps the slice index an int on Python 3 as well
    # (plain `/` would give a float there and make the slice fail).
    rest = ids[1:]
    half = len(rest) // 2
    parents.update(bintree(rest[:half], parent=root))
    parents.update(bintree(rest[half:], parent=root))
    return parents
59
def reverse_bintree(parents):
    """construct {parent:[children]} dict from {child:parent}

    keys are the nodes in the tree that have children, and values are the
    lists of children of that node in the tree.

    reverse_tree[None] is the root node itself (not a list): the node whose
    parent is None is stored directly under the key None.

    The order of each children list follows the iteration order of `parents`.

    >>> tree = bintree(range(7))
    >>> reverse_bintree(tree)
    {None: 0, 0: [1, 4], 1: [2, 3], 4: [5, 6]}
    """
    children = {}
    # .items() works on both Python 2 and 3 (iteritems is Python 2 only)
    for child, parent in parents.items():
        if parent is None:
            # the root is recorded as a bare node, not appended to a list
            children[None] = child
        else:
            children.setdefault(parent, []).append(child)

    return children
82
def depth(n, tree):
    """get depth of an element in the tree

    `tree` is a {child:parent} dict. The depth is the number of ancestors of
    `n` that are themselves nodes of `tree`; the root has depth 0.

    The walk stops as soon as the parent is not a key of `tree`. This covers
    the usual root (parent None) and also a root whose parent is some value
    outside the tree, which previously raised KeyError.

    Raises KeyError if `n` itself is not in `tree`.
    """
    d = 0
    parent = tree[n]
    while parent in tree:
        d += 1
        parent = tree[parent]
    return d
91
def print_bintree(tree, indent=' '):
    """print a binary tree

    `tree` is a {child:parent} dict. Nodes are printed one per line in sorted
    order, each prefixed by `indent` repeated once per level of depth.
    """
    for n in sorted(tree):
        # single-argument print(...) behaves the same on Python 2 and 3
        print("%s%s" % (indent * depth(n, tree), n))
96
97 #----------------------------------------------------------------------------
98 # Communicator class for a binary-tree map
99 #----------------------------------------------------------------------------
100
ip_pat = re.compile(r'^\d+\.\d+\.\d+\.\d+$')

def disambiguate_dns_url(url, location):
    """resolve `location` to an IP when it is a DNS name, then disambiguate `url`

    A `location` that already looks like a dotted-quad address is passed
    through untouched; anything else is looked up with socket.gethostbyname.
    """
    looks_like_ip = ip_pat.match(location) is not None
    if looks_like_ip:
        ip = location
    else:
        ip = socket.gethostbyname(location)
    return disambiguate_url(url, ip)
108
class BinaryTreeCommunicator(object):
    """communicator for [all]reduce over a binary tree of nodes

    Every node owns a PULL socket (`downstream`), bound to a random port, on
    which it receives partial results from its children, and a PUSH socket
    (`upstream`) that `connect` attaches to the PULL socket of its parent.

    The root additionally binds a PUB socket (`pub`) used to broadcast the
    final result; every other node owns a SUB socket (`sub`), subscribed to
    all messages, that `connect` attaches to the root's PUB socket.
    """

    # class-level defaults, so the attributes exist even for sockets that a
    # given node never creates (e.g. `pub` on a non-root node)
    id = None
    pub = None
    sub = None
    downstream = None
    upstream = None
    pub_url = None
    tree_url = None

    def __init__(self, id, interface='tcp://*', root=False):
        """create the sockets for this node and bind the listening ones

        id : identifier of this node (a key of the tree passed to `connect`)
        interface : zmq interface to bind on; ":<port>" is appended to it to
            build `pub_url` / `tree_url`
        root : whether this node is the root of the tree
        """
        self.id = id
        self.root = root

        # create context and sockets
        self._ctx = zmq.Context()
        if root:
            self.pub = self._ctx.socket(zmq.PUB)
        else:
            self.sub = self._ctx.socket(zmq.SUB)
            # empty prefix: subscribe to every message
            self.sub.setsockopt(zmq.SUBSCRIBE, b'')
        self.downstream = self._ctx.socket(zmq.PULL)
        self.upstream = self._ctx.socket(zmq.PUSH)

        # bind to ports
        interface_f = interface + ":%i"
        if self.root:
            pub_port = self.pub.bind_to_random_port(interface)
            self.pub_url = interface_f % pub_port

        tree_port = self.downstream.bind_to_random_port(interface)
        self.tree_url = interface_f % tree_port
        self.downstream_poller = zmq.Poller()
        self.downstream_poller.register(self.downstream, zmq.POLLIN)

        # guess first public IP from socket
        self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]

    def __del__(self):
        """close whichever sockets were created, then terminate the context"""
        # __init__ may have failed part-way (or been bypassed), leaving some
        # sockets at their class-level default of None; skip those instead of
        # raising from the finalizer.
        for sock in (self.downstream, self.upstream, self.pub, self.sub):
            if sock is not None:
                sock.close()
        ctx = getattr(self, '_ctx', None)
        if ctx is not None:
            ctx.term()

    @property
    def info(self):
        """return the connection info for this object's sockets.

        This is the tuple (tree_url, location), the per-peer value that
        `connect` expects in its `peers` dict.
        """
        return (self.tree_url, self.location)

    def connect(self, peers, btree, pub_url, root_id=0):
        """connect to peers.

        peers : dict {node id: (tree_url, location)}, i.e. the `info` of
            every node, keyed by node id
        btree : {child:parent} dict describing the tree
        pub_url : url of the root's PUB socket
        root_id : id of the root node in `peers`

        Sets `self.nchildren`. The root only binds, so it connects nothing;
        every other node connects its SUB socket to the root and its
        `upstream` socket to its parent.
        """

        # count the number of children we have
        # (list(...) because dict views have no .count() on Python 3)
        self.nchildren = list(btree.values()).count(self.id)

        if self.root:
            return # root only binds

        root_location = peers[root_id][-1]
        self.sub.connect(disambiguate_dns_url(pub_url, root_location))

        parent = btree[self.id]

        tree_url, location = peers[parent]
        self.upstream.connect(disambiguate_dns_url(tree_url, location))

    def serialize(self, obj):
        """serialize objects.

        Must return list of sendable buffers.

        Can be extended for more efficient/noncopying serialization of numpy arrays, etc.
        """
        return [pickle.dumps(obj)]

    def unserialize(self, msg):
        """inverse of serialize"""
        # NOTE(security): `msg` arrives over a network socket, and
        # unpickling can execute arbitrary code. Only use this between
        # mutually trusted peers.
        return pickle.loads(msg[0])

    def publish(self, value):
        """broadcast `value` from the root on the PUB socket"""
        assert self.root
        self.pub.send_multipart(self.serialize(value))

    def consume(self):
        """receive (blocking) a value broadcast by the root; non-root only"""
        assert not self.root
        return self.unserialize(self.sub.recv_multipart())

    def send_upstream(self, value, flags=0):
        """send `value` to the parent node without blocking; non-root only"""
        assert not self.root
        self.upstream.send_multipart(self.serialize(value), flags=flags|zmq.NOBLOCK)

    def recv_downstream(self, flags=0, timeout=2000.):
        """receive one value from a child

        Waits up to `timeout` (passed straight to the poller) for a message,
        then does a non-blocking receive.
        """
        # wait for a message, so we won't block if there was a bug
        self.downstream_poller.poll(timeout)

        msg = self.downstream.recv_multipart(zmq.NOBLOCK|flags)
        return self.unserialize(msg)

    def reduce(self, f, value, flat=True, all=False):
        """parallel reduce on binary tree

        if flat:
            value is an entry in the sequence
        else:
            value is a list of entries in the sequence

        if all:
            broadcast final result to all nodes
        else:
            only root gets final result

        Returns the final result where it is available (on the root, or on
        every node when `all` is true); otherwise returns this node's
        partial result, i.e. its own value combined with its children's.

        `connect` must have been called first, since it sets `nchildren`.
        With flat=False an empty `value` makes the local reduction raise
        TypeError.
        """
        if not flat:
            # the builtin reduce is gone on Python 3; functools.reduce
            # exists on both 2 and 3 and behaves identically
            from functools import reduce as fold
            value = fold(f, value)

        # combine with one partial result per child
        for _ in range(self.nchildren):
            value = f(value, self.recv_downstream())

        if not self.root:
            self.send_upstream(value)

        if all:
            if self.root:
                self.publish(value)
            else:
                value = self.consume()
        return value

    def allreduce(self, f, value, flat=True):
        """parallel reduce followed by broadcast of the result"""
        return self.reduce(f, value, flat=flat, all=True)
245
@@ -0,0 +1,87 b''
1 #!/usr/bin/env python
2 """
3 Script for setting up and using [all]reduce with a binary-tree engine interconnect.
4
5 usage: `python bintree_script.py`
6
7 This spanning tree strategy ensures that a single node's mailbox will never
8 receive more than 2 messages at once. This is very important to scale to large
9 clusters (e.g. 1000 nodes) since if you have many incoming messages of a couple
10 of megabytes you might saturate the network interface of a single node and
11 potentially its memory buffers if the messages are not consumed in a streamed
12 manner.
13
14 Note that the AllReduce scheme implemented with the spanning tree strategy
15 requires the aggregation function to be commutative and associative. This
16 restriction might not apply if you implement the naive gather / reduce /
17 broadcast strategy, where you can reorder the partial data before performing the reduce.
18 """
19
20 from IPython.parallel import Client, Reference
21
22
# connect client and create views
rc = Client()
rc.block=True
ids = rc.ids

# the first engine acts as the root of the tree
root_id = ids[0]
root = rc[root_id]

view = rc[:]

# run bintree.py script defining bintree functions, etc.
# (execfile is Python 2 only; compile + exec works on both 2 and 3)
with open('bintree.py') as _src:
    exec(compile(_src.read(), 'bintree.py', 'exec'))

# generate binary tree of parents
btree = bintree(ids)

print("setting up binary tree interconnect:")
print_bintree(btree)

# define the same helpers on every engine, and tell each one its own id
view.run('bintree.py')
view.scatter('id', ids, flatten=True)
view['root_id'] = root_id

# create the Communicator objects on the engines
view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )')
pub_url = root.apply_sync(lambda : com.pub_url)

# gather the connection information into a dict
ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
54
# wire the engines together according to the tree:
def connect(com, peers, tree, pub_url, root_id):
    """runs on each engine: hand the connection info to that engine's communicator"""
    com.connect(peers, tree, pub_url, root_id=root_id)

view.apply_sync(
    connect,
    Reference('com'),
    peers,
    btree,
    pub_url,
    root_id,
)
61
62 # functions that can be used for reductions
63 # max and min builtins can be used as well
def add(a,b):
    """reduction function: combine two operands with `+`"""
    total = a + b
    return total
67
def mul(a,b):
    """reduction function: combine two operands with `*`"""
    product = a * b
    return product
71
# make the reduction functions available on every engine
view['add'] = add
view['mul'] = mul

# scatter some data
# (list(...) so a real list is scattered on Python 3 too, where range is lazy)
data = list(range(1000))
view.scatter('data', data)

# perform cumulative sum via allreduce
view.execute("data_sum = com.allreduce(add, data, flat=False)")
# %-formatting inside print(...) gives the same output on Python 2 and 3
print("allreduce sum of data on all engines: %s" % (view['data_sum'],))

# perform cumulative sum *without* final broadcast
# when not broadcasting with allreduce, the final result resides on the root node:
view.execute("ids_sum = com.reduce(add, id, flat=True)")
print("reduce sum of engine ids (not broadcast): %s" % (root['ids_sum'],))
print("partial result on each engine: %s" % (view['ids_sum'],))
@@ -1,3 +1,5 b''
1 .. _parallel_examples:
2
1 =================
3 =================
2 Parallel examples
4 Parallel examples
3 =================
5 =================
@@ -4,6 +4,17 b''
4 Overview and getting started
4 Overview and getting started
5 ============================
5 ============================
6
6
7
8 Examples
9 ========
10
11 We have various example scripts and notebooks for using IPython.parallel in our
12 :file:`docs/examples/parallel` directory, or they can be found `on GitHub`__.
13 Some of these are covered in more detail in the :ref:`examples
14 <parallel_examples>` section.
15
16 .. __: https://github.com/ipython/ipython/tree/master/docs/examples/parallel
17
7 Introduction
18 Introduction
8 ============
19 ============
9
20
General Comments 0
You need to be logged in to leave comments. Login now