Show More
@@ -0,0 +1,77 b'' | |||
|
1 | import socket | |
|
2 | ||
|
3 | import uuid | |
|
4 | import zmq | |
|
5 | ||
|
6 | from IPython.zmq.parallel.util import disambiguate_url | |
|
7 | ||
|
8 | class EngineCommunicator(object): | |
|
9 | ||
|
10 | def __init__(self, interface='tcp://*', identity=None): | |
|
11 | self._ctx = zmq.Context() | |
|
12 | self.socket = self._ctx.socket(zmq.XREP) | |
|
13 | self.pub = self._ctx.socket(zmq.PUB) | |
|
14 | self.sub = self._ctx.socket(zmq.SUB) | |
|
15 | ||
|
16 | # configure sockets | |
|
17 | self.identity = identity or bytes(uuid.uuid4()) | |
|
18 | print self.identity | |
|
19 | self.socket.setsockopt(zmq.IDENTITY, self.identity) | |
|
20 | self.sub.setsockopt(zmq.SUBSCRIBE, b'') | |
|
21 | ||
|
22 | # bind to ports | |
|
23 | port = self.socket.bind_to_random_port(interface) | |
|
24 | pub_port = self.pub.bind_to_random_port(interface) | |
|
25 | self.url = interface+":%i"%port | |
|
26 | self.pub_url = interface+":%i"%pub_port | |
|
27 | # guess first public IP from socket | |
|
28 | self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0] | |
|
29 | self.peers = {} | |
|
30 | ||
|
31 | def __del__(self): | |
|
32 | self.socket.close() | |
|
33 | self.pub.close() | |
|
34 | self.sub.close() | |
|
35 | self._ctx.term() | |
|
36 | ||
|
37 | @property | |
|
38 | def info(self): | |
|
39 | """return the connection info for this object's sockets.""" | |
|
40 | return (self.identity, self.url, self.pub_url, self.location) | |
|
41 | ||
|
42 | def connect(self, peers): | |
|
43 | """connect to peers. `peers` will be a dict of 4-tuples, keyed by name. | |
|
44 | {peer : (ident, addr, pub_addr, location)} | |
|
45 | where peer is the name, ident is the XREP identity, addr,pub_addr are the | |
|
46 | """ | |
|
47 | for peer, (ident, url, pub_url, location) in peers.items(): | |
|
48 | self.peers[peer] = ident | |
|
49 | if ident != self.identity: | |
|
50 | self.sub.connect(disambiguate_url(pub_url, location)) | |
|
51 | if ident > self.identity: | |
|
52 | # prevent duplicate xrep, by only connecting | |
|
53 | # engines to engines with higher IDENTITY | |
|
54 | # a doubly-connected pair will crash | |
|
55 | self.socket.connect(disambiguate_url(url, location)) | |
|
56 | ||
|
57 | def send(self, peers, msg, flags=0, copy=True): | |
|
58 | if not isinstance(peers, list): | |
|
59 | peers = [peers] | |
|
60 | if not isinstance(msg, list): | |
|
61 | msg = [msg] | |
|
62 | for p in peers: | |
|
63 | ident = self.peers[p] | |
|
64 | self.socket.send_multipart([ident]+msg, flags=flags, copy=copy) | |
|
65 | ||
|
66 | def recv(self, flags=0, copy=True): | |
|
67 | return self.socket.recv_multipart(flags=flags, copy=copy)[1:] | |
|
68 | ||
|
69 | def publish(self, msg, flags=0, copy=True): | |
|
70 | if not isinstance(msg, list): | |
|
71 | msg = [msg] | |
|
72 | self.pub.send_multipart(msg, copy=copy) | |
|
73 | ||
|
74 | def consume(self, flags=0, copy=True): | |
|
75 | return self.sub.recv_multipart(flags=flags, copy=copy) | |
|
76 | ||
|
77 |
@@ -0,0 +1,43 b'' | |||
|
1 | import sys | |
|
2 | ||
|
3 | from IPython.zmq.parallel import client | |
|
4 | ||
|
5 | ||
|
6 | rc = client.Client() | |
|
7 | rc.block=True | |
|
8 | view = rc[:] | |
|
9 | view.run('communicator.py') | |
|
10 | view.execute('com = EngineCommunicator()') | |
|
11 | ||
|
12 | # gather the connection information into a dict | |
|
13 | ar = view.apply_async_bound(lambda : com.info) | |
|
14 | peers = ar.get_dict() | |
|
15 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | |
|
16 | ||
|
17 | # connect the engines to each other: | |
|
18 | view.apply_sync_bound(lambda pdict: com.connect(pdict), peers) | |
|
19 | ||
|
20 | # now all the engines are connected, and we can communicate between them: | |
|
21 | ||
|
22 | def broadcast(client, sender, msg_name, dest_name=None, block=None): | |
|
23 | """broadcast a message from one engine to all others.""" | |
|
24 | dest_name = msg_name if dest_name is None else dest_name | |
|
25 | client[sender].execute('com.publish(%s)'%msg_name, block=None) | |
|
26 | targets = client.ids | |
|
27 | targets.remove(sender) | |
|
28 | return client[targets].execute('%s=com.consume()'%dest_name, block=None) | |
|
29 | ||
|
30 | def send(client, sender, targets, msg_name, dest_name=None, block=None): | |
|
31 | """send a message from one to one-or-more engines.""" | |
|
32 | dest_name = msg_name if dest_name is None else dest_name | |
|
33 | def _send(targets, m_name): | |
|
34 | msg = globals()[m_name] | |
|
35 | return com.send(targets, msg) | |
|
36 | ||
|
37 | client[sender].apply_async_bound(_send, targets, msg_name) | |
|
38 | ||
|
39 | return client[targets].execute('%s=com.recv()'%dest_name, block=None) | |
|
40 | ||
|
41 | ||
|
42 | ||
|
43 |
General Comments 0
You need to be logged in to leave comments.
Login now