##// END OF EJS Templates
Merge pull request #7389 from minrk/one-websocket...
Brian E. Granger -
r19850:62757cfc merge
parent child Browse files
Show More
@@ -0,0 +1,3 b''
1 * The notebook now uses a single websocket at `/kernels/<kernel-id>/channels` instead of separate
2 `/kernels/<kernel-id>/{shell|iopub|stdin}` channels. Messages on each channel are identified by a
3 `channel` key in the message dict, for both send and recv.
@@ -138,7 +138,7 b' class ZMQStreamHandler(WebSocketHandler):'
138 138 """meaningless for websockets"""
139 139 pass
140 140
141 def _reserialize_reply(self, msg_list):
141 def _reserialize_reply(self, msg_list, channel=None):
142 142 """Reserialize a reply message using JSON.
143 143
144 144 This takes the msg list from the ZMQ socket, deserializes it using
@@ -148,6 +148,8 b' class ZMQStreamHandler(WebSocketHandler):'
148 148 """
149 149 idents, msg_list = self.session.feed_identities(msg_list)
150 150 msg = self.session.deserialize(msg_list)
151 if channel:
152 msg['channel'] = channel
151 153 if msg['buffers']:
152 154 buf = serialize_binary_message(msg)
153 155 return buf
@@ -155,12 +157,13 b' class ZMQStreamHandler(WebSocketHandler):'
155 157 smsg = json.dumps(msg, default=date_default)
156 158 return cast_unicode(smsg)
157 159
158 def _on_zmq_reply(self, msg_list):
160 def _on_zmq_reply(self, stream, msg_list):
159 161 # Sometimes this gets triggered when the on_close method is scheduled in the
160 162 # eventloop but hasn't been called.
161 if self.stream.closed(): return
163 if stream.closed(): return
164 channel = getattr(stream, 'channel', None)
162 165 try:
163 msg = self._reserialize_reply(msg_list)
166 msg = self._reserialize_reply(msg_list, channel=channel)
164 167 except Exception:
165 168 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
166 169 else:
@@ -84,7 +84,7 b' class KernelActionHandler(IPythonHandler):'
84 84 self.finish()
85 85
86 86
87 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
87 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
88 88
89 89 @property
90 90 def kernel_info_timeout(self):
@@ -95,8 +95,13 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
95 95
96 96 def create_stream(self):
97 97 km = self.kernel_manager
98 meth = getattr(km, 'connect_%s' % self.channel)
99 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
98 identity = self.session.bsession
99 for channel in ('shell', 'iopub', 'stdin'):
100 meth = getattr(km, 'connect_' + channel)
101 self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
102 stream.channel = channel
103 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
104 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
100 105
101 106 def request_kernel_info(self):
102 107 """send a request for kernel_info"""
@@ -160,8 +165,9 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
160 165 self._kernel_info_future.set_result(info)
161 166
162 167 def initialize(self):
163 super(ZMQChannelHandler, self).initialize()
168 super(ZMQChannelsHandler, self).initialize()
164 169 self.zmq_stream = None
170 self.channels = {}
165 171 self.kernel_id = None
166 172 self.kernel_info_channel = None
167 173 self._kernel_info_future = Future()
@@ -169,7 +175,7 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
169 175 @gen.coroutine
170 176 def pre_get(self):
171 177 # authenticate first
172 super(ZMQChannelHandler, self).pre_get()
178 super(ZMQChannelsHandler, self).pre_get()
173 179 # then request kernel info, waiting up to a certain time before giving up.
174 180 # We don't want to wait forever, because browsers don't take it well when
175 181 # servers never respond to websocket connection requests.
@@ -189,57 +195,37 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
189 195 @gen.coroutine
190 196 def get(self, kernel_id):
191 197 self.kernel_id = cast_unicode(kernel_id, 'ascii')
192 yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)
198 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
193 199
194 200 def open(self, kernel_id):
195 super(ZMQChannelHandler, self).open()
201 super(ZMQChannelsHandler, self).open()
196 202 try:
197 203 self.create_stream()
198 204 except web.HTTPError as e:
199 205 self.log.error("Error opening stream: %s", e)
200 206 # WebSockets don't response to traditional error codes so we
201 207 # close the connection.
202 if not self.stream.closed():
203 self.stream.close()
208 for channel, stream in self.channels.items():
209 if not stream.closed():
210 stream.close()
204 211 self.close()
205 212 else:
206 self.zmq_stream.on_recv(self._on_zmq_reply)
213 for channel, stream in self.channels.items():
214 stream.on_recv_stream(self._on_zmq_reply)
207 215
208 216 def on_message(self, msg):
209 if self.zmq_stream is None:
210 return
211 elif self.zmq_stream.closed():
212 self.log.info("%s closed, closing websocket.", self)
213 self.close()
214 return
215 217 if isinstance(msg, bytes):
216 218 msg = deserialize_binary_message(msg)
217 219 else:
218 220 msg = json.loads(msg)
219 self.session.send(self.zmq_stream, msg)
221 channel = msg.pop('channel', None)
222 if channel is None:
223 self.log.warn("No channel specified, assuming shell: %s", msg)
224 channel = 'shell'
225 stream = self.channels[channel]
226 self.session.send(stream, msg)
220 227
221 228 def on_close(self):
222 # This method can be called twice, once by self.kernel_died and once
223 # from the WebSocket close event. If the WebSocket connection is
224 # closed before the ZMQ streams are setup, they could be None.
225 if self.zmq_stream is not None and not self.zmq_stream.closed():
226 self.zmq_stream.on_recv(None)
227 # close the socket directly, don't wait for the stream
228 socket = self.zmq_stream.socket
229 self.zmq_stream.close()
230 socket.close()
231
232
233 class IOPubHandler(ZMQChannelHandler):
234 channel = 'iopub'
235
236 def create_stream(self):
237 super(IOPubHandler, self).create_stream()
238 km = self.kernel_manager
239 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
240 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
241
242 def on_close(self):
243 229 km = self.kernel_manager
244 230 if self.kernel_id in km:
245 231 km.remove_restart_callback(
@@ -248,12 +234,24 b' class IOPubHandler(ZMQChannelHandler):'
248 234 km.remove_restart_callback(
249 235 self.kernel_id, self.on_restart_failed, 'dead',
250 236 )
251 super(IOPubHandler, self).on_close()
252
237 # This method can be called twice, once by self.kernel_died and once
238 # from the WebSocket close event. If the WebSocket connection is
239 # closed before the ZMQ streams are setup, they could be None.
240 for channel, stream in self.channels.items():
241 if stream is not None and not stream.closed():
242 stream.on_recv(None)
243 # close the socket directly, don't wait for the stream
244 socket = stream.socket
245 stream.close()
246 socket.close()
247
248 self.channels = {}
249
253 250 def _send_status_message(self, status):
254 251 msg = self.session.msg("status",
255 252 {'execution_state': status}
256 253 )
254 msg['channel'] = 'iopub'
257 255 self.write_message(json.dumps(msg, default=date_default))
258 256
259 257 def on_kernel_restarted(self):
@@ -263,18 +261,6 b' class IOPubHandler(ZMQChannelHandler):'
263 261 def on_restart_failed(self):
264 262 logging.error("kernel %s restarted failed!", self.kernel_id)
265 263 self._send_status_message('dead')
266
267 def on_message(self, msg):
268 """IOPub messages make no sense"""
269 pass
270
271
272 class ShellHandler(ZMQChannelHandler):
273 channel = 'shell'
274
275
276 class StdinHandler(ZMQChannelHandler):
277 channel = 'stdin'
278 264
279 265
280 266 #-----------------------------------------------------------------------------
@@ -289,7 +275,5 b' default_handlers = ['
289 275 (r"/api/kernels", MainKernelHandler),
290 276 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
291 277 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
292 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
293 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
294 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
278 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
295 279 ]
@@ -28,12 +28,7 b' define(['
28 28
29 29 this.id = null;
30 30 this.name = name;
31
32 this.channels = {
33 'shell': null,
34 'iopub': null,
35 'stdin': null
36 };
31 this.ws = null;
37 32
38 33 this.kernel_service_url = kernel_service_url;
39 34 this.kernel_url = null;
@@ -429,7 +424,7 b' define(['
429 424
430 425 Kernel.prototype.start_channels = function () {
431 426 /**
432 * Start the `shell`and `iopub` channels.
427 * Start the websocket channels.
433 428 * Will stop and restart them if they already exist.
434 429 *
435 430 * @function start_channels
@@ -440,16 +435,12 b' define(['
440 435
441 436 console.log("Starting WebSockets:", ws_host_url);
442 437
443 var channel_url = function(channel) {
444 return [
438 this.ws = new this.WebSocket([
445 439 that.ws_url,
446 utils.url_join_encode(that.kernel_url, channel),
440 utils.url_join_encode(that.kernel_url, 'channels'),
447 441 "?session_id=" + that.session_id
448 ].join('');
449 };
450 this.channels.shell = new this.WebSocket(channel_url("shell"));
451 this.channels.stdin = new this.WebSocket(channel_url("stdin"));
452 this.channels.iopub = new this.WebSocket(channel_url("iopub"));
442 ].join('')
443 );
453 444
454 445 var already_called_onclose = false; // only alert once
455 446 var ws_closed_early = function(evt){
@@ -489,28 +480,22 b' define(['
489 480 that._ws_closed(ws_host_url, true);
490 481 };
491 482
492 for (var c in this.channels) {
493 this.channels[c].onopen = $.proxy(this._ws_opened, this);
494 this.channels[c].onclose = ws_closed_early;
495 this.channels[c].onerror = ws_error;
496 }
483 this.ws.onopen = $.proxy(this._ws_opened, this);
484 this.ws.onclose = ws_closed_early;
485 this.ws.onerror = ws_error;
497 486 // switch from early-close to late-close message after 1s
498 487 setTimeout(function() {
499 for (var c in that.channels) {
500 if (that.channels[c] !== null) {
501 that.channels[c].onclose = ws_closed_late;
502 }
488 if (that.ws !== null) {
489 that.ws.onclose = ws_closed_late;
503 490 }
504 491 }, 1000);
505 this.channels.shell.onmessage = $.proxy(this._handle_shell_reply, this);
506 this.channels.iopub.onmessage = $.proxy(this._handle_iopub_message, this);
507 this.channels.stdin.onmessage = $.proxy(this._handle_input_request, this);
492 this.ws.onmessage = $.proxy(this._handle_ws_message, this);
508 493 };
509 494
510 495 Kernel.prototype._ws_opened = function (evt) {
511 496 /**
512 497 * Handle a websocket entering the open state,
513 * signaling that the kernel is connected when all channels are open.
498 * signaling that the kernel is connected when websocket is open.
514 499 *
515 500 * @function _ws_opened
516 501 */
@@ -522,8 +507,7 b' define(['
522 507
523 508 Kernel.prototype._ws_closed = function(ws_url, error) {
524 509 /**
525 * Handle a websocket entering the closed state. This closes the
526 * other communication channels if they are open. If the websocket
510 * Handle a websocket entering the closed state. If the websocket
527 511 * was not closed due to an error, try to reconnect to the kernel.
528 512 *
529 513 * @function _ws_closed
@@ -560,27 +544,23 b' define(['
560 544
561 545 Kernel.prototype.stop_channels = function () {
562 546 /**
563 * Close the websocket channels. After successful close, the value
564 * in `this.channels[channel_name]` will be null.
547 * Close the websocket. After successful close, the value
548 * in `this.ws` will be null.
565 549 *
566 550 * @function stop_channels
567 551 */
568 552 var that = this;
569 var close = function (c) {
570 return function () {
571 if (that.channels[c] && that.channels[c].readyState === WebSocket.CLOSED) {
572 that.channels[c] = null;
573 }
574 };
553 var close = function () {
554 if (that.ws && that.ws.readyState === WebSocket.CLOSED) {
555 that.ws = null;
556 }
575 557 };
576 for (var c in this.channels) {
577 if ( this.channels[c] !== null ) {
578 if (this.channels[c].readyState === WebSocket.OPEN) {
579 this.channels[c].onclose = close(c);
580 this.channels[c].close();
581 } else {
582 close(c)();
583 }
558 if (this.ws !== null) {
559 if (this.ws.readyState === WebSocket.OPEN) {
560 this.ws.onclose = close;
561 this.ws.close();
562 } else {
563 close();
584 564 }
585 565 }
586 566 };
@@ -588,20 +568,18 b' define(['
588 568 Kernel.prototype.is_connected = function () {
589 569 /**
590 570 * Check whether there is a connection to the kernel. This
591 * function only returns true if all channel objects have been
592 * created and have a state of WebSocket.OPEN.
571 * function only returns true if websocket has been
572 * created and has a state of WebSocket.OPEN.
593 573 *
594 574 * @function is_connected
595 575 * @returns {bool} - whether there is a connection
596 576 */
597 for (var c in this.channels) {
598 // if any channel is not ready, then we're not connected
599 if (this.channels[c] === null) {
600 return false;
601 }
602 if (this.channels[c].readyState !== WebSocket.OPEN) {
603 return false;
604 }
577 // if any channel is not ready, then we're not connected
578 if (this.ws === null) {
579 return false;
580 }
581 if (this.ws.readyState !== WebSocket.OPEN) {
582 return false;
605 583 }
606 584 return true;
607 585 };
@@ -615,12 +593,7 b' define(['
615 593 * @function is_fully_disconnected
616 594 * @returns {bool} - whether the kernel is fully disconnected
617 595 */
618 for (var c in this.channels) {
619 if (this.channels[c] === null) {
620 return true;
621 }
622 }
623 return false;
596 return (this.ws === null);
624 597 };
625 598
626 599 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
@@ -633,7 +606,8 b' define(['
633 606 throw new Error("kernel is not connected");
634 607 }
635 608 var msg = this._get_msg(msg_type, content, metadata, buffers);
636 this.channels.shell.send(serialize.serialize(msg));
609 msg.channel = 'shell';
610 this.ws.send(serialize.serialize(msg));
637 611 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
638 612 return msg.header.msg_id;
639 613 };
@@ -784,7 +758,8 b' define(['
784 758 };
785 759 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
786 760 var msg = this._get_msg("input_reply", content);
787 this.channels.stdin.send(serialize.serialize(msg));
761 msg.channel = 'stdin';
762 this.ws.send(serialize.serialize(msg));
788 763 return msg.header.msg_id;
789 764 };
790 765
@@ -877,15 +852,28 b' define(['
877 852 this.last_msg_callbacks = {};
878 853 }
879 854 };
880
881 /**
882 * @function _handle_shell_reply
883 */
884 Kernel.prototype._handle_shell_reply = function (e) {
885 serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
855
856 Kernel.prototype._handle_ws_message = function (e) {
857 serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this));
886 858 };
887 859
888 Kernel.prototype._finish_shell_reply = function (reply) {
860 Kernel.prototype._finish_ws_message = function (msg) {
861 switch (msg.channel) {
862 case 'shell':
863 this._handle_shell_reply(msg);
864 break;
865 case 'iopub':
866 this._handle_iopub_message(msg);
867 break;
868 case 'stdin':
869 this._handle_input_request(msg);
870 break;
871 default:
872 console.error("unrecognized message channel", msg.channel, msg);
873 }
874 };
875
876 Kernel.prototype._handle_shell_reply = function (reply) {
889 877 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
890 878 var content = reply.content;
891 879 var metadata = reply.metadata;
@@ -1030,12 +1018,7 b' define(['
1030 1018 *
1031 1019 * @function _handle_iopub_message
1032 1020 */
1033 Kernel.prototype._handle_iopub_message = function (e) {
1034 serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
1035 };
1036
1037
1038 Kernel.prototype._finish_iopub_message = function (msg) {
1021 Kernel.prototype._handle_iopub_message = function (msg) {
1039 1022 var handler = this.get_iopub_handler(msg.header.msg_type);
1040 1023 if (handler !== undefined) {
1041 1024 handler(msg);
@@ -1045,12 +1028,7 b' define(['
1045 1028 /**
1046 1029 * @function _handle_input_request
1047 1030 */
1048 Kernel.prototype._handle_input_request = function (e) {
1049 serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
1050 };
1051
1052
1053 Kernel.prototype._finish_input_request = function (request) {
1031 Kernel.prototype._handle_input_request = function (request) {
1054 1032 var header = request.header;
1055 1033 var content = request.content;
1056 1034 var metadata = request.metadata;
General Comments 0
You need to be logged in to leave comments. Login now