"""Routers that connect WebSockets to ZMQ sockets.""" #----------------------------------------------------------------------------- # Copyright (C) 2011 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING.txt, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import uuid from Queue import Queue import json from IPython.config.configurable import Configurable from IPython.utils.traitlets import Instance, Int, Dict #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- class ZMQStreamRouter(Configurable): zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream') session = Instance('IPython.zmq.session.Session') max_msg_size = Int(2048, config=True, help=""" The max raw message size accepted from the browser over a WebSocket connection. """) _clients = Dict() def __init__(self, **kwargs): super(ZMQStreamRouter,self).__init__(**kwargs) self.zmq_stream.on_recv(self._on_zmq_reply) def __del__(self): self.close() def close(self): """Disable the routing actions of this router.""" self._clients = {} self.zmq_stream.on_recv(None) def register_client(self, client): """Register a client, returning a client uuid.""" client_id = unicode(uuid.uuid4()) self._clients[client_id] = client return client_id def unregister_client(self, client_id): """Unregister a client by its client uuid.""" del self._clients[client_id] def copy_clients(self, router): """Copy the clients of another router to this one. This is used to enable the backend zeromq stream to disconnect and reconnect while the WebSocket connections to browsers remain, such as when a kernel is restarted. """ for client_id, client in router._clients.items(): client.router = self self._clients[client_id] = client def forward_msg(self, client_id, msg): """Forward a msg to a client by its id. The default implementation of this will fail silently if a message arrives on a socket that doesn't support it. This method should use max_msg_size to check and silently discard message that are too long.""" pass def _on_zmq_reply(self, msg_list): """Handle a message the ZMQ stream sends to the router. Usually, this is where the return message will be written to clients that need it using client.write_message(). """ pass def _reserialize_reply(self, msg_list): """Reserialize a reply message using JSON. This takes the msg list from the ZMQ socket, unserializes it using self.session and then serializes the result using JSON. This method should be used by self._on_zmq_reply to build messages that can be sent back to the browser. """ idents, msg_list = self.session.feed_identities(msg_list) msg = self.session.unserialize(msg_list) msg['header'].pop('date') msg.pop('buffers') return json.dumps(msg) class IOPubStreamRouter(ZMQStreamRouter): def _on_zmq_reply(self, msg_list): msg = self._reserialize_reply(msg_list) for client_id, client in self._clients.items(): client.write_message(msg) class ShellStreamRouter(ZMQStreamRouter): _request_queue = Instance(Queue,(),{}) def _on_zmq_reply(self, msg_list): msg = self._reserialize_reply(msg_list) client_id = self._request_queue.get(block=False) client = self._clients.get(client_id) if client is not None: client.write_message(msg) def forward_msg(self, client_id, msg): if len(msg) < self.max_msg_size: msg = json.loads(msg) self._request_queue.put(client_id) self.session.send(self.zmq_stream, msg)