##// END OF EJS Templates
protect websocket against errant messages...
Min RK -
Show More
@@ -1,279 +1,286 b''
1 """Tornado handlers for kernels."""
1 """Tornado handlers for kernels."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import json
6 import json
7 import logging
7 import logging
8 from tornado import gen, web
8 from tornado import gen, web
9 from tornado.concurrent import Future
9 from tornado.concurrent import Future
10 from tornado.ioloop import IOLoop
10 from tornado.ioloop import IOLoop
11
11
12 from IPython.utils.jsonutil import date_default
12 from IPython.utils.jsonutil import date_default
13 from IPython.utils.py3compat import cast_unicode
13 from IPython.utils.py3compat import cast_unicode
14 from IPython.html.utils import url_path_join, url_escape
14 from IPython.html.utils import url_path_join, url_escape
15
15
16 from ...base.handlers import IPythonHandler, json_errors
16 from ...base.handlers import IPythonHandler, json_errors
17 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
17 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
18
18
19 from IPython.core.release import kernel_protocol_version
19 from IPython.core.release import kernel_protocol_version
20
20
21 class MainKernelHandler(IPythonHandler):
21 class MainKernelHandler(IPythonHandler):
22
22
23 @web.authenticated
23 @web.authenticated
24 @json_errors
24 @json_errors
25 def get(self):
25 def get(self):
26 km = self.kernel_manager
26 km = self.kernel_manager
27 self.finish(json.dumps(km.list_kernels()))
27 self.finish(json.dumps(km.list_kernels()))
28
28
29 @web.authenticated
29 @web.authenticated
30 @json_errors
30 @json_errors
31 def post(self):
31 def post(self):
32 km = self.kernel_manager
32 km = self.kernel_manager
33 model = self.get_json_body()
33 model = self.get_json_body()
34 if model is None:
34 if model is None:
35 model = {
35 model = {
36 'name': km.default_kernel_name
36 'name': km.default_kernel_name
37 }
37 }
38 else:
38 else:
39 model.setdefault('name', km.default_kernel_name)
39 model.setdefault('name', km.default_kernel_name)
40
40
41 kernel_id = km.start_kernel(kernel_name=model['name'])
41 kernel_id = km.start_kernel(kernel_name=model['name'])
42 model = km.kernel_model(kernel_id)
42 model = km.kernel_model(kernel_id)
43 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
43 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
44 self.set_header('Location', url_escape(location))
44 self.set_header('Location', url_escape(location))
45 self.set_status(201)
45 self.set_status(201)
46 self.finish(json.dumps(model))
46 self.finish(json.dumps(model))
47
47
48
48
49 class KernelHandler(IPythonHandler):
49 class KernelHandler(IPythonHandler):
50
50
51 SUPPORTED_METHODS = ('DELETE', 'GET')
51 SUPPORTED_METHODS = ('DELETE', 'GET')
52
52
53 @web.authenticated
53 @web.authenticated
54 @json_errors
54 @json_errors
55 def get(self, kernel_id):
55 def get(self, kernel_id):
56 km = self.kernel_manager
56 km = self.kernel_manager
57 km._check_kernel_id(kernel_id)
57 km._check_kernel_id(kernel_id)
58 model = km.kernel_model(kernel_id)
58 model = km.kernel_model(kernel_id)
59 self.finish(json.dumps(model))
59 self.finish(json.dumps(model))
60
60
61 @web.authenticated
61 @web.authenticated
62 @json_errors
62 @json_errors
63 def delete(self, kernel_id):
63 def delete(self, kernel_id):
64 km = self.kernel_manager
64 km = self.kernel_manager
65 km.shutdown_kernel(kernel_id)
65 km.shutdown_kernel(kernel_id)
66 self.set_status(204)
66 self.set_status(204)
67 self.finish()
67 self.finish()
68
68
69
69
70 class KernelActionHandler(IPythonHandler):
70 class KernelActionHandler(IPythonHandler):
71
71
72 @web.authenticated
72 @web.authenticated
73 @json_errors
73 @json_errors
74 def post(self, kernel_id, action):
74 def post(self, kernel_id, action):
75 km = self.kernel_manager
75 km = self.kernel_manager
76 if action == 'interrupt':
76 if action == 'interrupt':
77 km.interrupt_kernel(kernel_id)
77 km.interrupt_kernel(kernel_id)
78 self.set_status(204)
78 self.set_status(204)
79 if action == 'restart':
79 if action == 'restart':
80 km.restart_kernel(kernel_id)
80 km.restart_kernel(kernel_id)
81 model = km.kernel_model(kernel_id)
81 model = km.kernel_model(kernel_id)
82 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
82 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
83 self.write(json.dumps(model))
83 self.write(json.dumps(model))
84 self.finish()
84 self.finish()
85
85
86
86
87 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
87 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
88
88
89 @property
89 @property
90 def kernel_info_timeout(self):
90 def kernel_info_timeout(self):
91 return self.settings.get('kernel_info_timeout', 10)
91 return self.settings.get('kernel_info_timeout', 10)
92
92
93 def __repr__(self):
93 def __repr__(self):
94 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
94 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
95
95
96 def create_stream(self):
96 def create_stream(self):
97 km = self.kernel_manager
97 km = self.kernel_manager
98 identity = self.session.bsession
98 identity = self.session.bsession
99 for channel in ('shell', 'iopub', 'stdin'):
99 for channel in ('shell', 'iopub', 'stdin'):
100 meth = getattr(km, 'connect_' + channel)
100 meth = getattr(km, 'connect_' + channel)
101 self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
101 self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
102 stream.channel = channel
102 stream.channel = channel
103 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
103 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
104 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
104 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
105
105
106 def request_kernel_info(self):
106 def request_kernel_info(self):
107 """send a request for kernel_info"""
107 """send a request for kernel_info"""
108 km = self.kernel_manager
108 km = self.kernel_manager
109 kernel = km.get_kernel(self.kernel_id)
109 kernel = km.get_kernel(self.kernel_id)
110 try:
110 try:
111 # check for previous request
111 # check for previous request
112 future = kernel._kernel_info_future
112 future = kernel._kernel_info_future
113 except AttributeError:
113 except AttributeError:
114 self.log.debug("Requesting kernel info from %s", self.kernel_id)
114 self.log.debug("Requesting kernel info from %s", self.kernel_id)
115 # Create a kernel_info channel to query the kernel protocol version.
115 # Create a kernel_info channel to query the kernel protocol version.
116 # This channel will be closed after the kernel_info reply is received.
116 # This channel will be closed after the kernel_info reply is received.
117 if self.kernel_info_channel is None:
117 if self.kernel_info_channel is None:
118 self.kernel_info_channel = km.connect_shell(self.kernel_id)
118 self.kernel_info_channel = km.connect_shell(self.kernel_id)
119 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
119 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
120 self.session.send(self.kernel_info_channel, "kernel_info_request")
120 self.session.send(self.kernel_info_channel, "kernel_info_request")
121 # store the future on the kernel, so only one request is sent
121 # store the future on the kernel, so only one request is sent
122 kernel._kernel_info_future = self._kernel_info_future
122 kernel._kernel_info_future = self._kernel_info_future
123 else:
123 else:
124 if not future.done():
124 if not future.done():
125 self.log.debug("Waiting for pending kernel_info request")
125 self.log.debug("Waiting for pending kernel_info request")
126 future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
126 future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
127 return self._kernel_info_future
127 return self._kernel_info_future
128
128
129 def _handle_kernel_info_reply(self, msg):
129 def _handle_kernel_info_reply(self, msg):
130 """process the kernel_info_reply
130 """process the kernel_info_reply
131
131
132 enabling msg spec adaptation, if necessary
132 enabling msg spec adaptation, if necessary
133 """
133 """
134 idents,msg = self.session.feed_identities(msg)
134 idents,msg = self.session.feed_identities(msg)
135 try:
135 try:
136 msg = self.session.deserialize(msg)
136 msg = self.session.deserialize(msg)
137 except:
137 except:
138 self.log.error("Bad kernel_info reply", exc_info=True)
138 self.log.error("Bad kernel_info reply", exc_info=True)
139 self._kernel_info_future.set_result({})
139 self._kernel_info_future.set_result({})
140 return
140 return
141 else:
141 else:
142 info = msg['content']
142 info = msg['content']
143 self.log.debug("Received kernel info: %s", info)
143 self.log.debug("Received kernel info: %s", info)
144 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
144 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
145 self.log.error("Kernel info request failed, assuming current %s", info)
145 self.log.error("Kernel info request failed, assuming current %s", info)
146 info = {}
146 info = {}
147 self._finish_kernel_info(info)
147 self._finish_kernel_info(info)
148
148
149 # close the kernel_info channel, we don't need it anymore
149 # close the kernel_info channel, we don't need it anymore
150 if self.kernel_info_channel:
150 if self.kernel_info_channel:
151 self.kernel_info_channel.close()
151 self.kernel_info_channel.close()
152 self.kernel_info_channel = None
152 self.kernel_info_channel = None
153
153
154 def _finish_kernel_info(self, info):
154 def _finish_kernel_info(self, info):
155 """Finish handling kernel_info reply
155 """Finish handling kernel_info reply
156
156
157 Set up protocol adaptation, if needed,
157 Set up protocol adaptation, if needed,
158 and signal that connection can continue.
158 and signal that connection can continue.
159 """
159 """
160 protocol_version = info.get('protocol_version', kernel_protocol_version)
160 protocol_version = info.get('protocol_version', kernel_protocol_version)
161 if protocol_version != kernel_protocol_version:
161 if protocol_version != kernel_protocol_version:
162 self.session.adapt_version = int(protocol_version.split('.')[0])
162 self.session.adapt_version = int(protocol_version.split('.')[0])
163 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
163 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
164 if not self._kernel_info_future.done():
164 if not self._kernel_info_future.done():
165 self._kernel_info_future.set_result(info)
165 self._kernel_info_future.set_result(info)
166
166
167 def initialize(self):
167 def initialize(self):
168 super(ZMQChannelsHandler, self).initialize()
168 super(ZMQChannelsHandler, self).initialize()
169 self.zmq_stream = None
169 self.zmq_stream = None
170 self.channels = {}
170 self.channels = {}
171 self.kernel_id = None
171 self.kernel_id = None
172 self.kernel_info_channel = None
172 self.kernel_info_channel = None
173 self._kernel_info_future = Future()
173 self._kernel_info_future = Future()
174
174
175 @gen.coroutine
175 @gen.coroutine
176 def pre_get(self):
176 def pre_get(self):
177 # authenticate first
177 # authenticate first
178 super(ZMQChannelsHandler, self).pre_get()
178 super(ZMQChannelsHandler, self).pre_get()
179 # then request kernel info, waiting up to a certain time before giving up.
179 # then request kernel info, waiting up to a certain time before giving up.
180 # We don't want to wait forever, because browsers don't take it well when
180 # We don't want to wait forever, because browsers don't take it well when
181 # servers never respond to websocket connection requests.
181 # servers never respond to websocket connection requests.
182 future = self.request_kernel_info()
182 future = self.request_kernel_info()
183
183
184 def give_up():
184 def give_up():
185 """Don't wait forever for the kernel to reply"""
185 """Don't wait forever for the kernel to reply"""
186 if future.done():
186 if future.done():
187 return
187 return
188 self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
188 self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
189 future.set_result({})
189 future.set_result({})
190 loop = IOLoop.current()
190 loop = IOLoop.current()
191 loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
191 loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
192 # actually wait for it
192 # actually wait for it
193 yield future
193 yield future
194
194
195 @gen.coroutine
195 @gen.coroutine
196 def get(self, kernel_id):
196 def get(self, kernel_id):
197 self.kernel_id = cast_unicode(kernel_id, 'ascii')
197 self.kernel_id = cast_unicode(kernel_id, 'ascii')
198 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
198 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
199
199
200 def open(self, kernel_id):
200 def open(self, kernel_id):
201 super(ZMQChannelsHandler, self).open()
201 super(ZMQChannelsHandler, self).open()
202 try:
202 try:
203 self.create_stream()
203 self.create_stream()
204 except web.HTTPError as e:
204 except web.HTTPError as e:
205 self.log.error("Error opening stream: %s", e)
205 self.log.error("Error opening stream: %s", e)
206 # WebSockets don't response to traditional error codes so we
206 # WebSockets don't response to traditional error codes so we
207 # close the connection.
207 # close the connection.
208 for channel, stream in self.channels.items():
208 for channel, stream in self.channels.items():
209 if not stream.closed():
209 if not stream.closed():
210 stream.close()
210 stream.close()
211 self.close()
211 self.close()
212 else:
212 else:
213 for channel, stream in self.channels.items():
213 for channel, stream in self.channels.items():
214 stream.on_recv_stream(self._on_zmq_reply)
214 stream.on_recv_stream(self._on_zmq_reply)
215
215
216 def on_message(self, msg):
216 def on_message(self, msg):
217 if not self.channels:
218 # already closed, ignore the message
219 self.log.debug("Received message on closed websocket %r", msg)
220 return
217 if isinstance(msg, bytes):
221 if isinstance(msg, bytes):
218 msg = deserialize_binary_message(msg)
222 msg = deserialize_binary_message(msg)
219 else:
223 else:
220 msg = json.loads(msg)
224 msg = json.loads(msg)
221 channel = msg.pop('channel', None)
225 channel = msg.pop('channel', None)
222 if channel is None:
226 if channel is None:
223 self.log.warn("No channel specified, assuming shell: %s", msg)
227 self.log.warn("No channel specified, assuming shell: %s", msg)
224 channel = 'shell'
228 channel = 'shell'
229 if channel not in self.channels:
230 self.log.warn("No such channel: %r", channel)
231 return
225 stream = self.channels[channel]
232 stream = self.channels[channel]
226 self.session.send(stream, msg)
233 self.session.send(stream, msg)
227
234
228 def on_close(self):
235 def on_close(self):
229 km = self.kernel_manager
236 km = self.kernel_manager
230 if self.kernel_id in km:
237 if self.kernel_id in km:
231 km.remove_restart_callback(
238 km.remove_restart_callback(
232 self.kernel_id, self.on_kernel_restarted,
239 self.kernel_id, self.on_kernel_restarted,
233 )
240 )
234 km.remove_restart_callback(
241 km.remove_restart_callback(
235 self.kernel_id, self.on_restart_failed, 'dead',
242 self.kernel_id, self.on_restart_failed, 'dead',
236 )
243 )
237 # This method can be called twice, once by self.kernel_died and once
244 # This method can be called twice, once by self.kernel_died and once
238 # from the WebSocket close event. If the WebSocket connection is
245 # from the WebSocket close event. If the WebSocket connection is
239 # closed before the ZMQ streams are setup, they could be None.
246 # closed before the ZMQ streams are setup, they could be None.
240 for channel, stream in self.channels.items():
247 for channel, stream in self.channels.items():
241 if stream is not None and not stream.closed():
248 if stream is not None and not stream.closed():
242 stream.on_recv(None)
249 stream.on_recv(None)
243 # close the socket directly, don't wait for the stream
250 # close the socket directly, don't wait for the stream
244 socket = stream.socket
251 socket = stream.socket
245 stream.close()
252 stream.close()
246 socket.close()
253 socket.close()
247
254
248 self.channels = {}
255 self.channels = {}
249
256
250 def _send_status_message(self, status):
257 def _send_status_message(self, status):
251 msg = self.session.msg("status",
258 msg = self.session.msg("status",
252 {'execution_state': status}
259 {'execution_state': status}
253 )
260 )
254 msg['channel'] = 'iopub'
261 msg['channel'] = 'iopub'
255 self.write_message(json.dumps(msg, default=date_default))
262 self.write_message(json.dumps(msg, default=date_default))
256
263
257 def on_kernel_restarted(self):
264 def on_kernel_restarted(self):
258 logging.warn("kernel %s restarted", self.kernel_id)
265 logging.warn("kernel %s restarted", self.kernel_id)
259 self._send_status_message('restarting')
266 self._send_status_message('restarting')
260
267
261 def on_restart_failed(self):
268 def on_restart_failed(self):
262 logging.error("kernel %s restarted failed!", self.kernel_id)
269 logging.error("kernel %s restarted failed!", self.kernel_id)
263 self._send_status_message('dead')
270 self._send_status_message('dead')
264
271
265
272
266 #-----------------------------------------------------------------------------
273 #-----------------------------------------------------------------------------
267 # URL to handler mappings
274 # URL to handler mappings
268 #-----------------------------------------------------------------------------
275 #-----------------------------------------------------------------------------
269
276
270
277
271 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
278 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
272 _kernel_action_regex = r"(?P<action>restart|interrupt)"
279 _kernel_action_regex = r"(?P<action>restart|interrupt)"
273
280
274 default_handlers = [
281 default_handlers = [
275 (r"/api/kernels", MainKernelHandler),
282 (r"/api/kernels", MainKernelHandler),
276 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
283 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
277 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
284 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
278 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
285 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
279 ]
286 ]
General Comments 0
You need to be logged in to leave comments. Login now