routers.py
57 lines
| 1.5 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r4341 | import uuid | ||
from Queue import Queue | ||||
class ZMQStreamRouter(object):
    """Base class that routes messages between a ZMQ stream and clients.

    Clients are kept in a dict keyed by the id handed out by
    ``register_client``.  Subclasses override ``_on_zmq_reply`` to decide
    which client(s) receive the messages that arrive on the stream.
    """

    def __init__(self, zmq_stream):
        """Attach to ``zmq_stream`` and start with an empty client table.

        ``self._on_zmq_reply`` is installed as the stream's receive callback
        through ``zmq_stream.on_recv``.
        """
        self.zmq_stream = zmq_stream
        self._clients = {}
        self.zmq_stream.on_recv(self._on_zmq_reply)

    def _on_zmq_reply(self, msg_list):
        """Handle the messages received on the stream; must be overridden.

        Defined on the base class so that ``__init__`` can always register
        the callback; without it, constructing a router whose class lacks
        the hook failed with an unexplained ``AttributeError``.

        Raises:
            NotImplementedError: always, unless a subclass overrides it.
        """
        raise NotImplementedError(
            "%s must implement _on_zmq_reply" % type(self).__name__
        )

    def register_client(self, client):
        """Store ``client`` under a freshly generated UUID and return that id."""
        client_id = uuid.uuid4()
        self._clients[client_id] = client
        return client_id

    def unregister_client(self, client_id):
        """Remove the client stored under ``client_id``.

        Raises:
            KeyError: if ``client_id`` is not currently registered.
        """
        del self._clients[client_id]

    def copy_clients(self, router):
        """Adopt every client of ``router``, keeping the same client ids.

        Each client's ``router`` attribute is re-pointed at this router.
        The other router's own client table is left unmodified.
        """
        for client_id, client in router._clients.items():
            client.router = self
            self._clients[client_id] = client
class IOPubStreamRouter(ZMQStreamRouter):
    """Router that delivers every received message to all registered clients."""

    def _on_zmq_reply(self, msg_list):
        """Write each message in ``msg_list`` to every registered client.

        Clients are visited one at a time; each client receives the whole
        ``msg_list`` in order before the next client is served.
        """
        for subscriber in self._clients.values():
            for frame in msg_list:
                subscriber.write_message(frame)

    def forward_unicode(self, client_id, msg):
        """Discard ``msg``; nothing is ever sent on this stream."""
        # This is a SUB stream that we should never write to.
        return None
class ShellStreamRouter(ZMQStreamRouter):
    """Router that hands each reply to the client whose request is pending.

    ``forward_unicode`` queues the sender's id before sending; each reply
    received on the stream is delivered to the id at the head of that queue.
    NOTE(review): this presumes replies arrive in the same order as the
    requests were forwarded, one reply per request -- confirm.
    """

    def __init__(self, zmq_stream):
        """Initialise the base router and an empty queue of pending client ids."""
        ZMQStreamRouter.__init__(self, zmq_stream)
        self._request_queue = Queue()

    def _on_zmq_reply(self, msg_list):
        """Write ``msg_list`` to the client that issued the oldest pending request.

        The reply is dropped when no request is pending or when the
        requesting client is no longer registered.
        """
        # Imported here because the module itself only imports ``Queue``.
        try:
            from Queue import Empty  # Python 2 module name, as used by this file
        except ImportError:
            from queue import Empty

        try:
            client_id = self._request_queue.get(block=False)
        except Empty:
            # A reply with no matching request used to propagate Empty out of
            # the stream's receive callback; there is nobody to deliver it to.
            return

        client = self._clients.get(client_id)
        if client is not None:
            for msg in msg_list:
                client.write_message(msg)

    def forward_unicode(self, client_id, msg):
        """Record ``client_id`` as awaiting a reply, then send ``msg``."""
        self._request_queue.put(client_id)
        self.zmq_stream.send_unicode(msg)