Show More
@@ -0,0 +1,77 b'' | |||||
|
1 | import socket | |||
|
2 | ||||
|
3 | import uuid | |||
|
4 | import zmq | |||
|
5 | ||||
|
6 | from IPython.zmq.parallel.util import disambiguate_url | |||
|
7 | ||||
|
8 | class EngineCommunicator(object): | |||
|
9 | ||||
|
10 | def __init__(self, interface='tcp://*', identity=None): | |||
|
11 | self._ctx = zmq.Context() | |||
|
12 | self.socket = self._ctx.socket(zmq.XREP) | |||
|
13 | self.pub = self._ctx.socket(zmq.PUB) | |||
|
14 | self.sub = self._ctx.socket(zmq.SUB) | |||
|
15 | ||||
|
16 | # configure sockets | |||
|
17 | self.identity = identity or bytes(uuid.uuid4()) | |||
|
18 | print self.identity | |||
|
19 | self.socket.setsockopt(zmq.IDENTITY, self.identity) | |||
|
20 | self.sub.setsockopt(zmq.SUBSCRIBE, b'') | |||
|
21 | ||||
|
22 | # bind to ports | |||
|
23 | port = self.socket.bind_to_random_port(interface) | |||
|
24 | pub_port = self.pub.bind_to_random_port(interface) | |||
|
25 | self.url = interface+":%i"%port | |||
|
26 | self.pub_url = interface+":%i"%pub_port | |||
|
27 | # guess first public IP from socket | |||
|
28 | self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0] | |||
|
29 | self.peers = {} | |||
|
30 | ||||
|
31 | def __del__(self): | |||
|
32 | self.socket.close() | |||
|
33 | self.pub.close() | |||
|
34 | self.sub.close() | |||
|
35 | self._ctx.term() | |||
|
36 | ||||
|
37 | @property | |||
|
38 | def info(self): | |||
|
39 | """return the connection info for this object's sockets.""" | |||
|
40 | return (self.identity, self.url, self.pub_url, self.location) | |||
|
41 | ||||
|
42 | def connect(self, peers): | |||
|
43 | """connect to peers. `peers` will be a dict of 4-tuples, keyed by name. | |||
|
44 | {peer : (ident, addr, pub_addr, location)} | |||
|
45 | where peer is the name, ident is the XREP identity, addr,pub_addr are the | |||
|
46 | """ | |||
|
47 | for peer, (ident, url, pub_url, location) in peers.items(): | |||
|
48 | self.peers[peer] = ident | |||
|
49 | if ident != self.identity: | |||
|
50 | self.sub.connect(disambiguate_url(pub_url, location)) | |||
|
51 | if ident > self.identity: | |||
|
52 | # prevent duplicate xrep, by only connecting | |||
|
53 | # engines to engines with higher IDENTITY | |||
|
54 | # a doubly-connected pair will crash | |||
|
55 | self.socket.connect(disambiguate_url(url, location)) | |||
|
56 | ||||
|
57 | def send(self, peers, msg, flags=0, copy=True): | |||
|
58 | if not isinstance(peers, list): | |||
|
59 | peers = [peers] | |||
|
60 | if not isinstance(msg, list): | |||
|
61 | msg = [msg] | |||
|
62 | for p in peers: | |||
|
63 | ident = self.peers[p] | |||
|
64 | self.socket.send_multipart([ident]+msg, flags=flags, copy=copy) | |||
|
65 | ||||
|
66 | def recv(self, flags=0, copy=True): | |||
|
67 | return self.socket.recv_multipart(flags=flags, copy=copy)[1:] | |||
|
68 | ||||
|
69 | def publish(self, msg, flags=0, copy=True): | |||
|
70 | if not isinstance(msg, list): | |||
|
71 | msg = [msg] | |||
|
72 | self.pub.send_multipart(msg, copy=copy) | |||
|
73 | ||||
|
74 | def consume(self, flags=0, copy=True): | |||
|
75 | return self.sub.recv_multipart(flags=flags, copy=copy) | |||
|
76 | ||||
|
77 |
@@ -0,0 +1,43 b'' | |||||
|
1 | import sys | |||
|
2 | ||||
|
3 | from IPython.zmq.parallel import client | |||
|
4 | ||||
|
5 | ||||
|
6 | rc = client.Client() | |||
|
7 | rc.block=True | |||
|
8 | view = rc[:] | |||
|
9 | view.run('communicator.py') | |||
|
10 | view.execute('com = EngineCommunicator()') | |||
|
11 | ||||
|
12 | # gather the connection information into a dict | |||
|
13 | ar = view.apply_async_bound(lambda : com.info) | |||
|
14 | peers = ar.get_dict() | |||
|
15 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | |||
|
16 | ||||
|
17 | # connect the engines to each other: | |||
|
18 | view.apply_sync_bound(lambda pdict: com.connect(pdict), peers) | |||
|
19 | ||||
|
20 | # now all the engines are connected, and we can communicate between them: | |||
|
21 | ||||
|
22 | def broadcast(client, sender, msg_name, dest_name=None, block=None): | |||
|
23 | """broadcast a message from one engine to all others.""" | |||
|
24 | dest_name = msg_name if dest_name is None else dest_name | |||
|
25 | client[sender].execute('com.publish(%s)'%msg_name, block=None) | |||
|
26 | targets = client.ids | |||
|
27 | targets.remove(sender) | |||
|
28 | return client[targets].execute('%s=com.consume()'%dest_name, block=None) | |||
|
29 | ||||
|
30 | def send(client, sender, targets, msg_name, dest_name=None, block=None): | |||
|
31 | """send a message from one to one-or-more engines.""" | |||
|
32 | dest_name = msg_name if dest_name is None else dest_name | |||
|
33 | def _send(targets, m_name): | |||
|
34 | msg = globals()[m_name] | |||
|
35 | return com.send(targets, msg) | |||
|
36 | ||||
|
37 | client[sender].apply_async_bound(_send, targets, msg_name) | |||
|
38 | ||||
|
39 | return client[targets].execute('%s=com.recv()'%dest_name, block=None) | |||
|
40 | ||||
|
41 | ||||
|
42 | ||||
|
43 |
General Comments 0
You need to be logged in to leave comments.
Login now