communicator.py
59 lines
| 2.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3656 | #!/usr/bin/env python | ||
"""A simple Communicator class that has N,E,S,W neighbors connected via 0MQ PAIR sockets""" | ||||
import socket | ||||
import zmq | ||||
MinRK
|
r3666 | from IPython.parallel.util import disambiguate_url | ||
MinRK
|
r3656 | |||
class EngineCommunicator(object):
    """An object that connects Engines to each other.

    north and east sockets listen, while south and west sockets connect.

    This class is useful in cases where there is a set of nodes that
    must communicate only with their nearest neighbors.
    """

    def __init__(self, interface='tcp://*', identity=None):
        """Create the four PAIR sockets and bind the listening ones.

        Parameters
        ----------
        interface : str
            Address prefix (without port) that the north and east sockets
            bind to; each is bound to a random port on it.
        identity : object, optional
            Accepted for interface compatibility; not used by this class.
        """
        self._ctx = zmq.Context()
        self.north = self._ctx.socket(zmq.PAIR)
        self.west = self._ctx.socket(zmq.PAIR)
        self.south = self._ctx.socket(zmq.PAIR)
        self.east = self._ctx.socket(zmq.PAIR)

        # bind to ports
        northport = self.north.bind_to_random_port(interface)
        eastport = self.east.bind_to_random_port(interface)

        # advertised URLs are the bind interface plus the port actually chosen
        self.north_url = interface + ":%i" % northport
        self.east_url = interface + ":%i" % eastport

        # guess first public IP from socket
        self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]

    def __del__(self):
        """Close whichever sockets exist, then terminate the context.

        Attributes are looked up defensively because the finalizer also runs
        on objects whose ``__init__`` did not complete.
        """
        for name in ('north', 'south', 'east', 'west'):
            sock = getattr(self, name, None)
            if sock is not None:
                sock.close()
        ctx = getattr(self, '_ctx', None)
        if ctx is not None:
            ctx.term()

    @property
    def info(self):
        """return the connection info for this object's sockets.

        The result is the 3-tuple ``(location, north_url, east_url)``.
        """
        return (self.location, self.north_url, self.east_url)

    def connect(self, south_peer=None, west_peer=None):
        """connect to peers.

        Each peer is a 3-tuple of the form::

            (location, north_addr, east_addr)

        as produced by the `info` property of the peer's communicator.
        The south socket connects to the south peer's north address, and the
        west socket connects to the west peer's east address.  A peer that is
        None is skipped.
        """
        if south_peer is not None:
            location, url, _ = south_peer
            self.south.connect(disambiguate_url(url, location))
        if west_peer is not None:
            location, _, url = west_peer
            self.west.connect(disambiguate_url(url, location))