##// END OF EJS Templates
Moving base ZMQ handlers to base/zmqhandlers.py.
Brian E. Granger -
Show More
@@ -0,0 +1,114 b''
1 """Tornado handlers for WebSocket <-> ZMQ sockets.
2
3 Authors:
4
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
18
19 import Cookie
20 import logging
21 from tornado import web
22 from tornado import websocket
23
24 from zmq.utils import jsonapi
25
26 from IPython.kernel.zmq.session import Session
27 from IPython.utils.jsonutil import date_default
28 from IPython.utils.py3compat import PY3
29
30 from ..base.handlers import IPythonHandler
31
32 #-----------------------------------------------------------------------------
33 # ZMQ handlers
34 #-----------------------------------------------------------------------------
35
36 class ZMQStreamHandler(websocket.WebSocketHandler):
37
38 def clear_cookie(self, *args, **kwargs):
39 """meaningless for websockets"""
40 pass
41
42 def _reserialize_reply(self, msg_list):
43 """Reserialize a reply message using JSON.
44
45 This takes the msg list from the ZMQ socket, unserializes it using
46 self.session and then serializes the result using JSON. This method
47 should be used by self._on_zmq_reply to build messages that can
48 be sent back to the browser.
49 """
50 idents, msg_list = self.session.feed_identities(msg_list)
51 msg = self.session.unserialize(msg_list)
52 try:
53 msg['header'].pop('date')
54 except KeyError:
55 pass
56 try:
57 msg['parent_header'].pop('date')
58 except KeyError:
59 pass
60 msg.pop('buffers')
61 return jsonapi.dumps(msg, default=date_default)
62
63 def _on_zmq_reply(self, msg_list):
64 # Sometimes this gets triggered when the on_close method is scheduled in the
65 # eventloop but hasn't been called.
66 if self.stream.closed(): return
67 try:
68 msg = self._reserialize_reply(msg_list)
69 except Exception:
70 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
71 else:
72 self.write_message(msg)
73
74 def allow_draft76(self):
75 """Allow draft 76, until browsers such as Safari update to RFC 6455.
76
77 This has been disabled by default in tornado in release 2.2.0, and
78 support will be removed in later versions.
79 """
80 return True
81
82
83 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
84
85 def open(self, kernel_id):
86 self.kernel_id = kernel_id.decode('ascii')
87 self.session = Session(config=self.config)
88 self.save_on_message = self.on_message
89 self.on_message = self.on_first_message
90
91 def _inject_cookie_message(self, msg):
92 """Inject the first message, which is the document cookie,
93 for authentication."""
94 if not PY3 and isinstance(msg, unicode):
95 # Cookie constructor doesn't accept unicode strings
96 # under Python 2.x for some reason
97 msg = msg.encode('utf8', 'replace')
98 try:
99 identity, msg = msg.split(':', 1)
100 self.session.session = identity.decode('ascii')
101 except Exception:
102 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
103
104 try:
105 self.request._cookies = Cookie.SimpleCookie(msg)
106 except:
107 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
108
109 def on_first_message(self, msg):
110 self._inject_cookie_message(msg)
111 if self.get_current_user() is None:
112 self.log.warn("Couldn't authenticate WebSocket connection")
113 raise web.HTTPError(403)
114 self.on_message = self.save_on_message No newline at end of file
@@ -1,271 +1,187 b''
1 """Tornado handlers for the notebook.
1 """Tornado handlers for the notebook.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import Cookie
20 import logging
19 import logging
21 from tornado import web
20 from tornado import web
22 from tornado import websocket
23
21
24 from zmq.utils import jsonapi
22 from zmq.utils import jsonapi
25
23
26 from IPython.kernel.zmq.session import Session
27 from IPython.utils.jsonutil import date_default
24 from IPython.utils.jsonutil import date_default
28 from IPython.utils.py3compat import PY3
29
25
30 from ..base.handlers import IPythonHandler
26 from ..base.handlers import IPythonHandler
27 from ..base.zmqhandlers import AuthenticatedZMQStreamHandler
31
28
32 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
33 # Kernel handlers
30 # Kernel handlers
34 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
35
32
36
33
37 class MainKernelHandler(IPythonHandler):
34 class MainKernelHandler(IPythonHandler):
38
35
39 @web.authenticated
36 @web.authenticated
40 def get(self):
37 def get(self):
41 km = self.kernel_manager
38 km = self.kernel_manager
42 self.finish(jsonapi.dumps(km.list_kernel_ids()))
39 self.finish(jsonapi.dumps(km.list_kernel_ids()))
43
40
44 @web.authenticated
41 @web.authenticated
45 def post(self):
42 def post(self):
46 km = self.kernel_manager
43 km = self.kernel_manager
47 nbm = self.notebook_manager
44 nbm = self.notebook_manager
48 notebook_id = self.get_argument('notebook', default=None)
45 notebook_id = self.get_argument('notebook', default=None)
49 kernel_id = km.start_kernel(notebook_id, cwd=nbm.notebook_dir)
46 kernel_id = km.start_kernel(notebook_id, cwd=nbm.notebook_dir)
50 data = {'ws_url':self.ws_url,'kernel_id':kernel_id}
47 data = {'ws_url':self.ws_url,'kernel_id':kernel_id}
51 self.set_header('Location', '{0}kernels/{1}'.format(self.base_kernel_url, kernel_id))
48 self.set_header('Location', '{0}kernels/{1}'.format(self.base_kernel_url, kernel_id))
52 self.finish(jsonapi.dumps(data))
49 self.finish(jsonapi.dumps(data))
53
50
54
51
55 class KernelHandler(IPythonHandler):
52 class KernelHandler(IPythonHandler):
56
53
57 SUPPORTED_METHODS = ('DELETE')
54 SUPPORTED_METHODS = ('DELETE')
58
55
59 @web.authenticated
56 @web.authenticated
60 def delete(self, kernel_id):
57 def delete(self, kernel_id):
61 km = self.kernel_manager
58 km = self.kernel_manager
62 km.shutdown_kernel(kernel_id)
59 km.shutdown_kernel(kernel_id)
63 self.set_status(204)
60 self.set_status(204)
64 self.finish()
61 self.finish()
65
62
66
63
67 class KernelActionHandler(IPythonHandler):
64 class KernelActionHandler(IPythonHandler):
68
65
69 @web.authenticated
66 @web.authenticated
70 def post(self, kernel_id, action):
67 def post(self, kernel_id, action):
71 km = self.kernel_manager
68 km = self.kernel_manager
72 if action == 'interrupt':
69 if action == 'interrupt':
73 km.interrupt_kernel(kernel_id)
70 km.interrupt_kernel(kernel_id)
74 self.set_status(204)
71 self.set_status(204)
75 if action == 'restart':
72 if action == 'restart':
76 km.restart_kernel(kernel_id)
73 km.restart_kernel(kernel_id)
77 data = {'ws_url':self.ws_url, 'kernel_id':kernel_id}
74 data = {'ws_url':self.ws_url, 'kernel_id':kernel_id}
78 self.set_header('Location', '{0}kernels/{1}'.format(self.base_kernel_url, kernel_id))
75 self.set_header('Location', '{0}kernels/{1}'.format(self.base_kernel_url, kernel_id))
79 self.write(jsonapi.dumps(data))
76 self.write(jsonapi.dumps(data))
80 self.finish()
77 self.finish()
81
78
82
79
83 class ZMQStreamHandler(websocket.WebSocketHandler):
84
85 def clear_cookie(self, *args, **kwargs):
86 """meaningless for websockets"""
87 pass
88
89 def _reserialize_reply(self, msg_list):
90 """Reserialize a reply message using JSON.
91
92 This takes the msg list from the ZMQ socket, unserializes it using
93 self.session and then serializes the result using JSON. This method
94 should be used by self._on_zmq_reply to build messages that can
95 be sent back to the browser.
96 """
97 idents, msg_list = self.session.feed_identities(msg_list)
98 msg = self.session.unserialize(msg_list)
99 try:
100 msg['header'].pop('date')
101 except KeyError:
102 pass
103 try:
104 msg['parent_header'].pop('date')
105 except KeyError:
106 pass
107 msg.pop('buffers')
108 return jsonapi.dumps(msg, default=date_default)
109
110 def _on_zmq_reply(self, msg_list):
111 # Sometimes this gets triggered when the on_close method is scheduled in the
112 # eventloop but hasn't been called.
113 if self.stream.closed(): return
114 try:
115 msg = self._reserialize_reply(msg_list)
116 except Exception:
117 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
118 else:
119 self.write_message(msg)
120
121 def allow_draft76(self):
122 """Allow draft 76, until browsers such as Safari update to RFC 6455.
123
124 This has been disabled by default in tornado in release 2.2.0, and
125 support will be removed in later versions.
126 """
127 return True
128
129
130 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
131
132 def open(self, kernel_id):
133 self.kernel_id = kernel_id.decode('ascii')
134 self.session = Session(config=self.config)
135 self.save_on_message = self.on_message
136 self.on_message = self.on_first_message
137
138 def _inject_cookie_message(self, msg):
139 """Inject the first message, which is the document cookie,
140 for authentication."""
141 if not PY3 and isinstance(msg, unicode):
142 # Cookie constructor doesn't accept unicode strings
143 # under Python 2.x for some reason
144 msg = msg.encode('utf8', 'replace')
145 try:
146 identity, msg = msg.split(':', 1)
147 self.session.session = identity.decode('ascii')
148 except Exception:
149 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
150
151 try:
152 self.request._cookies = Cookie.SimpleCookie(msg)
153 except:
154 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
155
156 def on_first_message(self, msg):
157 self._inject_cookie_message(msg)
158 if self.get_current_user() is None:
159 self.log.warn("Couldn't authenticate WebSocket connection")
160 raise web.HTTPError(403)
161 self.on_message = self.save_on_message
162
163
164 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
80 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
165
81
166 @property
82 @property
167 def max_msg_size(self):
83 def max_msg_size(self):
168 return self.settings.get('max_msg_size', 65535)
84 return self.settings.get('max_msg_size', 65535)
169
85
170 def create_stream(self):
86 def create_stream(self):
171 km = self.kernel_manager
87 km = self.kernel_manager
172 meth = getattr(km, 'connect_%s' % self.channel)
88 meth = getattr(km, 'connect_%s' % self.channel)
173 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
89 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
174
90
175 def initialize(self, *args, **kwargs):
91 def initialize(self, *args, **kwargs):
176 self.zmq_stream = None
92 self.zmq_stream = None
177
93
178 def on_first_message(self, msg):
94 def on_first_message(self, msg):
179 try:
95 try:
180 super(ZMQChannelHandler, self).on_first_message(msg)
96 super(ZMQChannelHandler, self).on_first_message(msg)
181 except web.HTTPError:
97 except web.HTTPError:
182 self.close()
98 self.close()
183 return
99 return
184 try:
100 try:
185 self.create_stream()
101 self.create_stream()
186 except web.HTTPError:
102 except web.HTTPError:
187 # WebSockets don't response to traditional error codes so we
103 # WebSockets don't response to traditional error codes so we
188 # close the connection.
104 # close the connection.
189 if not self.stream.closed():
105 if not self.stream.closed():
190 self.stream.close()
106 self.stream.close()
191 self.close()
107 self.close()
192 else:
108 else:
193 self.zmq_stream.on_recv(self._on_zmq_reply)
109 self.zmq_stream.on_recv(self._on_zmq_reply)
194
110
195 def on_message(self, msg):
111 def on_message(self, msg):
196 if len(msg) < self.max_msg_size:
112 if len(msg) < self.max_msg_size:
197 msg = jsonapi.loads(msg)
113 msg = jsonapi.loads(msg)
198 self.session.send(self.zmq_stream, msg)
114 self.session.send(self.zmq_stream, msg)
199
115
200 def on_close(self):
116 def on_close(self):
201 # This method can be called twice, once by self.kernel_died and once
117 # This method can be called twice, once by self.kernel_died and once
202 # from the WebSocket close event. If the WebSocket connection is
118 # from the WebSocket close event. If the WebSocket connection is
203 # closed before the ZMQ streams are setup, they could be None.
119 # closed before the ZMQ streams are setup, they could be None.
204 if self.zmq_stream is not None and not self.zmq_stream.closed():
120 if self.zmq_stream is not None and not self.zmq_stream.closed():
205 self.zmq_stream.on_recv(None)
121 self.zmq_stream.on_recv(None)
206 self.zmq_stream.close()
122 self.zmq_stream.close()
207
123
208
124
209 class IOPubHandler(ZMQChannelHandler):
125 class IOPubHandler(ZMQChannelHandler):
210 channel = 'iopub'
126 channel = 'iopub'
211
127
212 def create_stream(self):
128 def create_stream(self):
213 super(IOPubHandler, self).create_stream()
129 super(IOPubHandler, self).create_stream()
214 km = self.kernel_manager
130 km = self.kernel_manager
215 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
131 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
216 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
132 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
217
133
218 def on_close(self):
134 def on_close(self):
219 km = self.kernel_manager
135 km = self.kernel_manager
220 if self.kernel_id in km:
136 if self.kernel_id in km:
221 km.remove_restart_callback(
137 km.remove_restart_callback(
222 self.kernel_id, self.on_kernel_restarted,
138 self.kernel_id, self.on_kernel_restarted,
223 )
139 )
224 km.remove_restart_callback(
140 km.remove_restart_callback(
225 self.kernel_id, self.on_restart_failed, 'dead',
141 self.kernel_id, self.on_restart_failed, 'dead',
226 )
142 )
227 super(IOPubHandler, self).on_close()
143 super(IOPubHandler, self).on_close()
228
144
229 def _send_status_message(self, status):
145 def _send_status_message(self, status):
230 msg = self.session.msg("status",
146 msg = self.session.msg("status",
231 {'execution_state': status}
147 {'execution_state': status}
232 )
148 )
233 self.write_message(jsonapi.dumps(msg, default=date_default))
149 self.write_message(jsonapi.dumps(msg, default=date_default))
234
150
235 def on_kernel_restarted(self):
151 def on_kernel_restarted(self):
236 logging.warn("kernel %s restarted", self.kernel_id)
152 logging.warn("kernel %s restarted", self.kernel_id)
237 self._send_status_message('restarting')
153 self._send_status_message('restarting')
238
154
239 def on_restart_failed(self):
155 def on_restart_failed(self):
240 logging.error("kernel %s restarted failed!", self.kernel_id)
156 logging.error("kernel %s restarted failed!", self.kernel_id)
241 self._send_status_message('dead')
157 self._send_status_message('dead')
242
158
243 def on_message(self, msg):
159 def on_message(self, msg):
244 """IOPub messages make no sense"""
160 """IOPub messages make no sense"""
245 pass
161 pass
246
162
247
163
248 class ShellHandler(ZMQChannelHandler):
164 class ShellHandler(ZMQChannelHandler):
249 channel = 'shell'
165 channel = 'shell'
250
166
251
167
252 class StdinHandler(ZMQChannelHandler):
168 class StdinHandler(ZMQChannelHandler):
253 channel = 'stdin'
169 channel = 'stdin'
254
170
255
171
256 #-----------------------------------------------------------------------------
172 #-----------------------------------------------------------------------------
257 # URL to handler mappings
173 # URL to handler mappings
258 #-----------------------------------------------------------------------------
174 #-----------------------------------------------------------------------------
259
175
260
176
261 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
177 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
262 _kernel_action_regex = r"(?P<action>restart|interrupt)"
178 _kernel_action_regex = r"(?P<action>restart|interrupt)"
263
179
264 default_handlers = [
180 default_handlers = [
265 (r"/kernels", MainKernelHandler),
181 (r"/kernels", MainKernelHandler),
266 (r"/kernels/%s" % _kernel_id_regex, KernelHandler),
182 (r"/kernels/%s" % _kernel_id_regex, KernelHandler),
267 (r"/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
183 (r"/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
268 (r"/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
184 (r"/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
269 (r"/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
185 (r"/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
270 (r"/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
186 (r"/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
271 ]
187 ]
General Comments 0
You need to be logged in to leave comments. Login now