Show More
@@ -55,7 +55,7 b' class ZMQStreamHandler(websocket.WebSocketHandler):' | |||||
55 | logging.info("Connection open: %s, %s" % (kernel_id, self.client_id)) |
|
55 | logging.info("Connection open: %s, %s" % (kernel_id, self.client_id)) | |
56 |
|
56 | |||
57 | def on_message(self, msg): |
|
57 | def on_message(self, msg): | |
58 |
self.router.forward_ |
|
58 | self.router.forward_msg(self.client_id, msg) | |
59 |
|
59 | |||
60 | def on_close(self): |
|
60 | def on_close(self): | |
61 | self.router.unregister_client(self.client_id) |
|
61 | self.router.unregister_client(self.client_id) |
@@ -55,7 +55,7 b" LOCALHOST = '127.0.0.1'" | |||||
55 |
|
55 | |||
56 | class NotebookWebApplication(web.Application): |
|
56 | class NotebookWebApplication(web.Application): | |
57 |
|
57 | |||
58 | def __init__(self, kernel_manager, log, kernel_argv): |
|
58 | def __init__(self, kernel_manager, log, kernel_argv, config): | |
59 | handlers = [ |
|
59 | handlers = [ | |
60 | (r"/", MainHandler), |
|
60 | (r"/", MainHandler), | |
61 | (r"/kernels", KernelHandler), |
|
61 | (r"/kernels", KernelHandler), | |
@@ -74,6 +74,7 b' class NotebookWebApplication(web.Application):' | |||||
74 | self.kernel_manager = kernel_manager |
|
74 | self.kernel_manager = kernel_manager | |
75 | self.log = log |
|
75 | self.log = log | |
76 | self.kernel_argv = kernel_argv |
|
76 | self.kernel_argv = kernel_argv | |
|
77 | self.config = config | |||
77 | self._routers = {} |
|
78 | self._routers = {} | |
78 | self._session_dict = {} |
|
79 | self._session_dict = {} | |
79 |
|
80 | |||
@@ -99,8 +100,12 b' class NotebookWebApplication(web.Application):' | |||||
99 | self._session_dict[kernel_id] = sm |
|
100 | self._session_dict[kernel_id] = sm | |
100 | iopub_stream = sm.get_iopub_stream() |
|
101 | iopub_stream = sm.get_iopub_stream() | |
101 | shell_stream = sm.get_shell_stream() |
|
102 | shell_stream = sm.get_shell_stream() | |
102 |
iopub_router = IOPubStreamRouter( |
|
103 | iopub_router = IOPubStreamRouter( | |
103 | shell_router = ShellStreamRouter(shell_stream, sm.session) |
|
104 | zmq_stream=iopub_stream, session=sm.session, config=self.config | |
|
105 | ) | |||
|
106 | shell_router = ShellStreamRouter( | |||
|
107 | zmq_stream=shell_stream, session=sm.session, config=self.config | |||
|
108 | ) | |||
104 | self._routers[(kernel_id, 'iopub')] = iopub_router |
|
109 | self._routers[(kernel_id, 'iopub')] = iopub_router | |
105 | self._routers[(kernel_id, 'shell')] = shell_router |
|
110 | self._routers[(kernel_id, 'shell')] = shell_router | |
106 |
|
111 | |||
@@ -230,7 +235,9 b' class IPythonNotebookApp(BaseIPythonApplication):' | |||||
230 | def initialize(self, argv=None): |
|
235 | def initialize(self, argv=None): | |
231 | super(IPythonNotebookApp, self).initialize(argv) |
|
236 | super(IPythonNotebookApp, self).initialize(argv) | |
232 | self.init_kernel_manager() |
|
237 | self.init_kernel_manager() | |
233 |
self.web_app = NotebookWebApplication( |
|
238 | self.web_app = NotebookWebApplication( | |
|
239 | self.kernel_manager, self.log, self.kernel_argv, self.config | |||
|
240 | ) | |||
234 | self.http_server = httpserver.HTTPServer(self.web_app) |
|
241 | self.http_server = httpserver.HTTPServer(self.web_app) | |
235 | self.http_server.listen(self.port) |
|
242 | self.http_server.listen(self.port) | |
236 |
|
243 |
@@ -2,57 +2,93 b' import uuid' | |||||
2 | from Queue import Queue |
|
2 | from Queue import Queue | |
3 | import json |
|
3 | import json | |
4 |
|
4 | |||
5 | class ZMQStreamRouter(object): |
|
5 | from IPython.config.configurable import Configurable | |
|
6 | from IPython.utils.traitlets import Instance, Int, Dict | |||
6 |
|
7 | |||
7 | def __init__(self, zmq_stream, session): |
|
8 | class ZMQStreamRouter(Configurable): | |
8 | self.zmq_stream = zmq_stream |
|
9 | ||
9 | self.session = session |
|
10 | zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream') | |
10 | self._clients = {} |
|
11 | session = Instance('IPython.zmq.session.Session') | |
|
12 | max_msg_size = Int(2048, config=True, help=""" | |||
|
13 | The max raw message size accepted from the browser | |||
|
14 | over a WebSocket connection. | |||
|
15 | """) | |||
|
16 | ||||
|
17 | _clients = Dict() | |||
|
18 | ||||
|
19 | def __init__(self, **kwargs): | |||
|
20 | super(ZMQStreamRouter,self).__init__(**kwargs) | |||
11 | self.zmq_stream.on_recv(self._on_zmq_reply) |
|
21 | self.zmq_stream.on_recv(self._on_zmq_reply) | |
12 |
|
22 | |||
13 | def register_client(self, client): |
|
23 | def register_client(self, client): | |
|
24 | """Register a client, returning a client uuid.""" | |||
14 | client_id = uuid.uuid4() |
|
25 | client_id = uuid.uuid4() | |
15 | self._clients[client_id] = client |
|
26 | self._clients[client_id] = client | |
16 | return client_id |
|
27 | return client_id | |
17 |
|
28 | |||
18 | def unregister_client(self, client_id): |
|
29 | def unregister_client(self, client_id): | |
|
30 | """Unregister a client by its client uuid.""" | |||
19 | del self._clients[client_id] |
|
31 | del self._clients[client_id] | |
20 |
|
32 | |||
21 | def copy_clients(self, router): |
|
33 | def copy_clients(self, router): | |
22 |
|
|
34 | """Copy the clients of another router to this one. | |
|
35 | ||||
|
36 | This is used to enable the backend zeromq stream to disconnect | |||
|
37 | and reconnect while the WebSocket connections to browsers | |||
|
38 | remain, such as when a kernel is restarted. | |||
|
39 | """ | |||
23 | for client_id, client in router._clients.items(): |
|
40 | for client_id, client in router._clients.items(): | |
24 | client.router = self |
|
41 | client.router = self | |
25 | self._clients[client_id] = client |
|
42 | self._clients[client_id] = client | |
26 |
|
43 | |||
|
44 | def forward_msg(self, client_id, msg): | |||
|
45 | """Forward a msg to a client by its id. | |||
|
46 | ||||
|
47 | The default implementation of this will fail silently if a message | |||
|
48 | arrives on a socket that doesn't support it. This method should | |||
|
49 | use max_msg_size to check and silently discard message that are too | |||
|
50 | long.""" | |||
|
51 | pass | |||
|
52 | ||||
|
53 | def _on_zmq_reply(self, msg_list): | |||
|
54 | """Handle a message the ZMQ stream sends to the router. | |||
|
55 | ||||
|
56 | Usually, this is where the return message will be written to | |||
|
57 | clients that need it using client.write_message(). | |||
|
58 | """ | |||
|
59 | pass | |||
|
60 | ||||
27 |
|
61 | |||
28 | class IOPubStreamRouter(ZMQStreamRouter): |
|
62 | class IOPubStreamRouter(ZMQStreamRouter): | |
29 |
|
63 | |||
30 | def _on_zmq_reply(self, msg_list): |
|
64 | def _on_zmq_reply(self, msg_list): | |
|
65 | msg = self.session.unpack_message(msg_list) | |||
|
66 | msg = json.dumps(msg) | |||
31 | for client_id, client in self._clients.items(): |
|
67 | for client_id, client in self._clients.items(): | |
32 | for msg in msg_list: |
|
68 | for msg in msg_list: | |
33 | print "Got message: ", msg |
|
|||
34 | client.write_message(msg) |
|
69 | client.write_message(msg) | |
35 |
|
70 | |||
36 | def forward_unicode(self, client_id, msg): |
|
|||
37 | # This is a SUB stream that we should never write to. |
|
|||
38 | pass |
|
|||
39 |
|
||||
40 |
|
71 | |||
41 | class ShellStreamRouter(ZMQStreamRouter): |
|
72 | class ShellStreamRouter(ZMQStreamRouter): | |
42 |
|
73 | |||
43 | def __init__(self, zmq_stream, session): |
|
74 | _request_queue = Instance(Queue,(),{}) | |
44 | ZMQStreamRouter.__init__(self, zmq_stream, session) |
|
|||
45 | self._request_queue = Queue() |
|
|||
46 |
|
75 | |||
47 | def _on_zmq_reply(self, msg_list): |
|
76 | def _on_zmq_reply(self, msg_list): | |
|
77 | msg = self.session.unpack_message(msg_list) | |||
|
78 | msg = json.dumps(msg) | |||
|
79 | print "Reply: ", msg_list | |||
48 | client_id = self._request_queue.get(block=False) |
|
80 | client_id = self._request_queue.get(block=False) | |
49 | client = self._clients.get(client_id) |
|
81 | client = self._clients.get(client_id) | |
50 | if client is not None: |
|
82 | if client is not None: | |
51 | for msg in msg_list: |
|
83 | for msg in msg_list: | |
52 | client.write_message(msg) |
|
84 | client.write_message(msg) | |
53 |
|
85 | |||
54 |
def forward_ |
|
86 | def forward_msg(self, client_id, msg): | |
55 | print "Inbound message: ", msg |
|
87 | if len(msg) < self.max_msg_size: | |
56 | self._request_queue.put(client_id) |
|
88 | msg = json.loads(msg) | |
57 | self.session.send(self.zmq_stream, msg) |
|
89 | print "Raw msg: ", msg | |
|
90 | to_send = self.session.serialize(msg) | |||
|
91 | print "to_send: ", to_send, to_send[-3:] | |||
|
92 | self._request_queue.put(client_id) | |||
|
93 | self.session.send_raw(self.zmq_stream, to_send[-3:]) | |||
58 |
|
94 |
General Comments 0
You need to be logged in to leave comments.
Login now