Show More
@@ -0,0 +1,245 b'' | |||||
|
1 | """ | |||
|
2 | BinaryTree inter-engine communication class | |||
|
3 | ||||
|
4 | use from bintree_script.py | |||
|
5 | ||||
|
6 | Provides parallel [all]reduce functionality | |||
|
7 | ||||
|
8 | """ | |||
|
9 | ||||
|
10 | import cPickle as pickle | |||
|
11 | import re | |||
|
12 | import socket | |||
|
13 | import uuid | |||
|
14 | ||||
|
15 | import zmq | |||
|
16 | ||||
|
17 | from IPython.parallel.util import disambiguate_url | |||
|
18 | ||||
|
19 | ||||
|
20 | #---------------------------------------------------------------------------- | |||
|
21 | # bintree-related construction/printing helpers | |||
|
22 | #---------------------------------------------------------------------------- | |||
|
23 | ||||
|
def bintree(ids, parent=None):
    """construct {child:parent} dict representation of a binary tree

    keys are the nodes in the tree, and values are the parent of each node.

    The root node has parent `parent`, default: None.

    >>> tree = bintree(range(7))
    >>> tree
    {0: None, 1: 0, 2: 1, 3: 1, 4: 0, 5: 4, 6: 4}
    >>> print_bintree(tree)
    0
      1
        2
        3
      4
        5
        6
    """
    parents = {}
    n = len(ids)
    if n == 0:
        # empty sequence: no nodes, no tree
        return parents
    # the first id becomes the root of this (sub)tree
    root = ids[0]
    parents[root] = parent
    if len(ids) == 1:
        return parents
    else:
        ids = ids[1:]
        n = len(ids)
        # split the remaining ids evenly into left/right subtrees.
        # floor division (//) keeps the slice indices integral even when
        # true division is in effect (Python 3, or __future__.division);
        # on Python 2 ints it is identical to the old n/2.
        left = bintree(ids[:n//2], parent=root)
        right = bintree(ids[n//2:], parent=root)
        parents.update(left)
        parents.update(right)
        return parents
|
59 | ||||
|
def reverse_bintree(parents):
    """construct {parent:[children]} dict from {child:parent}

    keys are the nodes in the tree, and values are the lists of children
    of that node in the tree.

    reverse_tree[None] is the root node

    >>> tree = bintree(range(7))
    >>> reverse_bintree(tree)
    {None: 0, 0: [1, 4], 4: [5, 6], 1: [2, 3]}
    """
    children = {}
    # items() is correct on both Python 2 and 3; iteritems() was removed
    # in Python 3 and the dict here is small, so the copy is negligible.
    for child, parent in parents.items():
        if parent is None:
            # the root has no parent; store it as the single None entry
            children[None] = child
            continue
        elif parent not in children:
            children[parent] = []
        children[parent].append(child)

    return children
|
82 | ||||
|
def depth(n, tree):
    """get depth of an element in the tree"""
    # follow parent links up to the root, counting the hops taken
    hops = 0
    node = tree[n]
    while node is not None:
        hops += 1
        node = tree[node]
    return hops
|
91 | ||||
|
def print_bintree(tree, indent='  '):
    """print a binary tree

    Each node is printed on its own line, indented by `indent` repeated
    once per level of depth in the tree.
    """
    # The parenthesized call form prints identically under the Python 2
    # print statement (single parenthesized expression) and is valid
    # under the Python 3 print function, unlike the bare statement form.
    for n in sorted(tree.keys()):
        print("%s%s" % (indent * depth(n, tree), n))
|
96 | ||||
|
#----------------------------------------------------------------------------
# Communicator class for a binary-tree map
#----------------------------------------------------------------------------

# matches dotted-quad IPv4 addresses, e.g. "192.168.0.1"
ip_pat = re.compile(r'^\d+\.\d+\.\d+\.\d+$')

def disambiguate_dns_url(url, location):
    """accept either IP address or dns name, and return IP"""
    looks_like_ip = ip_pat.match(location) is not None
    if not looks_like_ip:
        # resolve the hostname to a dotted-quad address first
        location = socket.gethostbyname(location)
    return disambiguate_url(url, location)
|
108 | ||||
|
class BinaryTreeCommunicator(object):
    """Engine-side communicator for [all]reduce over a binary tree.

    Each non-root node PUSHes partial results to its parent (`upstream`)
    and PULLs partial results from its children (`downstream`).  The root
    additionally broadcasts the final result to every node over PUB/SUB.
    """

    # class-level defaults; instances overwrite these in __init__
    id = None          # this node's identifier in the tree
    pub = None         # PUB socket for broadcasting results (root only)
    sub = None         # SUB socket for receiving broadcasts (non-root only)
    downstream = None  # PULL socket: receives partial results from children
    upstream = None    # PUSH socket: sends partial results to the parent
    pub_url = None     # bound url of the root's PUB socket (root only)
    tree_url = None    # bound url of this node's PULL socket

    def __init__(self, id, interface='tcp://*', root=False):
        self.id = id
        self.root = root

        # create context and sockets
        self._ctx = zmq.Context()
        if root:
            self.pub = self._ctx.socket(zmq.PUB)
        else:
            self.sub = self._ctx.socket(zmq.SUB)
            # empty subscription: receive everything the root publishes
            self.sub.setsockopt(zmq.SUBSCRIBE, b'')
        self.downstream = self._ctx.socket(zmq.PULL)
        self.upstream = self._ctx.socket(zmq.PUSH)

        # bind to ports (bind happens here; peers connect later in connect())
        interface_f = interface + ":%i"
        if self.root:
            pub_port = self.pub.bind_to_random_port(interface)
            self.pub_url = interface_f % pub_port

        tree_port = self.downstream.bind_to_random_port(interface)
        self.tree_url = interface_f % tree_port
        # poller lets recv_downstream wait with a timeout instead of
        # blocking forever if an expected message never arrives
        self.downstream_poller = zmq.Poller()
        self.downstream_poller.register(self.downstream, zmq.POLLIN)

        # guess first public IP from socket
        self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]

    def __del__(self):
        # best-effort teardown of sockets and context at garbage collection
        self.downstream.close()
        self.upstream.close()
        if self.root:
            self.pub.close()
        else:
            self.sub.close()
        self._ctx.term()

    @property
    def info(self):
        """return the connection info for this object's sockets."""
        return (self.tree_url, self.location)

    def connect(self, peers, btree, pub_url, root_id=0):
        """connect to peers.  `peers` will be a dict of 4-tuples, keyed by name.
        {peer : (ident, addr, pub_addr, location)}
        where peer is the name, ident is the XREP identity, addr,pub_addr are the
        """
        # NOTE(review): the docstring above looks stale -- the values
        # actually unpacked below are 2-tuples (tree_url, location),
        # matching the `info` property.  Confirm against the caller.

        # count the number of children we have
        self.nchildren = btree.values().count(self.id)

        if self.root:
            return # root only binds

        # connect SUB to the root's PUB socket for final-result broadcasts
        root_location = peers[root_id][-1]
        self.sub.connect(disambiguate_dns_url(pub_url, root_location))

        parent = btree[self.id]

        # connect PUSH to the parent's PULL socket for partial results
        tree_url, location = peers[parent]
        self.upstream.connect(disambiguate_dns_url(tree_url, location))

    def serialize(self, obj):
        """serialize objects.

        Must return list of sendable buffers.

        Can be extended for more efficient/noncopying serialization of numpy arrays, etc.
        """
        return [pickle.dumps(obj)]

    def unserialize(self, msg):
        """inverse of serialize"""
        return pickle.loads(msg[0])

    def publish(self, value):
        # broadcast `value` to all non-root nodes (root only)
        assert self.root
        self.pub.send_multipart(self.serialize(value))

    def consume(self):
        # receive a value broadcast by the root (non-root only)
        assert not self.root
        return self.unserialize(self.sub.recv_multipart())

    def send_upstream(self, value, flags=0):
        # send a partial result to the parent; NOBLOCK so a missing or
        # wedged parent raises instead of hanging this engine
        assert not self.root
        self.upstream.send_multipart(self.serialize(value), flags=flags|zmq.NOBLOCK)

    def recv_downstream(self, flags=0, timeout=2000.):
        """receive one partial result from a child.

        timeout is in milliseconds; if no message arrives in time, the
        NOBLOCK recv below will raise rather than block forever.
        """
        # wait for a message, so we won't block if there was a bug
        self.downstream_poller.poll(timeout)

        msg = self.downstream.recv_multipart(zmq.NOBLOCK|flags)
        return self.unserialize(msg)

    def reduce(self, f, value, flat=True, all=False):
        """parallel reduce on binary tree

        if flat:
            value is an entry in the sequence
        else:
            value is a list of entries in the sequence

        if all:
            broadcast final result to all nodes
        else:
            only root gets final result

        NOTE(review): child results arrive in whatever order the network
        delivers them, so `f` should be commutative and associative for a
        well-defined result.
        """
        if not flat:
            # locally fold the list down to a single entry first
            # (builtin reduce -- this file is Python 2)
            value = reduce(f, value)

        # fold in one partial result from each child
        for i in range(self.nchildren):
            value = f(value, self.recv_downstream())

        if not self.root:
            self.send_upstream(value)

        if all:
            # allreduce: root broadcasts, everyone else waits for the result
            if self.root:
                self.publish(value)
            else:
                value = self.consume()
        return value

    def allreduce(self, f, value, flat=True):
        """parallel reduce followed by broadcast of the result"""
        return self.reduce(f, value, flat=flat, all=True)
|
245 |
@@ -0,0 +1,87 b'' | |||||
|
#!/usr/bin/env python
"""
Script for setting up and using [all]reduce with a binary-tree engine interconnect.

usage: `python bintree_script.py`

This spanning tree strategy ensures that a single node mailbox will never
receive more than 2 messages at once. This is very important to scale to large
clusters (e.g. 1000 nodes) since if you have many incoming messages of a couple
of megabytes you might saturate the network interface of a single node and
potentially its memory buffers if the messages are not consumed in a streamed
manner.

Note that the AllReduce scheme implemented with the spanning tree strategy
imposes that the aggregation function be commutative and associative. It might
not be the case if you implement the naive gather / reduce / broadcast strategy
where you can reorder the partial data before performing the reduce.
"""

from IPython.parallel import Client, Reference


# connect client and create views
rc = Client()
rc.block=True
ids = rc.ids

# the first engine acts as the root of the tree
root_id = ids[0]
root = rc[root_id]

view = rc[:]

# run bintree.py script defining bintree functions, etc.
# (executed locally, so this client process can build/print the tree)
execfile('bintree.py')

# generate binary tree of parents
btree = bintree(ids)

print "setting up binary tree interconnect:"
print_bintree(btree)

# run the same definitions on every engine
view.run('bintree.py')
# give each engine its own id, and tell everyone who the root is
view.scatter('id', ids, flatten=True)
view['root_id'] = root_id

# create the Communicator objects on the engines
view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )')
# only the root binds a PUB socket; fetch its url for everyone to connect to
pub_url = root.apply_sync(lambda : com.pub_url)

# gather the connection information into a dict
ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

# connect the engines to each other:
def connect(com, peers, tree, pub_url, root_id):
    """this function will be called on the engines"""
    com.connect(peers, tree, pub_url, root_id)

# Reference('com') resolves to each engine's own Communicator instance
view.apply_sync(connect, Reference('com'), peers, btree, pub_url, root_id)

# functions that can be used for reductions
# max and min builtins can be used as well
def add(a,b):
    """cumulative sum reduction"""
    return a+b

def mul(a,b):
    """cumulative product reduction"""
    return a*b

# push the reduction functions to the engines by name
view['add'] = add
view['mul'] = mul

# scatter some data
data = range(1000)
view.scatter('data', data)

# perform cumulative sum via allreduce
view.execute("data_sum = com.allreduce(add, data, flat=False)")
print "allreduce sum of data on all engines:", view['data_sum']

# perform cumulative sum *without* final broadcast
# when not broadcasting with allreduce, the final result resides on the root node:
view.execute("ids_sum = com.reduce(add, id, flat=True)")
print "reduce sum of engine ids (not broadcast):", root['ids_sum']
print "partial result on each engine:", view['ids_sum']
@@ -1,3 +1,5 b'' | |||||
|
1 | .. _parallel_examples: | |||
|
2 | ||||
1 | ================= |
|
3 | ================= | |
2 | Parallel examples |
|
4 | Parallel examples | |
3 | ================= |
|
5 | ================= |
@@ -4,6 +4,17 b'' | |||||
4 | Overview and getting started |
|
4 | Overview and getting started | |
5 | ============================ |
|
5 | ============================ | |
6 |
|
6 | |||
|
7 | ||||
|
8 | Examples | |||
|
9 | ======== | |||
|
10 | ||||
|
11 | We have various example scripts and notebooks for using IPython.parallel in our | |||
|
12 | :file:`docs/examples/parallel` directory, or they can be found `on GitHub`__. | |||
|
13 | Some of these are covered in more detail in the :ref:`examples | |||
|
14 | <parallel_examples>` section. | |||
|
15 | ||||
|
16 | .. __: https://github.com/ipython/ipython/tree/master/docs/examples/parallel | |||
|
17 | ||||
7 | Introduction |
|
18 | Introduction | |
8 | ============ |
|
19 | ============ | |
9 |
|
20 |
General Comments 0
You need to be logged in to leave comments.
Login now