##// END OF EJS Templates
Work to adapt routers to new Session message protocol.
Brian E. Granger -
Show More
@@ -1,17 +1,22 b''
1 """Tornado handlers for the notebook."""
2
3 #-----------------------------------------------------------------------------
4 # Imports
5 #-----------------------------------------------------------------------------
6
1 7 import datetime
2 8 import json
3 9 import logging
4 10 import os
5 11 import urllib
6 import uuid
7 from Queue import Queue
8 12
9 13 from tornado import web
10 14 from tornado import websocket
11 15
16 #-----------------------------------------------------------------------------
17 # Handlers
18 #-----------------------------------------------------------------------------
12 19
13 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
14 _kernel_action_regex = r"(?P<action>restart|interrupt)"
15 20
16 21 class MainHandler(web.RequestHandler):
17 22 def get(self):
@@ -39,58 +44,6 b' class KernelActionHandler(web.RequestHandler):'
39 44 self.write(json.dumps(new_kernel_id))
40 45
41 46
42 class ZMQStreamRouter(object):
43
44 def __init__(self, zmq_stream):
45 self.zmq_stream = zmq_stream
46 self._clients = {}
47 self.zmq_stream.on_recv(self._on_zmq_reply)
48
49 def register_client(self, client):
50 client_id = uuid.uuid4()
51 self._clients[client_id] = client
52 return client_id
53
54 def unregister_client(self, client_id):
55 del self._clients[client_id]
56
57 def copy_clients(self, router):
58 # Copy the clients of another router.
59 for client_id, client in router._clients.items():
60 client.router = self
61 self._clients[client_id] = client
62
63
64 class IOPubStreamRouter(ZMQStreamRouter):
65
66 def _on_zmq_reply(self, msg_list):
67 for client_id, client in self._clients.items():
68 for msg in msg_list:
69 client.write_message(msg)
70
71 def forward_unicode(self, client_id, msg):
72 # This is a SUB stream that we should never write to.
73 pass
74
75
76 class ShellStreamRouter(ZMQStreamRouter):
77
78 def __init__(self, zmq_stream):
79 ZMQStreamRouter.__init__(self, zmq_stream)
80 self._request_queue = Queue()
81
82 def _on_zmq_reply(self, msg_list):
83 client_id = self._request_queue.get(block=False)
84 client = self._clients.get(client_id)
85 if client is not None:
86 for msg in msg_list:
87 client.write_message(msg)
88
89 def forward_unicode(self, client_id, msg):
90 self._request_queue.put(client_id)
91 self.zmq_stream.send_unicode(msg)
92
93
94 47 class ZMQStreamHandler(websocket.WebSocketHandler):
95 48
96 49 def initialize(self, stream_name):
@@ -4,7 +4,6 b''
4 4 # Imports
5 5 #-----------------------------------------------------------------------------
6 6
7 import logging
8 7 import signal
9 8 import sys
10 9 import uuid
@@ -13,7 +12,7 b' import zmq'
13 12
14 13 from IPython.config.configurable import LoggingConfigurable
15 14 from IPython.zmq.ipkernel import launch_kernel
16 from IPython.utils.traitlets import Instance, Dict, Unicode
15 from IPython.utils.traitlets import Instance, Dict
17 16
18 17 #-----------------------------------------------------------------------------
19 18 # Classes
@@ -99,8 +99,8 b' class NotebookWebApplication(web.Application):'
99 99 self._session_dict[kernel_id] = sm
100 100 iopub_stream = sm.get_iopub_stream()
101 101 shell_stream = sm.get_shell_stream()
102 iopub_router = IOPubStreamRouter(iopub_stream)
103 shell_router = ShellStreamRouter(shell_stream)
102 iopub_router = IOPubStreamRouter(iopub_stream, sm.session)
103 shell_router = ShellStreamRouter(shell_stream, sm.session)
104 104 self._routers[(kernel_id, 'iopub')] = iopub_router
105 105 self._routers[(kernel_id, 'shell')] = shell_router
106 106
@@ -139,6 +139,8 b' class NotebookWebApplication(web.Application):'
139 139 router = self._routers[(kernel_id, stream_name)]
140 140 return router
141 141
142
143
142 144 #-----------------------------------------------------------------------------
143 145 # Aliases and Flags
144 146 #-----------------------------------------------------------------------------
@@ -1,11 +1,12 b''
1 1 import uuid
2 2 from Queue import Queue
3
3 import json
4 4
5 5 class ZMQStreamRouter(object):
6 6
7 def __init__(self, zmq_stream):
7 def __init__(self, zmq_stream, session):
8 8 self.zmq_stream = zmq_stream
9 self.session = session
9 10 self._clients = {}
10 11 self.zmq_stream.on_recv(self._on_zmq_reply)
11 12
@@ -29,6 +30,7 b' class IOPubStreamRouter(ZMQStreamRouter):'
29 30 def _on_zmq_reply(self, msg_list):
30 31 for client_id, client in self._clients.items():
31 32 for msg in msg_list:
33 print "Got message: ", msg
32 34 client.write_message(msg)
33 35
34 36 def forward_unicode(self, client_id, msg):
@@ -38,8 +40,8 b' class IOPubStreamRouter(ZMQStreamRouter):'
38 40
39 41 class ShellStreamRouter(ZMQStreamRouter):
40 42
41 def __init__(self, zmq_stream):
42 ZMQStreamRouter.__init__(self, zmq_stream)
43 def __init__(self, zmq_stream, session):
44 ZMQStreamRouter.__init__(self, zmq_stream, session)
43 45 self._request_queue = Queue()
44 46
45 47 def _on_zmq_reply(self, msg_list):
@@ -50,8 +52,7 b' class ShellStreamRouter(ZMQStreamRouter):'
50 52 client.write_message(msg)
51 53
52 54 def forward_unicode(self, client_id, msg):
53 self._request_queue.put(client_id)
54 self.zmq_stream.send_unicode(msg)
55
56
55 print "Inbound message: ", msg
56 self._request_queue.put(client_id)
57 self.session.send(self.zmq_stream, msg)
57 58
General Comments 0
You need to be logged in to leave comments. Login now