##// END OF EJS Templates
Work to adapt routers to new Session message protocol.
Brian E. Granger -
Show More
@@ -1,17 +1,22 b''
1 """Tornado handlers for the notebook."""
2
3 #-----------------------------------------------------------------------------
4 # Imports
5 #-----------------------------------------------------------------------------
6
1 import datetime
7 import datetime
2 import json
8 import json
3 import logging
9 import logging
4 import os
10 import os
5 import urllib
11 import urllib
6 import uuid
7 from Queue import Queue
8
12
9 from tornado import web
13 from tornado import web
10 from tornado import websocket
14 from tornado import websocket
11
15
16 #-----------------------------------------------------------------------------
17 # Handlers
18 #-----------------------------------------------------------------------------
12
19
13 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
14 _kernel_action_regex = r"(?P<action>restart|interrupt)"
15
20
16 class MainHandler(web.RequestHandler):
21 class MainHandler(web.RequestHandler):
17 def get(self):
22 def get(self):
@@ -39,58 +44,6 b' class KernelActionHandler(web.RequestHandler):'
39 self.write(json.dumps(new_kernel_id))
44 self.write(json.dumps(new_kernel_id))
40
45
41
46
42 class ZMQStreamRouter(object):
43
44 def __init__(self, zmq_stream):
45 self.zmq_stream = zmq_stream
46 self._clients = {}
47 self.zmq_stream.on_recv(self._on_zmq_reply)
48
49 def register_client(self, client):
50 client_id = uuid.uuid4()
51 self._clients[client_id] = client
52 return client_id
53
54 def unregister_client(self, client_id):
55 del self._clients[client_id]
56
57 def copy_clients(self, router):
58 # Copy the clients of another router.
59 for client_id, client in router._clients.items():
60 client.router = self
61 self._clients[client_id] = client
62
63
64 class IOPubStreamRouter(ZMQStreamRouter):
65
66 def _on_zmq_reply(self, msg_list):
67 for client_id, client in self._clients.items():
68 for msg in msg_list:
69 client.write_message(msg)
70
71 def forward_unicode(self, client_id, msg):
72 # This is a SUB stream that we should never write to.
73 pass
74
75
76 class ShellStreamRouter(ZMQStreamRouter):
77
78 def __init__(self, zmq_stream):
79 ZMQStreamRouter.__init__(self, zmq_stream)
80 self._request_queue = Queue()
81
82 def _on_zmq_reply(self, msg_list):
83 client_id = self._request_queue.get(block=False)
84 client = self._clients.get(client_id)
85 if client is not None:
86 for msg in msg_list:
87 client.write_message(msg)
88
89 def forward_unicode(self, client_id, msg):
90 self._request_queue.put(client_id)
91 self.zmq_stream.send_unicode(msg)
92
93
94 class ZMQStreamHandler(websocket.WebSocketHandler):
47 class ZMQStreamHandler(websocket.WebSocketHandler):
95
48
96 def initialize(self, stream_name):
49 def initialize(self, stream_name):
@@ -4,7 +4,6 b''
4 # Imports
4 # Imports
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6
6
7 import logging
8 import signal
7 import signal
9 import sys
8 import sys
10 import uuid
9 import uuid
@@ -13,7 +12,7 b' import zmq'
13
12
14 from IPython.config.configurable import LoggingConfigurable
13 from IPython.config.configurable import LoggingConfigurable
15 from IPython.zmq.ipkernel import launch_kernel
14 from IPython.zmq.ipkernel import launch_kernel
16 from IPython.utils.traitlets import Instance, Dict, Unicode
15 from IPython.utils.traitlets import Instance, Dict
17
16
18 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
19 # Classes
18 # Classes
@@ -99,8 +99,8 b' class NotebookWebApplication(web.Application):'
99 self._session_dict[kernel_id] = sm
99 self._session_dict[kernel_id] = sm
100 iopub_stream = sm.get_iopub_stream()
100 iopub_stream = sm.get_iopub_stream()
101 shell_stream = sm.get_shell_stream()
101 shell_stream = sm.get_shell_stream()
102 iopub_router = IOPubStreamRouter(iopub_stream)
102 iopub_router = IOPubStreamRouter(iopub_stream, sm.session)
103 shell_router = ShellStreamRouter(shell_stream)
103 shell_router = ShellStreamRouter(shell_stream, sm.session)
104 self._routers[(kernel_id, 'iopub')] = iopub_router
104 self._routers[(kernel_id, 'iopub')] = iopub_router
105 self._routers[(kernel_id, 'shell')] = shell_router
105 self._routers[(kernel_id, 'shell')] = shell_router
106
106
@@ -139,6 +139,8 b' class NotebookWebApplication(web.Application):'
139 router = self._routers[(kernel_id, stream_name)]
139 router = self._routers[(kernel_id, stream_name)]
140 return router
140 return router
141
141
142
143
142 #-----------------------------------------------------------------------------
144 #-----------------------------------------------------------------------------
143 # Aliases and Flags
145 # Aliases and Flags
144 #-----------------------------------------------------------------------------
146 #-----------------------------------------------------------------------------
@@ -1,11 +1,12 b''
1 import uuid
1 import uuid
2 from Queue import Queue
2 from Queue import Queue
3
3 import json
4
4
5 class ZMQStreamRouter(object):
5 class ZMQStreamRouter(object):
6
6
7 def __init__(self, zmq_stream):
7 def __init__(self, zmq_stream, session):
8 self.zmq_stream = zmq_stream
8 self.zmq_stream = zmq_stream
9 self.session = session
9 self._clients = {}
10 self._clients = {}
10 self.zmq_stream.on_recv(self._on_zmq_reply)
11 self.zmq_stream.on_recv(self._on_zmq_reply)
11
12
@@ -29,6 +30,7 b' class IOPubStreamRouter(ZMQStreamRouter):'
29 def _on_zmq_reply(self, msg_list):
30 def _on_zmq_reply(self, msg_list):
30 for client_id, client in self._clients.items():
31 for client_id, client in self._clients.items():
31 for msg in msg_list:
32 for msg in msg_list:
33 print "Got message: ", msg
32 client.write_message(msg)
34 client.write_message(msg)
33
35
34 def forward_unicode(self, client_id, msg):
36 def forward_unicode(self, client_id, msg):
@@ -38,8 +40,8 b' class IOPubStreamRouter(ZMQStreamRouter):'
38
40
39 class ShellStreamRouter(ZMQStreamRouter):
41 class ShellStreamRouter(ZMQStreamRouter):
40
42
41 def __init__(self, zmq_stream):
43 def __init__(self, zmq_stream, session):
42 ZMQStreamRouter.__init__(self, zmq_stream)
44 ZMQStreamRouter.__init__(self, zmq_stream, session)
43 self._request_queue = Queue()
45 self._request_queue = Queue()
44
46
45 def _on_zmq_reply(self, msg_list):
47 def _on_zmq_reply(self, msg_list):
@@ -50,8 +52,7 b' class ShellStreamRouter(ZMQStreamRouter):'
50 client.write_message(msg)
52 client.write_message(msg)
51
53
52 def forward_unicode(self, client_id, msg):
54 def forward_unicode(self, client_id, msg):
53 self._request_queue.put(client_id)
55 print "Inbound message: ", msg
54 self.zmq_stream.send_unicode(msg)
56 self._request_queue.put(client_id)
55
57 self.session.send(self.zmq_stream, msg)
56
57
58
General Comments 0
You need to be logged in to leave comments. Login now