Show More
@@ -0,0 +1,3 b'' | |||
|
1 | * The notebook now uses a single websocket at `/kernels/<kernel-id>/channels` instead of separate | |
|
2 | `/kernels/<kernel-id>/{shell|iopub|stdin}` channels. Messages on each channel are identified by a | |
|
3 | `channel` key in the message dict, for both send and recv. |
@@ -138,7 +138,7 b' class ZMQStreamHandler(WebSocketHandler):' | |||
|
138 | 138 | """meaningless for websockets""" |
|
139 | 139 | pass |
|
140 | 140 | |
|
141 | def _reserialize_reply(self, msg_list): | |
|
141 | def _reserialize_reply(self, msg_list, channel=None): | |
|
142 | 142 | """Reserialize a reply message using JSON. |
|
143 | 143 | |
|
144 | 144 | This takes the msg list from the ZMQ socket, deserializes it using |
@@ -148,6 +148,8 b' class ZMQStreamHandler(WebSocketHandler):' | |||
|
148 | 148 | """ |
|
149 | 149 | idents, msg_list = self.session.feed_identities(msg_list) |
|
150 | 150 | msg = self.session.deserialize(msg_list) |
|
151 | if channel: | |
|
152 | msg['channel'] = channel | |
|
151 | 153 | if msg['buffers']: |
|
152 | 154 | buf = serialize_binary_message(msg) |
|
153 | 155 | return buf |
@@ -155,12 +157,13 b' class ZMQStreamHandler(WebSocketHandler):' | |||
|
155 | 157 | smsg = json.dumps(msg, default=date_default) |
|
156 | 158 | return cast_unicode(smsg) |
|
157 | 159 | |
|
158 | def _on_zmq_reply(self, msg_list): | |
|
160 | def _on_zmq_reply(self, stream, msg_list): | |
|
159 | 161 | # Sometimes this gets triggered when the on_close method is scheduled in the |
|
160 | 162 | # eventloop but hasn't been called. |
|
161 |
if |
|
|
163 | if stream.closed(): return | |
|
164 | channel = getattr(stream, 'channel', None) | |
|
162 | 165 | try: |
|
163 | msg = self._reserialize_reply(msg_list) | |
|
166 | msg = self._reserialize_reply(msg_list, channel=channel) | |
|
164 | 167 | except Exception: |
|
165 | 168 | self.log.critical("Malformed message: %r" % msg_list, exc_info=True) |
|
166 | 169 | else: |
@@ -84,7 +84,7 b' class KernelActionHandler(IPythonHandler):' | |||
|
84 | 84 | self.finish() |
|
85 | 85 | |
|
86 | 86 | |
|
87 | class ZMQChannelHandler(AuthenticatedZMQStreamHandler): | |
|
87 | class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): | |
|
88 | 88 | |
|
89 | 89 | @property |
|
90 | 90 | def kernel_info_timeout(self): |
@@ -95,8 +95,13 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):' | |||
|
95 | 95 | |
|
96 | 96 | def create_stream(self): |
|
97 | 97 | km = self.kernel_manager |
|
98 | meth = getattr(km, 'connect_%s' % self.channel) | |
|
99 | self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession) | |
|
98 | identity = self.session.bsession | |
|
99 | for channel in ('shell', 'iopub', 'stdin'): | |
|
100 | meth = getattr(km, 'connect_' + channel) | |
|
101 | self.channels[channel] = stream = meth(self.kernel_id, identity=identity) | |
|
102 | stream.channel = channel | |
|
103 | km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) | |
|
104 | km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') | |
|
100 | 105 | |
|
101 | 106 | def request_kernel_info(self): |
|
102 | 107 | """send a request for kernel_info""" |
@@ -160,8 +165,9 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):' | |||
|
160 | 165 | self._kernel_info_future.set_result(info) |
|
161 | 166 | |
|
162 | 167 | def initialize(self): |
|
163 | super(ZMQChannelHandler, self).initialize() | |
|
168 | super(ZMQChannelsHandler, self).initialize() | |
|
164 | 169 | self.zmq_stream = None |
|
170 | self.channels = {} | |
|
165 | 171 | self.kernel_id = None |
|
166 | 172 | self.kernel_info_channel = None |
|
167 | 173 | self._kernel_info_future = Future() |
@@ -169,7 +175,7 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):' | |||
|
169 | 175 | @gen.coroutine |
|
170 | 176 | def pre_get(self): |
|
171 | 177 | # authenticate first |
|
172 | super(ZMQChannelHandler, self).pre_get() | |
|
178 | super(ZMQChannelsHandler, self).pre_get() | |
|
173 | 179 | # then request kernel info, waiting up to a certain time before giving up. |
|
174 | 180 | # We don't want to wait forever, because browsers don't take it well when |
|
175 | 181 | # servers never respond to websocket connection requests. |
@@ -189,57 +195,37 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):' | |||
|
189 | 195 | @gen.coroutine |
|
190 | 196 | def get(self, kernel_id): |
|
191 | 197 | self.kernel_id = cast_unicode(kernel_id, 'ascii') |
|
192 | yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id) | |
|
198 | yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id) | |
|
193 | 199 | |
|
194 | 200 | def open(self, kernel_id): |
|
195 | super(ZMQChannelHandler, self).open() | |
|
201 | super(ZMQChannelsHandler, self).open() | |
|
196 | 202 | try: |
|
197 | 203 | self.create_stream() |
|
198 | 204 | except web.HTTPError as e: |
|
199 | 205 | self.log.error("Error opening stream: %s", e) |
|
200 | 206 | # WebSockets don't response to traditional error codes so we |
|
201 | 207 | # close the connection. |
|
202 | if not self.stream.closed(): | |
|
203 |
|
|
|
208 | for channel, stream in self.channels.items(): | |
|
209 | if not stream.closed(): | |
|
210 | stream.close() | |
|
204 | 211 | self.close() |
|
205 | 212 | else: |
|
206 | self.zmq_stream.on_recv(self._on_zmq_reply) | |
|
213 | for channel, stream in self.channels.items(): | |
|
214 | stream.on_recv_stream(self._on_zmq_reply) | |
|
207 | 215 | |
|
208 | 216 | def on_message(self, msg): |
|
209 | if self.zmq_stream is None: | |
|
210 | return | |
|
211 | elif self.zmq_stream.closed(): | |
|
212 | self.log.info("%s closed, closing websocket.", self) | |
|
213 | self.close() | |
|
214 | return | |
|
215 | 217 | if isinstance(msg, bytes): |
|
216 | 218 | msg = deserialize_binary_message(msg) |
|
217 | 219 | else: |
|
218 | 220 | msg = json.loads(msg) |
|
219 | self.session.send(self.zmq_stream, msg) | |
|
221 | channel = msg.pop('channel', None) | |
|
222 | if channel is None: | |
|
223 | self.log.warn("No channel specified, assuming shell: %s", msg) | |
|
224 | channel = 'shell' | |
|
225 | stream = self.channels[channel] | |
|
226 | self.session.send(stream, msg) | |
|
220 | 227 | |
|
221 | 228 | def on_close(self): |
|
222 | # This method can be called twice, once by self.kernel_died and once | |
|
223 | # from the WebSocket close event. If the WebSocket connection is | |
|
224 | # closed before the ZMQ streams are setup, they could be None. | |
|
225 | if self.zmq_stream is not None and not self.zmq_stream.closed(): | |
|
226 | self.zmq_stream.on_recv(None) | |
|
227 | # close the socket directly, don't wait for the stream | |
|
228 | socket = self.zmq_stream.socket | |
|
229 | self.zmq_stream.close() | |
|
230 | socket.close() | |
|
231 | ||
|
232 | ||
|
233 | class IOPubHandler(ZMQChannelHandler): | |
|
234 | channel = 'iopub' | |
|
235 | ||
|
236 | def create_stream(self): | |
|
237 | super(IOPubHandler, self).create_stream() | |
|
238 | km = self.kernel_manager | |
|
239 | km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) | |
|
240 | km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') | |
|
241 | ||
|
242 | def on_close(self): | |
|
243 | 229 | km = self.kernel_manager |
|
244 | 230 | if self.kernel_id in km: |
|
245 | 231 | km.remove_restart_callback( |
@@ -248,12 +234,24 b' class IOPubHandler(ZMQChannelHandler):' | |||
|
248 | 234 | km.remove_restart_callback( |
|
249 | 235 | self.kernel_id, self.on_restart_failed, 'dead', |
|
250 | 236 | ) |
|
251 | super(IOPubHandler, self).on_close() | |
|
252 | ||
|
237 | # This method can be called twice, once by self.kernel_died and once | |
|
238 | # from the WebSocket close event. If the WebSocket connection is | |
|
239 | # closed before the ZMQ streams are setup, they could be None. | |
|
240 | for channel, stream in self.channels.items(): | |
|
241 | if stream is not None and not stream.closed(): | |
|
242 | stream.on_recv(None) | |
|
243 | # close the socket directly, don't wait for the stream | |
|
244 | socket = stream.socket | |
|
245 | stream.close() | |
|
246 | socket.close() | |
|
247 | ||
|
248 | self.channels = {} | |
|
249 | ||
|
253 | 250 | def _send_status_message(self, status): |
|
254 | 251 | msg = self.session.msg("status", |
|
255 | 252 | {'execution_state': status} |
|
256 | 253 | ) |
|
254 | msg['channel'] = 'iopub' | |
|
257 | 255 | self.write_message(json.dumps(msg, default=date_default)) |
|
258 | 256 | |
|
259 | 257 | def on_kernel_restarted(self): |
@@ -263,18 +261,6 b' class IOPubHandler(ZMQChannelHandler):' | |||
|
263 | 261 | def on_restart_failed(self): |
|
264 | 262 | logging.error("kernel %s restarted failed!", self.kernel_id) |
|
265 | 263 | self._send_status_message('dead') |
|
266 | ||
|
267 | def on_message(self, msg): | |
|
268 | """IOPub messages make no sense""" | |
|
269 | pass | |
|
270 | ||
|
271 | ||
|
272 | class ShellHandler(ZMQChannelHandler): | |
|
273 | channel = 'shell' | |
|
274 | ||
|
275 | ||
|
276 | class StdinHandler(ZMQChannelHandler): | |
|
277 | channel = 'stdin' | |
|
278 | 264 | |
|
279 | 265 | |
|
280 | 266 | #----------------------------------------------------------------------------- |
@@ -289,7 +275,5 b' default_handlers = [' | |||
|
289 | 275 | (r"/api/kernels", MainKernelHandler), |
|
290 | 276 | (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler), |
|
291 | 277 | (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler), |
|
292 |
(r"/api/kernels/%s/ |
|
|
293 | (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler), | |
|
294 | (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler) | |
|
278 | (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler), | |
|
295 | 279 | ] |
@@ -28,12 +28,7 b' define([' | |||
|
28 | 28 | |
|
29 | 29 | this.id = null; |
|
30 | 30 | this.name = name; |
|
31 | ||
|
32 | this.channels = { | |
|
33 | 'shell': null, | |
|
34 | 'iopub': null, | |
|
35 | 'stdin': null | |
|
36 | }; | |
|
31 | this.ws = null; | |
|
37 | 32 | |
|
38 | 33 | this.kernel_service_url = kernel_service_url; |
|
39 | 34 | this.kernel_url = null; |
@@ -429,7 +424,7 b' define([' | |||
|
429 | 424 | |
|
430 | 425 | Kernel.prototype.start_channels = function () { |
|
431 | 426 | /** |
|
432 |
* Start the |
|
|
427 | * Start the websocket channels. | |
|
433 | 428 | * Will stop and restart them if they already exist. |
|
434 | 429 | * |
|
435 | 430 | * @function start_channels |
@@ -440,16 +435,12 b' define([' | |||
|
440 | 435 | |
|
441 | 436 | console.log("Starting WebSockets:", ws_host_url); |
|
442 | 437 | |
|
443 | var channel_url = function(channel) { | |
|
444 | return [ | |
|
438 | this.ws = new this.WebSocket([ | |
|
445 | 439 | that.ws_url, |
|
446 | utils.url_join_encode(that.kernel_url, channel), | |
|
440 | utils.url_join_encode(that.kernel_url, 'channels'), | |
|
447 | 441 | "?session_id=" + that.session_id |
|
448 |
].join('') |
|
|
449 |
|
|
|
450 | this.channels.shell = new this.WebSocket(channel_url("shell")); | |
|
451 | this.channels.stdin = new this.WebSocket(channel_url("stdin")); | |
|
452 | this.channels.iopub = new this.WebSocket(channel_url("iopub")); | |
|
442 | ].join('') | |
|
443 | ); | |
|
453 | 444 | |
|
454 | 445 | var already_called_onclose = false; // only alert once |
|
455 | 446 | var ws_closed_early = function(evt){ |
@@ -489,28 +480,22 b' define([' | |||
|
489 | 480 | that._ws_closed(ws_host_url, true); |
|
490 | 481 | }; |
|
491 | 482 | |
|
492 | for (var c in this.channels) { | |
|
493 | this.channels[c].onopen = $.proxy(this._ws_opened, this); | |
|
494 |
|
|
|
495 | this.channels[c].onerror = ws_error; | |
|
496 | } | |
|
483 | this.ws.onopen = $.proxy(this._ws_opened, this); | |
|
484 | this.ws.onclose = ws_closed_early; | |
|
485 | this.ws.onerror = ws_error; | |
|
497 | 486 | // switch from early-close to late-close message after 1s |
|
498 | 487 | setTimeout(function() { |
|
499 |
f |
|
|
500 | if (that.channels[c] !== null) { | |
|
501 | that.channels[c].onclose = ws_closed_late; | |
|
502 | } | |
|
488 | if (that.ws !== null) { | |
|
489 | that.ws.onclose = ws_closed_late; | |
|
503 | 490 | } |
|
504 | 491 | }, 1000); |
|
505 |
this. |
|
|
506 | this.channels.iopub.onmessage = $.proxy(this._handle_iopub_message, this); | |
|
507 | this.channels.stdin.onmessage = $.proxy(this._handle_input_request, this); | |
|
492 | this.ws.onmessage = $.proxy(this._handle_ws_message, this); | |
|
508 | 493 | }; |
|
509 | 494 | |
|
510 | 495 | Kernel.prototype._ws_opened = function (evt) { |
|
511 | 496 | /** |
|
512 | 497 | * Handle a websocket entering the open state, |
|
513 |
* signaling that the kernel is connected when |
|
|
498 | * signaling that the kernel is connected when websocket is open. | |
|
514 | 499 | * |
|
515 | 500 | * @function _ws_opened |
|
516 | 501 | */ |
@@ -522,8 +507,7 b' define([' | |||
|
522 | 507 | |
|
523 | 508 | Kernel.prototype._ws_closed = function(ws_url, error) { |
|
524 | 509 | /** |
|
525 |
* Handle a websocket entering the closed state. |
|
|
526 | * other communication channels if they are open. If the websocket | |
|
510 | * Handle a websocket entering the closed state. If the websocket | |
|
527 | 511 | * was not closed due to an error, try to reconnect to the kernel. |
|
528 | 512 | * |
|
529 | 513 | * @function _ws_closed |
@@ -560,27 +544,23 b' define([' | |||
|
560 | 544 | |
|
561 | 545 | Kernel.prototype.stop_channels = function () { |
|
562 | 546 | /** |
|
563 |
* Close the websocket |
|
|
564 |
* in `this. |
|
|
547 | * Close the websocket. After successful close, the value | |
|
548 | * in `this.ws` will be null. | |
|
565 | 549 | * |
|
566 | 550 | * @function stop_channels |
|
567 | 551 | */ |
|
568 | 552 | var that = this; |
|
569 |
var close = function ( |
|
|
570 | return function () { | |
|
571 | if (that.channels[c] && that.channels[c].readyState === WebSocket.CLOSED) { | |
|
572 | that.channels[c] = null; | |
|
573 | } | |
|
574 | }; | |
|
553 | var close = function () { | |
|
554 | if (that.ws && that.ws.readyState === WebSocket.CLOSED) { | |
|
555 | that.ws = null; | |
|
556 | } | |
|
575 | 557 | }; |
|
576 |
f |
|
|
577 | if ( this.channels[c] !== null ) { | |
|
578 | if (this.channels[c].readyState === WebSocket.OPEN) { | |
|
579 |
|
|
|
580 | this.channels[c].close(); | |
|
581 |
|
|
|
582 | close(c)(); | |
|
583 | } | |
|
558 | if (this.ws !== null) { | |
|
559 | if (this.ws.readyState === WebSocket.OPEN) { | |
|
560 | this.ws.onclose = close; | |
|
561 | this.ws.close(); | |
|
562 | } else { | |
|
563 | close(); | |
|
584 | 564 | } |
|
585 | 565 | } |
|
586 | 566 | }; |
@@ -588,20 +568,18 b' define([' | |||
|
588 | 568 | Kernel.prototype.is_connected = function () { |
|
589 | 569 | /** |
|
590 | 570 | * Check whether there is a connection to the kernel. This |
|
591 |
* function only returns true if |
|
|
592 |
* created and ha |
|
|
571 | * function only returns true if websocket has been | |
|
572 | * created and has a state of WebSocket.OPEN. | |
|
593 | 573 | * |
|
594 | 574 | * @function is_connected |
|
595 | 575 | * @returns {bool} - whether there is a connection |
|
596 | 576 | */ |
|
597 | for (var c in this.channels) { | |
|
598 | // if any channel is not ready, then we're not connected | |
|
599 | if (this.channels[c] === null) { | |
|
600 | return false; | |
|
601 | } | |
|
602 | if (this.channels[c].readyState !== WebSocket.OPEN) { | |
|
603 | return false; | |
|
604 | } | |
|
577 | // if any channel is not ready, then we're not connected | |
|
578 | if (this.ws === null) { | |
|
579 | return false; | |
|
580 | } | |
|
581 | if (this.ws.readyState !== WebSocket.OPEN) { | |
|
582 | return false; | |
|
605 | 583 | } |
|
606 | 584 | return true; |
|
607 | 585 | }; |
@@ -615,12 +593,7 b' define([' | |||
|
615 | 593 | * @function is_fully_disconnected |
|
616 | 594 | * @returns {bool} - whether the kernel is fully disconnected |
|
617 | 595 | */ |
|
618 | for (var c in this.channels) { | |
|
619 | if (this.channels[c] === null) { | |
|
620 | return true; | |
|
621 | } | |
|
622 | } | |
|
623 | return false; | |
|
596 | return (this.ws === null); | |
|
624 | 597 | }; |
|
625 | 598 | |
|
626 | 599 | Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) { |
@@ -633,7 +606,8 b' define([' | |||
|
633 | 606 | throw new Error("kernel is not connected"); |
|
634 | 607 | } |
|
635 | 608 | var msg = this._get_msg(msg_type, content, metadata, buffers); |
|
636 | this.channels.shell.send(serialize.serialize(msg)); | |
|
609 | msg.channel = 'shell'; | |
|
610 | this.ws.send(serialize.serialize(msg)); | |
|
637 | 611 | this.set_callbacks_for_msg(msg.header.msg_id, callbacks); |
|
638 | 612 | return msg.header.msg_id; |
|
639 | 613 | }; |
@@ -784,7 +758,8 b' define([' | |||
|
784 | 758 | }; |
|
785 | 759 | this.events.trigger('input_reply.Kernel', {kernel: this, content: content}); |
|
786 | 760 | var msg = this._get_msg("input_reply", content); |
|
787 | this.channels.stdin.send(serialize.serialize(msg)); | |
|
761 | msg.channel = 'stdin'; | |
|
762 | this.ws.send(serialize.serialize(msg)); | |
|
788 | 763 | return msg.header.msg_id; |
|
789 | 764 | }; |
|
790 | 765 | |
@@ -877,15 +852,28 b' define([' | |||
|
877 | 852 | this.last_msg_callbacks = {}; |
|
878 | 853 | } |
|
879 | 854 | }; |
|
880 | ||
|
881 | /** | |
|
882 | * @function _handle_shell_reply | |
|
883 | */ | |
|
884 | Kernel.prototype._handle_shell_reply = function (e) { | |
|
885 | serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this)); | |
|
855 | ||
|
856 | Kernel.prototype._handle_ws_message = function (e) { | |
|
857 | serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this)); | |
|
886 | 858 | }; |
|
887 | 859 | |
|
888 |
Kernel.prototype._finish_ |
|
|
860 | Kernel.prototype._finish_ws_message = function (msg) { | |
|
861 | switch (msg.channel) { | |
|
862 | case 'shell': | |
|
863 | this._handle_shell_reply(msg); | |
|
864 | break; | |
|
865 | case 'iopub': | |
|
866 | this._handle_iopub_message(msg); | |
|
867 | break; | |
|
868 | case 'stdin': | |
|
869 | this._handle_input_request(msg); | |
|
870 | break; | |
|
871 | default: | |
|
872 | console.error("unrecognized message channel", msg.channel, msg); | |
|
873 | } | |
|
874 | }; | |
|
875 | ||
|
876 | Kernel.prototype._handle_shell_reply = function (reply) { | |
|
889 | 877 | this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply}); |
|
890 | 878 | var content = reply.content; |
|
891 | 879 | var metadata = reply.metadata; |
@@ -1030,12 +1018,7 b' define([' | |||
|
1030 | 1018 | * |
|
1031 | 1019 | * @function _handle_iopub_message |
|
1032 | 1020 | */ |
|
1033 |
Kernel.prototype._handle_iopub_message = function ( |
|
|
1034 | serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this)); | |
|
1035 | }; | |
|
1036 | ||
|
1037 | ||
|
1038 | Kernel.prototype._finish_iopub_message = function (msg) { | |
|
1021 | Kernel.prototype._handle_iopub_message = function (msg) { | |
|
1039 | 1022 | var handler = this.get_iopub_handler(msg.header.msg_type); |
|
1040 | 1023 | if (handler !== undefined) { |
|
1041 | 1024 | handler(msg); |
@@ -1045,12 +1028,7 b' define([' | |||
|
1045 | 1028 | /** |
|
1046 | 1029 | * @function _handle_input_request |
|
1047 | 1030 | */ |
|
1048 | Kernel.prototype._handle_input_request = function (e) { | |
|
1049 | serialize.deserialize(e.data, $.proxy(this._finish_input_request, this)); | |
|
1050 | }; | |
|
1051 | ||
|
1052 | ||
|
1053 | Kernel.prototype._finish_input_request = function (request) { | |
|
1031 | Kernel.prototype._handle_input_request = function (request) { | |
|
1054 | 1032 | var header = request.header; |
|
1055 | 1033 | var content = request.content; |
|
1056 | 1034 | var metadata = request.metadata; |
General Comments 0
You need to be logged in to leave comments.
Login now