##// END OF EJS Templates
More work on updating the notebook zmq forwarding.
Brian E. Granger -
Show More
@@ -55,7 +55,7 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
55 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
55 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
56
56
57 def on_message(self, msg):
57 def on_message(self, msg):
58 self.router.forward_unicode(self.client_id, msg)
58 self.router.forward_msg(self.client_id, msg)
59
59
60 def on_close(self):
60 def on_close(self):
61 self.router.unregister_client(self.client_id)
61 self.router.unregister_client(self.client_id)
@@ -55,7 +55,7 b" LOCALHOST = '127.0.0.1'"
55
55
56 class NotebookWebApplication(web.Application):
56 class NotebookWebApplication(web.Application):
57
57
58 def __init__(self, kernel_manager, log, kernel_argv):
58 def __init__(self, kernel_manager, log, kernel_argv, config):
59 handlers = [
59 handlers = [
60 (r"/", MainHandler),
60 (r"/", MainHandler),
61 (r"/kernels", KernelHandler),
61 (r"/kernels", KernelHandler),
@@ -74,6 +74,7 b' class NotebookWebApplication(web.Application):'
74 self.kernel_manager = kernel_manager
74 self.kernel_manager = kernel_manager
75 self.log = log
75 self.log = log
76 self.kernel_argv = kernel_argv
76 self.kernel_argv = kernel_argv
77 self.config = config
77 self._routers = {}
78 self._routers = {}
78 self._session_dict = {}
79 self._session_dict = {}
79
80
@@ -99,8 +100,12 b' class NotebookWebApplication(web.Application):'
99 self._session_dict[kernel_id] = sm
100 self._session_dict[kernel_id] = sm
100 iopub_stream = sm.get_iopub_stream()
101 iopub_stream = sm.get_iopub_stream()
101 shell_stream = sm.get_shell_stream()
102 shell_stream = sm.get_shell_stream()
102 iopub_router = IOPubStreamRouter(iopub_stream, sm.session)
103 iopub_router = IOPubStreamRouter(
103 shell_router = ShellStreamRouter(shell_stream, sm.session)
104 zmq_stream=iopub_stream, session=sm.session, config=self.config
105 )
106 shell_router = ShellStreamRouter(
107 zmq_stream=shell_stream, session=sm.session, config=self.config
108 )
104 self._routers[(kernel_id, 'iopub')] = iopub_router
109 self._routers[(kernel_id, 'iopub')] = iopub_router
105 self._routers[(kernel_id, 'shell')] = shell_router
110 self._routers[(kernel_id, 'shell')] = shell_router
106
111
@@ -230,7 +235,9 b' class IPythonNotebookApp(BaseIPythonApplication):'
230 def initialize(self, argv=None):
235 def initialize(self, argv=None):
231 super(IPythonNotebookApp, self).initialize(argv)
236 super(IPythonNotebookApp, self).initialize(argv)
232 self.init_kernel_manager()
237 self.init_kernel_manager()
233 self.web_app = NotebookWebApplication(self.kernel_manager, self.log, self.kernel_argv)
238 self.web_app = NotebookWebApplication(
239 self.kernel_manager, self.log, self.kernel_argv, self.config
240 )
234 self.http_server = httpserver.HTTPServer(self.web_app)
241 self.http_server = httpserver.HTTPServer(self.web_app)
235 self.http_server.listen(self.port)
242 self.http_server.listen(self.port)
236
243
@@ -2,57 +2,93 b' import uuid'
2 from Queue import Queue
2 from Queue import Queue
3 import json
3 import json
4
4
5 class ZMQStreamRouter(object):
5 from IPython.config.configurable import Configurable
6 from IPython.utils.traitlets import Instance, Int, Dict
6
7
7 def __init__(self, zmq_stream, session):
8 class ZMQStreamRouter(Configurable):
8 self.zmq_stream = zmq_stream
9
9 self.session = session
10 zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
10 self._clients = {}
11 session = Instance('IPython.zmq.session.Session')
12 max_msg_size = Int(2048, config=True, help="""
13 The max raw message size accepted from the browser
14 over a WebSocket connection.
15 """)
16
17 _clients = Dict()
18
19 def __init__(self, **kwargs):
20 super(ZMQStreamRouter,self).__init__(**kwargs)
11 self.zmq_stream.on_recv(self._on_zmq_reply)
21 self.zmq_stream.on_recv(self._on_zmq_reply)
12
22
13 def register_client(self, client):
23 def register_client(self, client):
24 """Register a client, returning a client uuid."""
14 client_id = uuid.uuid4()
25 client_id = uuid.uuid4()
15 self._clients[client_id] = client
26 self._clients[client_id] = client
16 return client_id
27 return client_id
17
28
18 def unregister_client(self, client_id):
29 def unregister_client(self, client_id):
30 """Unregister a client by its client uuid."""
19 del self._clients[client_id]
31 del self._clients[client_id]
20
32
21 def copy_clients(self, router):
33 def copy_clients(self, router):
22 # Copy the clients of another router.
34 """Copy the clients of another router to this one.
35
36 This is used to enable the backend zeromq stream to disconnect
37 and reconnect while the WebSocket connections to browsers
38 remain, such as when a kernel is restarted.
39 """
23 for client_id, client in router._clients.items():
40 for client_id, client in router._clients.items():
24 client.router = self
41 client.router = self
25 self._clients[client_id] = client
42 self._clients[client_id] = client
26
43
44 def forward_msg(self, client_id, msg):
45 """Forward a msg to a client by its id.
46
47 The default implementation of this will fail silently if a message
48 arrives on a socket that doesn't support it. This method should
49 use max_msg_size to check and silently discard message that are too
50 long."""
51 pass
52
53 def _on_zmq_reply(self, msg_list):
54 """Handle a message the ZMQ stream sends to the router.
55
56 Usually, this is where the return message will be written to
57 clients that need it using client.write_message().
58 """
59 pass
60
27
61
28 class IOPubStreamRouter(ZMQStreamRouter):
62 class IOPubStreamRouter(ZMQStreamRouter):
29
63
30 def _on_zmq_reply(self, msg_list):
64 def _on_zmq_reply(self, msg_list):
65 msg = self.session.unpack_message(msg_list)
66 msg = json.dumps(msg)
31 for client_id, client in self._clients.items():
67 for client_id, client in self._clients.items():
32 for msg in msg_list:
68 for msg in msg_list:
33 print "Got message: ", msg
34 client.write_message(msg)
69 client.write_message(msg)
35
70
36 def forward_unicode(self, client_id, msg):
37 # This is a SUB stream that we should never write to.
38 pass
39
40
71
41 class ShellStreamRouter(ZMQStreamRouter):
72 class ShellStreamRouter(ZMQStreamRouter):
42
73
43 def __init__(self, zmq_stream, session):
74 _request_queue = Instance(Queue,(),{})
44 ZMQStreamRouter.__init__(self, zmq_stream, session)
45 self._request_queue = Queue()
46
75
47 def _on_zmq_reply(self, msg_list):
76 def _on_zmq_reply(self, msg_list):
77 msg = self.session.unpack_message(msg_list)
78 msg = json.dumps(msg)
79 print "Reply: ", msg_list
48 client_id = self._request_queue.get(block=False)
80 client_id = self._request_queue.get(block=False)
49 client = self._clients.get(client_id)
81 client = self._clients.get(client_id)
50 if client is not None:
82 if client is not None:
51 for msg in msg_list:
83 for msg in msg_list:
52 client.write_message(msg)
84 client.write_message(msg)
53
85
54 def forward_unicode(self, client_id, msg):
86 def forward_msg(self, client_id, msg):
55 print "Inbound message: ", msg
87 if len(msg) < self.max_msg_size:
56 self._request_queue.put(client_id)
88 msg = json.loads(msg)
57 self.session.send(self.zmq_stream, msg)
89 print "Raw msg: ", msg
90 to_send = self.session.serialize(msg)
91 print "to_send: ", to_send, to_send[-3:]
92 self._request_queue.put(client_id)
93 self.session.send_raw(self.zmq_stream, to_send[-3:])
58
94
General Comments 0
You need to be logged in to leave comments. Login now