routers.py
125 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r4348 | """Routers that connect WebSockets to ZMQ sockets.""" | ||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING.txt, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r4341 | import uuid | ||
from Queue import Queue | ||||
Brian E. Granger
|
r4346 | import json | ||
Brian E. Granger
|
r4341 | |||
Brian E. Granger
|
r4347 | from IPython.config.configurable import Configurable | ||
from IPython.utils.traitlets import Instance, Int, Dict | ||||
Brian E. Granger
|
r4341 | |||
Brian E. Granger
|
r4348 | #----------------------------------------------------------------------------- | ||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r4347 | class ZMQStreamRouter(Configurable): | ||
zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream') | ||||
session = Instance('IPython.zmq.session.Session') | ||||
max_msg_size = Int(2048, config=True, help=""" | ||||
The max raw message size accepted from the browser | ||||
over a WebSocket connection. | ||||
""") | ||||
_clients = Dict() | ||||
def __init__(self, **kwargs): | ||||
super(ZMQStreamRouter,self).__init__(**kwargs) | ||||
Brian E. Granger
|
r4341 | self.zmq_stream.on_recv(self._on_zmq_reply) | ||
Brian E. Granger
|
r4495 | def __del__(self): | ||
self.close() | ||||
def close(self): | ||||
"""Disable the routing actions of this router.""" | ||||
self._clients = {} | ||||
self.zmq_stream.on_recv(None) | ||||
Brian E. Granger
|
r4341 | def register_client(self, client): | ||
Brian E. Granger
|
r4347 | """Register a client, returning a client uuid.""" | ||
Brian E. Granger
|
r4495 | client_id = unicode(uuid.uuid4()) | ||
Brian E. Granger
|
r4341 | self._clients[client_id] = client | ||
return client_id | ||||
def unregister_client(self, client_id): | ||||
Brian E. Granger
|
r4347 | """Unregister a client by its client uuid.""" | ||
Brian E. Granger
|
r4341 | del self._clients[client_id] | ||
def copy_clients(self, router): | ||||
Brian E. Granger
|
r4347 | """Copy the clients of another router to this one. | ||
This is used to enable the backend zeromq stream to disconnect | ||||
and reconnect while the WebSocket connections to browsers | ||||
remain, such as when a kernel is restarted. | ||||
""" | ||||
Brian E. Granger
|
r4341 | for client_id, client in router._clients.items(): | ||
client.router = self | ||||
self._clients[client_id] = client | ||||
Brian E. Granger
|
r4347 | def forward_msg(self, client_id, msg): | ||
"""Forward a msg to a client by its id. | ||||
The default implementation of this will fail silently if a message | ||||
arrives on a socket that doesn't support it. This method should | ||||
use max_msg_size to check and silently discard message that are too | ||||
long.""" | ||||
pass | ||||
def _on_zmq_reply(self, msg_list): | ||||
"""Handle a message the ZMQ stream sends to the router. | ||||
Usually, this is where the return message will be written to | ||||
clients that need it using client.write_message(). | ||||
""" | ||||
pass | ||||
Brian E. Granger
|
r4348 | def _reserialize_reply(self, msg_list): | ||
"""Reserialize a reply message using JSON. | ||||
This takes the msg list from the ZMQ socket, unserializes it using | ||||
self.session and then serializes the result using JSON. This method | ||||
should be used by self._on_zmq_reply to build messages that can | ||||
be sent back to the browser. | ||||
""" | ||||
idents, msg_list = self.session.feed_identities(msg_list) | ||||
Brian E. Granger
|
r4351 | msg = self.session.unserialize(msg_list) | ||
Brian E. Granger
|
r4348 | msg['header'].pop('date') | ||
Brian E. Granger
|
r4356 | msg.pop('buffers') | ||
Brian E. Granger
|
r4348 | return json.dumps(msg) | ||
Brian E. Granger
|
r4341 | |||
class IOPubStreamRouter(ZMQStreamRouter): | ||||
def _on_zmq_reply(self, msg_list): | ||||
Brian E. Granger
|
r4348 | msg = self._reserialize_reply(msg_list) | ||
Brian E. Granger
|
r4341 | for client_id, client in self._clients.items(): | ||
Brian E. Granger
|
r4348 | client.write_message(msg) | ||
Brian E. Granger
|
r4341 | |||
class ShellStreamRouter(ZMQStreamRouter): | ||||
Brian E. Granger
|
r4347 | _request_queue = Instance(Queue,(),{}) | ||
Brian E. Granger
|
r4341 | |||
def _on_zmq_reply(self, msg_list): | ||||
Brian E. Granger
|
r4348 | msg = self._reserialize_reply(msg_list) | ||
Brian E. Granger
|
r4341 | client_id = self._request_queue.get(block=False) | ||
client = self._clients.get(client_id) | ||||
if client is not None: | ||||
Brian E. Granger
|
r4348 | client.write_message(msg) | ||
Brian E. Granger
|
r4341 | |||
Brian E. Granger
|
r4347 | def forward_msg(self, client_id, msg): | ||
if len(msg) < self.max_msg_size: | ||||
msg = json.loads(msg) | ||||
self._request_queue.put(client_id) | ||||
Brian E. Granger
|
r4348 | self.session.send(self.zmq_stream, msg) | ||
Brian E. Granger
|
r4341 | |||