##// END OF EJS Templates
use single WebSocket connection for all channels...
Min RK -
Show More
@@ -138,7 +138,7 b' class ZMQStreamHandler(WebSocketHandler):'
138 """meaningless for websockets"""
138 """meaningless for websockets"""
139 pass
139 pass
140
140
141 def _reserialize_reply(self, msg_list):
141 def _reserialize_reply(self, msg_list, channel=None):
142 """Reserialize a reply message using JSON.
142 """Reserialize a reply message using JSON.
143
143
144 This takes the msg list from the ZMQ socket, deserializes it using
144 This takes the msg list from the ZMQ socket, deserializes it using
@@ -148,6 +148,8 b' class ZMQStreamHandler(WebSocketHandler):'
148 """
148 """
149 idents, msg_list = self.session.feed_identities(msg_list)
149 idents, msg_list = self.session.feed_identities(msg_list)
150 msg = self.session.deserialize(msg_list)
150 msg = self.session.deserialize(msg_list)
151 if channel:
152 msg['channel'] = channel
151 if msg['buffers']:
153 if msg['buffers']:
152 buf = serialize_binary_message(msg)
154 buf = serialize_binary_message(msg)
153 return buf
155 return buf
@@ -155,12 +157,13 b' class ZMQStreamHandler(WebSocketHandler):'
155 smsg = json.dumps(msg, default=date_default)
157 smsg = json.dumps(msg, default=date_default)
156 return cast_unicode(smsg)
158 return cast_unicode(smsg)
157
159
158 def _on_zmq_reply(self, msg_list):
160 def _on_zmq_reply(self, stream, msg_list):
159 # Sometimes this gets triggered when the on_close method is scheduled in the
161 # Sometimes this gets triggered when the on_close method is scheduled in the
160 # eventloop but hasn't been called.
162 # eventloop but hasn't been called.
161 if self.stream.closed(): return
163 if stream.closed(): return
164 channel = getattr(stream, 'channel', None)
162 try:
165 try:
163 msg = self._reserialize_reply(msg_list)
166 msg = self._reserialize_reply(msg_list, channel=channel)
164 except Exception:
167 except Exception:
165 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
168 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
166 else:
169 else:
@@ -84,7 +84,7 b' class KernelActionHandler(IPythonHandler):'
84 self.finish()
84 self.finish()
85
85
86
86
87 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
87 class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
88
88
89 @property
89 @property
90 def kernel_info_timeout(self):
90 def kernel_info_timeout(self):
@@ -95,8 +95,13 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
95
95
96 def create_stream(self):
96 def create_stream(self):
97 km = self.kernel_manager
97 km = self.kernel_manager
98 meth = getattr(km, 'connect_%s' % self.channel)
98 identity = self.session.bsession
99 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
99 for channel in ('shell', 'iopub', 'stdin'):
100 meth = getattr(km, 'connect_' + channel)
101 self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
102 stream.channel = channel
103 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
104 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
100
105
101 def request_kernel_info(self):
106 def request_kernel_info(self):
102 """send a request for kernel_info"""
107 """send a request for kernel_info"""
@@ -160,8 +165,9 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
160 self._kernel_info_future.set_result(info)
165 self._kernel_info_future.set_result(info)
161
166
162 def initialize(self):
167 def initialize(self):
163 super(ZMQChannelHandler, self).initialize()
168 super(ZMQChannelsHandler, self).initialize()
164 self.zmq_stream = None
169 self.zmq_stream = None
170 self.channels = {}
165 self.kernel_id = None
171 self.kernel_id = None
166 self.kernel_info_channel = None
172 self.kernel_info_channel = None
167 self._kernel_info_future = Future()
173 self._kernel_info_future = Future()
@@ -169,7 +175,7 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
169 @gen.coroutine
175 @gen.coroutine
170 def pre_get(self):
176 def pre_get(self):
171 # authenticate first
177 # authenticate first
172 super(ZMQChannelHandler, self).pre_get()
178 super(ZMQChannelsHandler, self).pre_get()
173 # then request kernel info, waiting up to a certain time before giving up.
179 # then request kernel info, waiting up to a certain time before giving up.
174 # We don't want to wait forever, because browsers don't take it well when
180 # We don't want to wait forever, because browsers don't take it well when
175 # servers never respond to websocket connection requests.
181 # servers never respond to websocket connection requests.
@@ -189,55 +195,35 b' class ZMQChannelHandler(AuthenticatedZMQStreamHandler):'
189 @gen.coroutine
195 @gen.coroutine
190 def get(self, kernel_id):
196 def get(self, kernel_id):
191 self.kernel_id = cast_unicode(kernel_id, 'ascii')
197 self.kernel_id = cast_unicode(kernel_id, 'ascii')
192 yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)
198 yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
193
199
194 def open(self, kernel_id):
200 def open(self, kernel_id):
195 super(ZMQChannelHandler, self).open()
201 super(ZMQChannelsHandler, self).open()
196 try:
202 try:
197 self.create_stream()
203 self.create_stream()
198 except web.HTTPError as e:
204 except web.HTTPError as e:
199 self.log.error("Error opening stream: %s", e)
205 self.log.error("Error opening stream: %s", e)
200 # WebSockets don't response to traditional error codes so we
206 # WebSockets don't response to traditional error codes so we
201 # close the connection.
207 # close the connection.
202 if not self.stream.closed():
208 for channel, stream in self.channels.items():
203 self.stream.close()
209 if not stream.closed():
210 stream.close()
204 self.close()
211 self.close()
205 else:
212 else:
206 self.zmq_stream.on_recv(self._on_zmq_reply)
213 for channel, stream in self.channels.items():
214 stream.on_recv_stream(self._on_zmq_reply)
207
215
208 def on_message(self, msg):
216 def on_message(self, msg):
209 if self.zmq_stream is None:
210 return
211 elif self.zmq_stream.closed():
212 self.log.info("%s closed, closing websocket.", self)
213 self.close()
214 return
215 if isinstance(msg, bytes):
217 if isinstance(msg, bytes):
216 msg = deserialize_binary_message(msg)
218 msg = deserialize_binary_message(msg)
217 else:
219 else:
218 msg = json.loads(msg)
220 msg = json.loads(msg)
219 self.session.send(self.zmq_stream, msg)
221 channel = msg.pop('channel', None)
220
222 if channel is None:
221 def on_close(self):
223 self.log.warn("No channel specified, assuming shell: %s", msg)
222 # This method can be called twice, once by self.kernel_died and once
224 channel = 'shell'
223 # from the WebSocket close event. If the WebSocket connection is
225 stream = self.channels[channel]
224 # closed before the ZMQ streams are setup, they could be None.
226 self.session.send(stream, msg)
225 if self.zmq_stream is not None and not self.zmq_stream.closed():
226 self.zmq_stream.on_recv(None)
227 # close the socket directly, don't wait for the stream
228 socket = self.zmq_stream.socket
229 self.zmq_stream.close()
230 socket.close()
231
232
233 class IOPubHandler(ZMQChannelHandler):
234 channel = 'iopub'
235
236 def create_stream(self):
237 super(IOPubHandler, self).create_stream()
238 km = self.kernel_manager
239 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
240 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
241
227
242 def on_close(self):
228 def on_close(self):
243 km = self.kernel_manager
229 km = self.kernel_manager
@@ -248,12 +234,24 b' class IOPubHandler(ZMQChannelHandler):'
248 km.remove_restart_callback(
234 km.remove_restart_callback(
249 self.kernel_id, self.on_restart_failed, 'dead',
235 self.kernel_id, self.on_restart_failed, 'dead',
250 )
236 )
251 super(IOPubHandler, self).on_close()
237 # This method can be called twice, once by self.kernel_died and once
238 # from the WebSocket close event. If the WebSocket connection is
239 # closed before the ZMQ streams are setup, they could be None.
240 for channel, stream in self.channels.items():
241 if stream is not None and not stream.closed():
242 stream.on_recv(None)
243 # close the socket directly, don't wait for the stream
244 socket = stream.socket
245 stream.close()
246 socket.close()
247
248 self.channels = {}
252
249
253 def _send_status_message(self, status):
250 def _send_status_message(self, status):
254 msg = self.session.msg("status",
251 msg = self.session.msg("status",
255 {'execution_state': status}
252 {'execution_state': status}
256 )
253 )
254 msg['channel'] = 'iopub'
257 self.write_message(json.dumps(msg, default=date_default))
255 self.write_message(json.dumps(msg, default=date_default))
258
256
259 def on_kernel_restarted(self):
257 def on_kernel_restarted(self):
@@ -264,18 +262,6 b' class IOPubHandler(ZMQChannelHandler):'
264 logging.error("kernel %s restarted failed!", self.kernel_id)
262 logging.error("kernel %s restarted failed!", self.kernel_id)
265 self._send_status_message('dead')
263 self._send_status_message('dead')
266
264
267 def on_message(self, msg):
268 """IOPub messages make no sense"""
269 pass
270
271
272 class ShellHandler(ZMQChannelHandler):
273 channel = 'shell'
274
275
276 class StdinHandler(ZMQChannelHandler):
277 channel = 'stdin'
278
279
265
280 #-----------------------------------------------------------------------------
266 #-----------------------------------------------------------------------------
281 # URL to handler mappings
267 # URL to handler mappings
@@ -289,7 +275,5 b' default_handlers = ['
289 (r"/api/kernels", MainKernelHandler),
275 (r"/api/kernels", MainKernelHandler),
290 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
276 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
291 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
277 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
292 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
278 (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
293 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
294 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
295 ]
279 ]
@@ -28,12 +28,7 b' define(['
28
28
29 this.id = null;
29 this.id = null;
30 this.name = name;
30 this.name = name;
31
31 this.ws = null;
32 this.channels = {
33 'shell': null,
34 'iopub': null,
35 'stdin': null
36 };
37
32
38 this.kernel_service_url = kernel_service_url;
33 this.kernel_service_url = kernel_service_url;
39 this.kernel_url = null;
34 this.kernel_url = null;
@@ -429,7 +424,7 b' define(['
429
424
430 Kernel.prototype.start_channels = function () {
425 Kernel.prototype.start_channels = function () {
431 /**
426 /**
432 * Start the `shell`and `iopub` channels.
427 * Start the websocket channels.
433 * Will stop and restart them if they already exist.
428 * Will stop and restart them if they already exist.
434 *
429 *
435 * @function start_channels
430 * @function start_channels
@@ -440,16 +435,12 b' define(['
440
435
441 console.log("Starting WebSockets:", ws_host_url);
436 console.log("Starting WebSockets:", ws_host_url);
442
437
443 var channel_url = function(channel) {
438 this.ws = new this.WebSocket([
444 return [
445 that.ws_url,
439 that.ws_url,
446 utils.url_join_encode(that.kernel_url, channel),
440 utils.url_join_encode(that.kernel_url, 'channels'),
447 "?session_id=" + that.session_id
441 "?session_id=" + that.session_id
448 ].join('');
442 ].join('')
449 };
443 );
450 this.channels.shell = new this.WebSocket(channel_url("shell"));
451 this.channels.stdin = new this.WebSocket(channel_url("stdin"));
452 this.channels.iopub = new this.WebSocket(channel_url("iopub"));
453
444
454 var already_called_onclose = false; // only alert once
445 var already_called_onclose = false; // only alert once
455 var ws_closed_early = function(evt){
446 var ws_closed_early = function(evt){
@@ -489,28 +480,22 b' define(['
489 that._ws_closed(ws_host_url, true);
480 that._ws_closed(ws_host_url, true);
490 };
481 };
491
482
492 for (var c in this.channels) {
483 this.ws.onopen = $.proxy(this._ws_opened, this);
493 this.channels[c].onopen = $.proxy(this._ws_opened, this);
484 this.ws.onclose = ws_closed_early;
494 this.channels[c].onclose = ws_closed_early;
485 this.ws.onerror = ws_error;
495 this.channels[c].onerror = ws_error;
496 }
497 // switch from early-close to late-close message after 1s
486 // switch from early-close to late-close message after 1s
498 setTimeout(function() {
487 setTimeout(function() {
499 for (var c in that.channels) {
488 if (that.ws !== null) {
500 if (that.channels[c] !== null) {
489 that.ws.onclose = ws_closed_late;
501 that.channels[c].onclose = ws_closed_late;
502 }
503 }
490 }
504 }, 1000);
491 }, 1000);
505 this.channels.shell.onmessage = $.proxy(this._handle_shell_reply, this);
492 this.ws.onmessage = $.proxy(this._handle_ws_message, this);
506 this.channels.iopub.onmessage = $.proxy(this._handle_iopub_message, this);
507 this.channels.stdin.onmessage = $.proxy(this._handle_input_request, this);
508 };
493 };
509
494
510 Kernel.prototype._ws_opened = function (evt) {
495 Kernel.prototype._ws_opened = function (evt) {
511 /**
496 /**
512 * Handle a websocket entering the open state,
497 * Handle a websocket entering the open state,
513 * signaling that the kernel is connected when all channels are open.
498 * signaling that the kernel is connected when websocket is open.
514 *
499 *
515 * @function _ws_opened
500 * @function _ws_opened
516 */
501 */
@@ -522,8 +507,7 b' define(['
522
507
523 Kernel.prototype._ws_closed = function(ws_url, error) {
508 Kernel.prototype._ws_closed = function(ws_url, error) {
524 /**
509 /**
525 * Handle a websocket entering the closed state. This closes the
510 * Handle a websocket entering the closed state. If the websocket
526 * other communication channels if they are open. If the websocket
527 * was not closed due to an error, try to reconnect to the kernel.
511 * was not closed due to an error, try to reconnect to the kernel.
528 *
512 *
529 * @function _ws_closed
513 * @function _ws_closed
@@ -560,27 +544,23 b' define(['
560
544
561 Kernel.prototype.stop_channels = function () {
545 Kernel.prototype.stop_channels = function () {
562 /**
546 /**
563 * Close the websocket channels. After successful close, the value
547 * Close the websocket. After successful close, the value
564 * in `this.channels[channel_name]` will be null.
548 * in `this.ws` will be null.
565 *
549 *
566 * @function stop_channels
550 * @function stop_channels
567 */
551 */
568 var that = this;
552 var that = this;
569 var close = function (c) {
553 var close = function () {
570 return function () {
554 if (that.ws && that.ws.readyState === WebSocket.CLOSED) {
571 if (that.channels[c] && that.channels[c].readyState === WebSocket.CLOSED) {
555 that.ws = null;
572 that.channels[c] = null;
573 }
556 }
574 };
557 };
575 };
558 if (this.ws !== null) {
576 for (var c in this.channels) {
559 if (this.ws.readyState === WebSocket.OPEN) {
577 if ( this.channels[c] !== null ) {
560 this.ws.onclose = close;
578 if (this.channels[c].readyState === WebSocket.OPEN) {
561 this.ws.close();
579 this.channels[c].onclose = close(c);
580 this.channels[c].close();
581 } else {
562 } else {
582 close(c)();
563 close();
583 }
584 }
564 }
585 }
565 }
586 };
566 };
@@ -588,21 +568,19 b' define(['
588 Kernel.prototype.is_connected = function () {
568 Kernel.prototype.is_connected = function () {
589 /**
569 /**
590 * Check whether there is a connection to the kernel. This
570 * Check whether there is a connection to the kernel. This
591 * function only returns true if all channel objects have been
571 * function only returns true if websocket has been
592 * created and have a state of WebSocket.OPEN.
572 * created and has a state of WebSocket.OPEN.
593 *
573 *
594 * @function is_connected
574 * @function is_connected
595 * @returns {bool} - whether there is a connection
575 * @returns {bool} - whether there is a connection
596 */
576 */
597 for (var c in this.channels) {
598 // if any channel is not ready, then we're not connected
577 // if any channel is not ready, then we're not connected
599 if (this.channels[c] === null) {
578 if (this.ws === null) {
600 return false;
579 return false;
601 }
580 }
602 if (this.channels[c].readyState !== WebSocket.OPEN) {
581 if (this.ws.readyState !== WebSocket.OPEN) {
603 return false;
582 return false;
604 }
583 }
605 }
606 return true;
584 return true;
607 };
585 };
608
586
@@ -615,12 +593,7 b' define(['
615 * @function is_fully_disconnected
593 * @function is_fully_disconnected
616 * @returns {bool} - whether the kernel is fully disconnected
594 * @returns {bool} - whether the kernel is fully disconnected
617 */
595 */
618 for (var c in this.channels) {
596 return (this.ws === null);
619 if (this.channels[c] === null) {
620 return true;
621 }
622 }
623 return false;
624 };
597 };
625
598
626 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
599 Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
@@ -633,7 +606,8 b' define(['
633 throw new Error("kernel is not connected");
606 throw new Error("kernel is not connected");
634 }
607 }
635 var msg = this._get_msg(msg_type, content, metadata, buffers);
608 var msg = this._get_msg(msg_type, content, metadata, buffers);
636 this.channels.shell.send(serialize.serialize(msg));
609 msg.channel = 'shell';
610 this.ws.send(serialize.serialize(msg));
637 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
611 this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
638 return msg.header.msg_id;
612 return msg.header.msg_id;
639 };
613 };
@@ -784,7 +758,8 b' define(['
784 };
758 };
785 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
759 this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
786 var msg = this._get_msg("input_reply", content);
760 var msg = this._get_msg("input_reply", content);
787 this.channels.stdin.send(serialize.serialize(msg));
761 msg.channel = 'stdin';
762 this.ws.send(serialize.serialize(msg));
788 return msg.header.msg_id;
763 return msg.header.msg_id;
789 };
764 };
790
765
@@ -878,14 +853,27 b' define(['
878 }
853 }
879 };
854 };
880
855
881 /**
856 Kernel.prototype._handle_ws_message = function (e) {
882 * @function _handle_shell_reply
857 serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this));
883 */
858 };
884 Kernel.prototype._handle_shell_reply = function (e) {
859
885 serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
860 Kernel.prototype._finish_ws_message = function (msg) {
861 switch (msg.channel) {
862 case 'shell':
863 this._handle_shell_reply(msg);
864 break;
865 case 'iopub':
866 this._handle_iopub_message(msg);
867 break;
868 case 'stdin':
869 this._handle_input_request(msg);
870 break;
871 default:
872 console.error("unrecognized message channel", msg.channel, msg);
873 }
886 };
874 };
887
875
888 Kernel.prototype._finish_shell_reply = function (reply) {
876 Kernel.prototype._handle_shell_reply = function (reply) {
889 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
877 this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
890 var content = reply.content;
878 var content = reply.content;
891 var metadata = reply.metadata;
879 var metadata = reply.metadata;
@@ -1030,12 +1018,7 b' define(['
1030 *
1018 *
1031 * @function _handle_iopub_message
1019 * @function _handle_iopub_message
1032 */
1020 */
1033 Kernel.prototype._handle_iopub_message = function (e) {
1021 Kernel.prototype._handle_iopub_message = function (msg) {
1034 serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
1035 };
1036
1037
1038 Kernel.prototype._finish_iopub_message = function (msg) {
1039 var handler = this.get_iopub_handler(msg.header.msg_type);
1022 var handler = this.get_iopub_handler(msg.header.msg_type);
1040 if (handler !== undefined) {
1023 if (handler !== undefined) {
1041 handler(msg);
1024 handler(msg);
@@ -1045,12 +1028,7 b' define(['
1045 /**
1028 /**
1046 * @function _handle_input_request
1029 * @function _handle_input_request
1047 */
1030 */
1048 Kernel.prototype._handle_input_request = function (e) {
1031 Kernel.prototype._handle_input_request = function (request) {
1049 serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
1050 };
1051
1052
1053 Kernel.prototype._finish_input_request = function (request) {
1054 var header = request.header;
1032 var header = request.header;
1055 var content = request.content;
1033 var content = request.content;
1056 var metadata = request.metadata;
1034 var metadata = request.metadata;
General Comments 0
You need to be logged in to leave comments. Login now