Show More
@@ -0,0 +1,245 b'' | |||
|
1 | """ | |
|
2 | BinaryTree inter-engine communication class | |
|
3 | ||
|
4 | use from bintree_script.py | |
|
5 | ||
|
6 | Provides parallel [all]reduce functionality | |
|
7 | ||
|
8 | """ | |
|
9 | ||
|
10 | import cPickle as pickle | |
|
11 | import re | |
|
12 | import socket | |
|
13 | import uuid | |
|
14 | ||
|
15 | import zmq | |
|
16 | ||
|
17 | from IPython.parallel.util import disambiguate_url | |
|
18 | ||
|
19 | ||
|
20 | #---------------------------------------------------------------------------- | |
|
21 | # bintree-related construction/printing helpers | |
|
22 | #---------------------------------------------------------------------------- | |
|
23 | ||
|
def bintree(ids, parent=None):
    """construct {child:parent} dict representation of a binary tree

    keys are the nodes in the tree, and values are the parent of each node.

    The root node has parent `parent`, default: None.

    >>> tree = bintree(range(7))
    >>> tree
    {0: None, 1: 0, 2: 1, 3: 1, 4: 0, 5: 4, 6: 4}
    >>> print_bintree(tree)
    0
      1
        2
        3
      4
        5
        6
    """
    parents = {}
    n = len(ids)
    if n == 0:
        # empty sequence: empty tree
        return parents
    # first element becomes the (sub)tree root
    root = ids[0]
    parents[root] = parent
    if len(ids) == 1:
        return parents
    else:
        ids = ids[1:]
        n = len(ids)
        # explicit floor division: plain `n/2` yields a float under
        # Python 3 (or `from __future__ import division`), which would
        # break slicing; `//` is correct on both Python 2 and 3.
        left = bintree(ids[:n//2], parent=root)
        right = bintree(ids[n//2:], parent=root)
        parents.update(left)
        parents.update(right)
        return parents
|
59 | ||
|
def reverse_bintree(parents):
    """construct {parent:[children]} dict from {child:parent}

    keys are the nodes in the tree, and values are the lists of children
    of that node in the tree.

    reverse_tree[None] is the root node

    >>> tree = bintree(range(7))
    >>> reverse_bintree(tree)
    {None: 0, 0: [1, 4], 4: [5, 6], 1: [2, 3]}
    """
    children = {}
    # .items() instead of the Python-2-only .iteritems(), so this also
    # runs on Python 3; iteration behavior is otherwise identical.
    for child, parent in parents.items():
        if parent is None:
            # the root is stored directly under the None key, not in a list
            children[None] = child
            continue
        elif parent not in children:
            children[parent] = []
        children[parent].append(child)

    return children
|
82 | ||
|
def depth(n, tree):
    """Return the depth of node `n` in the {child:parent} dict `tree`.

    The root (whose parent is None) has depth 0.
    """
    hops = 0
    ancestor = tree[n]
    # climb parent links until we fall off the root
    while ancestor is not None:
        hops += 1
        ancestor = tree[ancestor]
    return hops
|
91 | ||
|
def print_bintree(tree, indent=' '):
    """print a binary tree, one node per line, indented by depth"""
    # parenthesized print works as both a Python 2 print statement
    # (printing a single expression) and a Python 3 function call
    for n in sorted(tree.keys()):
        print("%s%s" % (indent * depth(n, tree), n))
|
96 | ||
|
97 | #---------------------------------------------------------------------------- | |
|
98 | # Communicator class for a binary-tree map | |
|
99 | #---------------------------------------------------------------------------- | |
|
100 | ||
|
# dotted-quad matcher, used to tell literal IP addresses apart from hostnames
ip_pat = re.compile(r'^\d+\.\d+\.\d+\.\d+$')

def disambiguate_dns_url(url, location):
    """accept either IP address or dns name, and return IP"""
    if ip_pat.match(location) is None:
        # not a dotted-quad address: resolve the hostname via DNS
        location = socket.gethostbyname(location)
    return disambiguate_url(url, location)
|
108 | ||
|
class BinaryTreeCommunicator(object):
    """Inter-engine communicator for parallel [all]reduce over a binary tree.

    The root node binds a PUB socket used to broadcast final results; every
    other node connects a SUB socket to it.  Each node also binds a PULL
    socket (``downstream``) on which its children push partial results, and
    connects a PUSH socket (``upstream``) to its parent's PULL socket.
    """

    # this node's id (its key in the {child:parent} tree dict)
    id = None
    # PUB socket (root only), broadcasts the final result
    pub = None
    # SUB socket (non-root only), receives the broadcast result
    sub = None
    # PULL socket bound locally; children push partial results here
    downstream = None
    # PUSH socket connected to the parent's downstream PULL socket
    upstream = None
    # url of the root's PUB socket (set on the root only)
    pub_url = None
    # url of this node's downstream PULL socket
    tree_url = None

    def __init__(self, id, interface='tcp://*', root=False):
        """Create the zmq context and sockets, binding to random ports.

        Parameters
        ----------
        id : hashable
            this node's key in the binary tree (see `bintree`).
        interface : str
            zmq interface to bind on, without a port (default: 'tcp://*').
        root : bool
            whether this node is the root of the tree.
        """
        self.id = id
        self.root = root

        # create context and sockets
        self._ctx = zmq.Context()
        if root:
            self.pub = self._ctx.socket(zmq.PUB)
        else:
            self.sub = self._ctx.socket(zmq.SUB)
            # subscribe to all messages
            self.sub.setsockopt(zmq.SUBSCRIBE, b'')
        self.downstream = self._ctx.socket(zmq.PULL)
        self.upstream = self._ctx.socket(zmq.PUSH)

        # bind to ports
        interface_f = interface + ":%i"
        if self.root:
            pub_port = self.pub.bind_to_random_port(interface)
            self.pub_url = interface_f % pub_port

        tree_port = self.downstream.bind_to_random_port(interface)
        self.tree_url = interface_f % tree_port
        # the poller lets recv_downstream wait with a timeout
        # instead of blocking forever
        self.downstream_poller = zmq.Poller()
        self.downstream_poller.register(self.downstream, zmq.POLLIN)

        # guess first public IP from socket
        self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]

    def __del__(self):
        # best-effort cleanup: close all sockets, then terminate the context
        self.downstream.close()
        self.upstream.close()
        if self.root:
            self.pub.close()
        else:
            self.sub.close()
        self._ctx.term()

    @property
    def info(self):
        """return the connection info for this object's sockets."""
        return (self.tree_url, self.location)

    def connect(self, peers, btree, pub_url, root_id=0):
        """connect this node's sockets to its peers.

        Parameters
        ----------
        peers : dict
            {peer_id : (tree_url, location)} pairs, as gathered from each
            node's `info` property.
        btree : dict
            {child : parent} binary tree, as built by `bintree`.
        pub_url : str
            url of the root's PUB socket.
        root_id : hashable
            id of the root node (default: 0).
        """

        # count the number of children we have
        self.nchildren = btree.values().count(self.id)

        if self.root:
            return # root only binds

        # connect SUB to the root's PUB for the final broadcast
        root_location = peers[root_id][-1]
        self.sub.connect(disambiguate_dns_url(pub_url, root_location))

        parent = btree[self.id]

        # connect PUSH to the parent's downstream PULL
        tree_url, location = peers[parent]
        self.upstream.connect(disambiguate_dns_url(tree_url, location))

    def serialize(self, obj):
        """serialize objects.

        Must return list of sendable buffers.

        Can be extended for more efficient/noncopying serialization of numpy arrays, etc.
        """
        return [pickle.dumps(obj)]

    def unserialize(self, msg):
        """inverse of serialize"""
        return pickle.loads(msg[0])

    def publish(self, value):
        """broadcast `value` to all non-root nodes (root only)."""
        assert self.root
        self.pub.send_multipart(self.serialize(value))

    def consume(self):
        """receive one broadcast value from the root (non-root only)."""
        assert not self.root
        return self.unserialize(self.sub.recv_multipart())

    def send_upstream(self, value, flags=0):
        """push a partial result to the parent (non-root only).

        NOBLOCK is forced so a missing/unconnected peer raises
        instead of hanging.
        """
        assert not self.root
        self.upstream.send_multipart(self.serialize(value), flags=flags|zmq.NOBLOCK)

    def recv_downstream(self, flags=0, timeout=2000.):
        """receive one partial result from a child.

        Waits up to `timeout` ms via the poller; the NOBLOCK recv then
        raises if nothing arrived, rather than blocking forever.
        """
        # wait for a message, so we won't block if there was a bug
        self.downstream_poller.poll(timeout)

        msg = self.downstream.recv_multipart(zmq.NOBLOCK|flags)
        return self.unserialize(msg)

    def reduce(self, f, value, flat=True, all=False):
        """parallel reduce on binary tree

        if flat:
            value is an entry in the sequence
        else:
            value is a list of entries in the sequence

        if all:
            broadcast final result to all nodes
        else:
            only root gets final result

        NOTE: `f` should be commutative and associative, since partial
        results are combined in tree order, not sequence order.
        """
        if not flat:
            # collapse the local list first (builtin reduce; Python 2)
            value = reduce(f, value)

        # fold in one partial result from each child
        for i in range(self.nchildren):
            value = f(value, self.recv_downstream())

        if not self.root:
            self.send_upstream(value)

        if all:
            if self.root:
                self.publish(value)
            else:
                value = self.consume()
        return value

    def allreduce(self, f, value, flat=True):
        """parallel reduce followed by broadcast of the result"""
        return self.reduce(f, value, flat=flat, all=True)
|
245 |
@@ -0,0 +1,87 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """ | |
|
3 | Script for setting up and using [all]reduce with a binary-tree engine interconnect. | |
|
4 | ||
|
5 | usage: `python bintree_script.py` | |
|
6 | ||
|
7 | This spanning tree strategy ensures that a single node mailbox will never | |
|
8 | receive more than 2 messages at once. This is very important to scale to large | |
|
9 | clusters (e.g. 1000 nodes) since if you have many incoming messages of a couple | |
|
10 | of megabytes you might saturate the network interface of a single node and | |
|
11 | potentially its memory buffers if the messages are not consumed in a streamed | |
|
12 | manner. | |
|
13 | ||
|
14 | Note that the AllReduce scheme implemented with the spanning tree strategy | |

15 | requires the aggregation function to be commutative and associative. This | |

16 | restriction does not apply to the naive gather / reduce / broadcast strategy, | |

17 | where the partial data can be reordered before performing the reduce. | |
|
18 | """ | |
|
19 | ||
|
20 | from IPython.parallel import Client, Reference | |
|
21 | ||
|
22 | ||
|
23 | # connect client and create views | |
|
# connect client and create views
rc = Client()
rc.block=True
ids = rc.ids

# the first engine id acts as the root of the tree
root_id = ids[0]
root = rc[root_id]

# direct view on all engines
view = rc[:]

# run bintree.py script defining bintree functions, etc.
execfile('bintree.py')

# generate binary tree of parents
btree = bintree(ids)

print "setting up binary tree interconnect:"
print_bintree(btree)

# define the bintree functions on the engines too, and give each
# engine its own id plus the root's id
view.run('bintree.py')
view.scatter('id', ids, flatten=True)
view['root_id'] = root_id

# create the Communicator objects on the engines
view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )')
pub_url = root.apply_sync(lambda : com.pub_url)

# gather the connection information into a dict
ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
|
54 | ||
|
55 | # connect the engines to each other: | |
|
# connect the engines to each other:
def connect(com, peers, tree, pub_url, root_id):
    """this function will be called on the engines"""
    com.connect(peers, tree, pub_url, root_id)

# Reference('com') resolves to each engine's own Communicator instance
view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id)
|
61 | ||
|
62 | # functions that can be used for reductions | |
|
63 | # max and min builtins can be used as well | |
|
def add(a,b):
    """binary sum operator for [all]reduce"""
    total = a + b
    return total
|
67 | ||
|
def mul(a,b):
    """binary product operator for [all]reduce"""
    product = a * b
    return product
|
71 | ||
|
# push the reduction operators to the engines
view['add'] = add
view['mul'] = mul

# scatter some data
data = range(1000)
view.scatter('data', data)

# perform cumulative sum via allreduce
# flat=False because each engine holds a *list* of entries
view.execute("data_sum = com.allreduce(add, data, flat=False)")
print "allreduce sum of data on all engines:", view['data_sum']

# perform cumulative sum *without* final broadcast
# when not broadcasting with allreduce, the final result resides on the root node:
view.execute("ids_sum = com.reduce(add, id, flat=True)")
print "reduce sum of engine ids (not broadcast):", root['ids_sum']
print "partial result on each engine:", view['ids_sum']
@@ -1,3 +1,5 b'' | |||
|
1 | .. _parallel_examples: | |
|
2 | ||
|
1 | 3 | ================= |
|
2 | 4 | Parallel examples |
|
3 | 5 | ================= |
@@ -4,6 +4,17 b'' | |||
|
4 | 4 | Overview and getting started |
|
5 | 5 | ============================ |
|
6 | 6 | |
|
7 | ||
|
8 | Examples | |
|
9 | ======== | |
|
10 | ||
|
11 | We have various example scripts and notebooks for using IPython.parallel in our | |
|
12 | :file:`docs/examples/parallel` directory, or they can be found `on GitHub`__. | |
|
13 | Some of these are covered in more detail in the :ref:`examples | |
|
14 | <parallel_examples>` section. | |
|
15 | ||
|
16 | .. __: https://github.com/ipython/ipython/tree/master/docs/examples/parallel | |
|
17 | ||
|
7 | 18 | Introduction |
|
8 | 19 | ============ |
|
9 | 20 |
General Comments 0
You need to be logged in to leave comments.
Login now