##// END OF EJS Templates
protect websocket against errant messages...
Min RK -
Show More
@@ -1,279 +1,286 b''
1 1 """Tornado handlers for kernels."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import json
7 7 import logging
8 8 from tornado import gen, web
9 9 from tornado.concurrent import Future
10 10 from tornado.ioloop import IOLoop
11 11
12 12 from IPython.utils.jsonutil import date_default
13 13 from IPython.utils.py3compat import cast_unicode
14 14 from IPython.html.utils import url_path_join, url_escape
15 15
16 16 from ...base.handlers import IPythonHandler, json_errors
17 17 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
18 18
19 19 from IPython.core.release import kernel_protocol_version
20 20
21 21 class MainKernelHandler(IPythonHandler):
22 22
23 23 @web.authenticated
24 24 @json_errors
25 25 def get(self):
26 26 km = self.kernel_manager
27 27 self.finish(json.dumps(km.list_kernels()))
28 28
29 29 @web.authenticated
30 30 @json_errors
31 31 def post(self):
32 32 km = self.kernel_manager
33 33 model = self.get_json_body()
34 34 if model is None:
35 35 model = {
36 36 'name': km.default_kernel_name
37 37 }
38 38 else:
39 39 model.setdefault('name', km.default_kernel_name)
40 40
41 41 kernel_id = km.start_kernel(kernel_name=model['name'])
42 42 model = km.kernel_model(kernel_id)
43 43 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
44 44 self.set_header('Location', url_escape(location))
45 45 self.set_status(201)
46 46 self.finish(json.dumps(model))
47 47
48 48
49 49 class KernelHandler(IPythonHandler):
50 50
51 51 SUPPORTED_METHODS = ('DELETE', 'GET')
52 52
53 53 @web.authenticated
54 54 @json_errors
55 55 def get(self, kernel_id):
56 56 km = self.kernel_manager
57 57 km._check_kernel_id(kernel_id)
58 58 model = km.kernel_model(kernel_id)
59 59 self.finish(json.dumps(model))
60 60
61 61 @web.authenticated
62 62 @json_errors
63 63 def delete(self, kernel_id):
64 64 km = self.kernel_manager
65 65 km.shutdown_kernel(kernel_id)
66 66 self.set_status(204)
67 67 self.finish()
68 68
69 69
70 70 class KernelActionHandler(IPythonHandler):
71 71
72 72 @web.authenticated
73 73 @json_errors
74 74 def post(self, kernel_id, action):
75 75 km = self.kernel_manager
76 76 if action == 'interrupt':
77 77 km.interrupt_kernel(kernel_id)
78 78 self.set_status(204)
79 79 if action == 'restart':
80 80 km.restart_kernel(kernel_id)
81 81 model = km.kernel_model(kernel_id)
82 82 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
83 83 self.write(json.dumps(model))
84 84 self.finish()
85 85
86 86
87 87 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
88 88
89 89 @property
90 90 def kernel_info_timeout(self):
91 91 return self.settings.get('kernel_info_timeout', 10)
92 92
93 93 def __repr__(self):
94 94 return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
95 95
96 96 def create_stream(self):
97 97 km = self.kernel_manager
98 98 identity = self.session.bsession
99 99 for channel in ('shell', 'iopub', 'stdin'):
100 100 meth = getattr(km, 'connect_' + channel)
101 101 self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
102 102 stream.channel = channel
103 103 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
104 104 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
105 105
106 106 def request_kernel_info(self):
107 107 """send a request for kernel_info"""
108 108 km = self.kernel_manager
109 109 kernel = km.get_kernel(self.kernel_id)
110 110 try:
111 111 # check for previous request
112 112 future = kernel._kernel_info_future
113 113 except AttributeError:
114 114 self.log.debug("Requesting kernel info from %s", self.kernel_id)
115 115 # Create a kernel_info channel to query the kernel protocol version.
116 116 # This channel will be closed after the kernel_info reply is received.
117 117 if self.kernel_info_channel is None:
118 118 self.kernel_info_channel = km.connect_shell(self.kernel_id)
119 119 self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
120 120 self.session.send(self.kernel_info_channel, "kernel_info_request")
121 121 # store the future on the kernel, so only one request is sent
122 122 kernel._kernel_info_future = self._kernel_info_future
123 123 else:
124 124 if not future.done():
125 125 self.log.debug("Waiting for pending kernel_info request")
126 126 future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
127 127 return self._kernel_info_future
128 128
129 129 def _handle_kernel_info_reply(self, msg):
130 130 """process the kernel_info_reply
131 131
132 132 enabling msg spec adaptation, if necessary
133 133 """
134 134 idents,msg = self.session.feed_identities(msg)
135 135 try:
136 136 msg = self.session.deserialize(msg)
137 137 except:
138 138 self.log.error("Bad kernel_info reply", exc_info=True)
139 139 self._kernel_info_future.set_result({})
140 140 return
141 141 else:
142 142 info = msg['content']
143 143 self.log.debug("Received kernel info: %s", info)
144 144 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
145 145 self.log.error("Kernel info request failed, assuming current %s", info)
146 146 info = {}
147 147 self._finish_kernel_info(info)
148 148
149 149 # close the kernel_info channel, we don't need it anymore
150 150 if self.kernel_info_channel:
151 151 self.kernel_info_channel.close()
152 152 self.kernel_info_channel = None
153 153
154 154 def _finish_kernel_info(self, info):
155 155 """Finish handling kernel_info reply
156 156
157 157 Set up protocol adaptation, if needed,
158 158 and signal that connection can continue.
159 159 """
160 160 protocol_version = info.get('protocol_version', kernel_protocol_version)
161 161 if protocol_version != kernel_protocol_version:
162 162 self.session.adapt_version = int(protocol_version.split('.')[0])
163 163 self.log.info("Kernel %s speaks protocol %s", self.kernel_id, protocol_version)
164 164 if not self._kernel_info_future.done():
165 165 self._kernel_info_future.set_result(info)
166 166
167 167 def initialize(self):
168 168 super(ZMQChannelsHandler, self).initialize()
169 169 self.zmq_stream = None
170 170 self.channels = {}
171 171 self.kernel_id = None
172 172 self.kernel_info_channel = None
173 173 self._kernel_info_future = Future()
174 174
175 175 @gen.coroutine
176 176 def pre_get(self):
177 177 # authenticate first
178 178 super(ZMQChannelsHandler, self).pre_get()
179 179 # then request kernel info, waiting up to a certain time before giving up.
180 180 # We don't want to wait forever, because browsers don't take it well when
181 181 # servers never respond to websocket connection requests.
182 182 future = self.request_kernel_info()
183 183
184 184 def give_up():
185 185 """Don't wait forever for the kernel to reply"""
186 186 if future.done():
187 187 return
188 188 self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
189 189 future.set_result({})
190 190 loop = IOLoop.current()
191 191 loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
192 192 # actually wait for it
193 193 yield future
194 194
195 195 @gen.coroutine
196 196 def get(self, kernel_id):
197 197 self.kernel_id = cast_unicode(kernel_id, 'ascii')
198 198 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
199 199
200 200 def open(self, kernel_id):
201 201 super(ZMQChannelsHandler, self).open()
202 202 try:
203 203 self.create_stream()
204 204 except web.HTTPError as e:
205 205 self.log.error("Error opening stream: %s", e)
206 206 # WebSockets don't response to traditional error codes so we
207 207 # close the connection.
208 208 for channel, stream in self.channels.items():
209 209 if not stream.closed():
210 210 stream.close()
211 211 self.close()
212 212 else:
213 213 for channel, stream in self.channels.items():
214 214 stream.on_recv_stream(self._on_zmq_reply)
215 215
216 216 def on_message(self, msg):
217 if not self.channels:
218 # already closed, ignore the message
219 self.log.debug("Received message on closed websocket %r", msg)
220 return
217 221 if isinstance(msg, bytes):
218 222 msg = deserialize_binary_message(msg)
219 223 else:
220 224 msg = json.loads(msg)
221 225 channel = msg.pop('channel', None)
222 226 if channel is None:
223 227 self.log.warn("No channel specified, assuming shell: %s", msg)
224 228 channel = 'shell'
229 if channel not in self.channels:
230 self.log.warn("No such channel: %r", channel)
231 return
225 232 stream = self.channels[channel]
226 233 self.session.send(stream, msg)
227 234
228 235 def on_close(self):
229 236 km = self.kernel_manager
230 237 if self.kernel_id in km:
231 238 km.remove_restart_callback(
232 239 self.kernel_id, self.on_kernel_restarted,
233 240 )
234 241 km.remove_restart_callback(
235 242 self.kernel_id, self.on_restart_failed, 'dead',
236 243 )
237 244 # This method can be called twice, once by self.kernel_died and once
238 245 # from the WebSocket close event. If the WebSocket connection is
239 246 # closed before the ZMQ streams are setup, they could be None.
240 247 for channel, stream in self.channels.items():
241 248 if stream is not None and not stream.closed():
242 249 stream.on_recv(None)
243 250 # close the socket directly, don't wait for the stream
244 251 socket = stream.socket
245 252 stream.close()
246 253 socket.close()
247 254
248 255 self.channels = {}
249 256
250 257 def _send_status_message(self, status):
251 258 msg = self.session.msg("status",
252 259 {'execution_state': status}
253 260 )
254 261 msg['channel'] = 'iopub'
255 262 self.write_message(json.dumps(msg, default=date_default))
256 263
257 264 def on_kernel_restarted(self):
258 265 logging.warn("kernel %s restarted", self.kernel_id)
259 266 self._send_status_message('restarting')
260 267
261 268 def on_restart_failed(self):
262 269 logging.error("kernel %s restarted failed!", self.kernel_id)
263 270 self._send_status_message('dead')
264 271
265 272
266 273 #-----------------------------------------------------------------------------
267 274 # URL to handler mappings
268 275 #-----------------------------------------------------------------------------
269 276
270 277
271 278 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
272 279 _kernel_action_regex = r"(?P<action>restart|interrupt)"
273 280
274 281 default_handlers = [
275 282 (r"/api/kernels", MainKernelHandler),
276 283 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
277 284 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
278 285 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
279 286 ]
General Comments 0
You need to be logged in to leave comments. Login now