##// END OF EJS Templates
add inter-engine communication example
MinRK -
Show More
@@ -0,0 +1,77 b''
1 import socket
2
3 import uuid
4 import zmq
5
6 from IPython.zmq.parallel.util import disambiguate_url
7
8 class EngineCommunicator(object):
9
10 def __init__(self, interface='tcp://*', identity=None):
11 self._ctx = zmq.Context()
12 self.socket = self._ctx.socket(zmq.XREP)
13 self.pub = self._ctx.socket(zmq.PUB)
14 self.sub = self._ctx.socket(zmq.SUB)
15
16 # configure sockets
17 self.identity = identity or bytes(uuid.uuid4())
18 print self.identity
19 self.socket.setsockopt(zmq.IDENTITY, self.identity)
20 self.sub.setsockopt(zmq.SUBSCRIBE, b'')
21
22 # bind to ports
23 port = self.socket.bind_to_random_port(interface)
24 pub_port = self.pub.bind_to_random_port(interface)
25 self.url = interface+":%i"%port
26 self.pub_url = interface+":%i"%pub_port
27 # guess first public IP from socket
28 self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]
29 self.peers = {}
30
31 def __del__(self):
32 self.socket.close()
33 self.pub.close()
34 self.sub.close()
35 self._ctx.term()
36
37 @property
38 def info(self):
39 """return the connection info for this object's sockets."""
40 return (self.identity, self.url, self.pub_url, self.location)
41
42 def connect(self, peers):
43 """connect to peers. `peers` will be a dict of 4-tuples, keyed by name.
44 {peer : (ident, addr, pub_addr, location)}
45 where peer is the name, ident is the XREP identity, addr,pub_addr are the
46 """
47 for peer, (ident, url, pub_url, location) in peers.items():
48 self.peers[peer] = ident
49 if ident != self.identity:
50 self.sub.connect(disambiguate_url(pub_url, location))
51 if ident > self.identity:
52 # prevent duplicate xrep, by only connecting
53 # engines to engines with higher IDENTITY
54 # a doubly-connected pair will crash
55 self.socket.connect(disambiguate_url(url, location))
56
57 def send(self, peers, msg, flags=0, copy=True):
58 if not isinstance(peers, list):
59 peers = [peers]
60 if not isinstance(msg, list):
61 msg = [msg]
62 for p in peers:
63 ident = self.peers[p]
64 self.socket.send_multipart([ident]+msg, flags=flags, copy=copy)
65
66 def recv(self, flags=0, copy=True):
67 return self.socket.recv_multipart(flags=flags, copy=copy)[1:]
68
69 def publish(self, msg, flags=0, copy=True):
70 if not isinstance(msg, list):
71 msg = [msg]
72 self.pub.send_multipart(msg, copy=copy)
73
74 def consume(self, flags=0, copy=True):
75 return self.sub.recv_multipart(flags=flags, copy=copy)
76
77
@@ -0,0 +1,43 b''
1 import sys
2
3 from IPython.zmq.parallel import client
4
5
6 rc = client.Client()
7 rc.block=True
8 view = rc[:]
9 view.run('communicator.py')
10 view.execute('com = EngineCommunicator()')
11
12 # gather the connection information into a dict
13 ar = view.apply_async_bound(lambda : com.info)
14 peers = ar.get_dict()
15 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
16
17 # connect the engines to each other:
18 view.apply_sync_bound(lambda pdict: com.connect(pdict), peers)
19
20 # now all the engines are connected, and we can communicate between them:
21
22 def broadcast(client, sender, msg_name, dest_name=None, block=None):
23 """broadcast a message from one engine to all others."""
24 dest_name = msg_name if dest_name is None else dest_name
25 client[sender].execute('com.publish(%s)'%msg_name, block=None)
26 targets = client.ids
27 targets.remove(sender)
28 return client[targets].execute('%s=com.consume()'%dest_name, block=None)
29
30 def send(client, sender, targets, msg_name, dest_name=None, block=None):
31 """send a message from one to one-or-more engines."""
32 dest_name = msg_name if dest_name is None else dest_name
33 def _send(targets, m_name):
34 msg = globals()[m_name]
35 return com.send(targets, msg)
36
37 client[sender].apply_async_bound(_send, targets, msg_name)
38
39 return client[targets].execute('%s=com.recv()'%dest_name, block=None)
40
41
42
43
General Comments 0
You need to be logged in to leave comments. Login now