diff --git a/docs/examples/newparallel/interengine/communicator.py b/docs/examples/newparallel/interengine/communicator.py new file mode 100644 index 0000000..48deea8 --- /dev/null +++ b/docs/examples/newparallel/interengine/communicator.py @@ -0,0 +1,77 @@ +import socket + +import uuid +import zmq + +from IPython.zmq.parallel.util import disambiguate_url + +class EngineCommunicator(object): + + def __init__(self, interface='tcp://*', identity=None): + self._ctx = zmq.Context() + self.socket = self._ctx.socket(zmq.XREP) + self.pub = self._ctx.socket(zmq.PUB) + self.sub = self._ctx.socket(zmq.SUB) + + # configure sockets + self.identity = identity or bytes(uuid.uuid4()) + print self.identity + self.socket.setsockopt(zmq.IDENTITY, self.identity) + self.sub.setsockopt(zmq.SUBSCRIBE, b'') + + # bind to ports + port = self.socket.bind_to_random_port(interface) + pub_port = self.pub.bind_to_random_port(interface) + self.url = interface+":%i"%port + self.pub_url = interface+":%i"%pub_port + # guess first public IP from socket + self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0] + self.peers = {} + + def __del__(self): + self.socket.close() + self.pub.close() + self.sub.close() + self._ctx.term() + + @property + def info(self): + """return the connection info for this object's sockets.""" + return (self.identity, self.url, self.pub_url, self.location) + + def connect(self, peers): + """connect to peers. `peers` will be a dict of 4-tuples, keyed by name. + {peer : (ident, addr, pub_addr, location)} + where peer is the name, ident is the XREP identity, addr,pub_addr are the + """ + for peer, (ident, url, pub_url, location) in peers.items(): + self.peers[peer] = ident + if ident != self.identity: + self.sub.connect(disambiguate_url(pub_url, location)) + if ident > self.identity: + # prevent duplicate xrep, by only connecting + # engines to engines with higher IDENTITY + # a doubly-connected pair will crash + self.socket.connect(disambiguate_url(url, location)) + + def send(self, peers, msg, flags=0, copy=True): + if not isinstance(peers, list): + peers = [peers] + if not isinstance(msg, list): + msg = [msg] + for p in peers: + ident = self.peers[p] + self.socket.send_multipart([ident]+msg, flags=flags, copy=copy) + + def recv(self, flags=0, copy=True): + return self.socket.recv_multipart(flags=flags, copy=copy)[1:] + + def publish(self, msg, flags=0, copy=True): + if not isinstance(msg, list): + msg = [msg] + self.pub.send_multipart(msg, copy=copy) + + def consume(self, flags=0, copy=True): + return self.sub.recv_multipart(flags=flags, copy=copy) + + diff --git a/docs/examples/newparallel/interengine/interengine.py b/docs/examples/newparallel/interengine/interengine.py new file mode 100644 index 0000000..56bd739 --- /dev/null +++ b/docs/examples/newparallel/interengine/interengine.py @@ -0,0 +1,43 @@ +import sys + +from IPython.zmq.parallel import client + + +rc = client.Client() +rc.block=True +view = rc[:] +view.run('communicator.py') +view.execute('com = EngineCommunicator()') + +# gather the connection information into a dict +ar = view.apply_async_bound(lambda : com.info) +peers = ar.get_dict() +# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators + +# connect the engines to each other: +view.apply_sync_bound(lambda pdict: com.connect(pdict), peers) + +# now all the engines are connected, and we can communicate between them: + +def broadcast(client, sender, msg_name, dest_name=None, block=None): + """broadcast a message from one engine to all others.""" + dest_name = msg_name if dest_name is None else dest_name + client[sender].execute('com.publish(%s)'%msg_name, block=None) + targets = client.ids + targets.remove(sender) + return client[targets].execute('%s=com.consume()'%dest_name, block=None) + +def send(client, sender, targets, msg_name, dest_name=None, block=None): + """send a message from one to one-or-more engines.""" + dest_name = msg_name if dest_name is None else dest_name + def _send(targets, m_name): + msg = globals()[m_name] + return com.send(targets, msg) + + client[sender].apply_async_bound(_send, targets, msg_name) + + return client[targets].execute('%s=com.recv()'%dest_name, block=None) + + + +