diff --git a/IPython/html/base/zmqhandlers.py b/IPython/html/base/zmqhandlers.py index 6f5b302..8d7a2dc 100644 --- a/IPython/html/base/zmqhandlers.py +++ b/IPython/html/base/zmqhandlers.py @@ -138,7 +138,7 @@ class ZMQStreamHandler(WebSocketHandler): """meaningless for websockets""" pass - def _reserialize_reply(self, msg_list): + def _reserialize_reply(self, msg_list, channel=None): """Reserialize a reply message using JSON. This takes the msg list from the ZMQ socket, deserializes it using @@ -148,6 +148,8 @@ class ZMQStreamHandler(WebSocketHandler): """ idents, msg_list = self.session.feed_identities(msg_list) msg = self.session.deserialize(msg_list) + if channel: + msg['channel'] = channel if msg['buffers']: buf = serialize_binary_message(msg) return buf @@ -155,12 +157,13 @@ class ZMQStreamHandler(WebSocketHandler): smsg = json.dumps(msg, default=date_default) return cast_unicode(smsg) - def _on_zmq_reply(self, msg_list): + def _on_zmq_reply(self, stream, msg_list): # Sometimes this gets triggered when the on_close method is scheduled in the # eventloop but hasn't been called. - if self.stream.closed(): return + if stream.closed(): return + channel = getattr(stream, 'channel', None) try: - msg = self._reserialize_reply(msg_list) + msg = self._reserialize_reply(msg_list, channel=channel) except Exception: self.log.critical("Malformed message: %r" % msg_list, exc_info=True) else: diff --git a/IPython/html/services/kernels/handlers.py b/IPython/html/services/kernels/handlers.py index 681a7fc..3b17f3b 100644 --- a/IPython/html/services/kernels/handlers.py +++ b/IPython/html/services/kernels/handlers.py @@ -84,7 +84,7 @@ class KernelActionHandler(IPythonHandler): self.finish() -class ZMQChannelHandler(AuthenticatedZMQStreamHandler): +class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): @property def kernel_info_timeout(self): @@ -95,8 +95,13 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): def create_stream(self): km = self.kernel_manager - meth = getattr(km, 'connect_%s' % self.channel) - self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession) + identity = self.session.bsession + for channel in ('shell', 'iopub', 'stdin'): + meth = getattr(km, 'connect_' + channel) + self.channels[channel] = stream = meth(self.kernel_id, identity=identity) + stream.channel = channel + km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) + km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') def request_kernel_info(self): """send a request for kernel_info""" @@ -160,8 +165,9 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): self._kernel_info_future.set_result(info) def initialize(self): - super(ZMQChannelHandler, self).initialize() + super(ZMQChannelsHandler, self).initialize() self.zmq_stream = None + self.channels = {} self.kernel_id = None self.kernel_info_channel = None self._kernel_info_future = Future() @@ -169,7 +175,7 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): @gen.coroutine def pre_get(self): # authenticate first - super(ZMQChannelHandler, self).pre_get() + super(ZMQChannelsHandler, self).pre_get() # then request kernel info, waiting up to a certain time before giving up. # We don't want to wait forever, because browsers don't take it well when # servers never respond to websocket connection requests. @@ -189,57 +195,37 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): @gen.coroutine def get(self, kernel_id): self.kernel_id = cast_unicode(kernel_id, 'ascii') - yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id) + yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id) def open(self, kernel_id): - super(ZMQChannelHandler, self).open() + super(ZMQChannelsHandler, self).open() try: self.create_stream() except web.HTTPError as e: self.log.error("Error opening stream: %s", e) # WebSockets don't response to traditional error codes so we # close the connection. - if not self.stream.closed(): - self.stream.close() + for channel, stream in self.channels.items(): + if not stream.closed(): + stream.close() self.close() else: - self.zmq_stream.on_recv(self._on_zmq_reply) + for channel, stream in self.channels.items(): + stream.on_recv_stream(self._on_zmq_reply) def on_message(self, msg): - if self.zmq_stream is None: - return - elif self.zmq_stream.closed(): - self.log.info("%s closed, closing websocket.", self) - self.close() - return if isinstance(msg, bytes): msg = deserialize_binary_message(msg) else: msg = json.loads(msg) - self.session.send(self.zmq_stream, msg) + channel = msg.pop('channel', None) + if channel is None: + self.log.warn("No channel specified, assuming shell: %s", msg) + channel = 'shell' + stream = self.channels[channel] + self.session.send(stream, msg) def on_close(self): - # This method can be called twice, once by self.kernel_died and once - # from the WebSocket close event. If the WebSocket connection is - # closed before the ZMQ streams are setup, they could be None. - if self.zmq_stream is not None and not self.zmq_stream.closed(): - self.zmq_stream.on_recv(None) - # close the socket directly, don't wait for the stream - socket = self.zmq_stream.socket - self.zmq_stream.close() - socket.close() - - -class IOPubHandler(ZMQChannelHandler): - channel = 'iopub' - - def create_stream(self): - super(IOPubHandler, self).create_stream() - km = self.kernel_manager - km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) - km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') - - def on_close(self): km = self.kernel_manager if self.kernel_id in km: km.remove_restart_callback( @@ -248,12 +234,24 @@ class IOPubHandler(ZMQChannelHandler): km.remove_restart_callback( self.kernel_id, self.on_restart_failed, 'dead', ) - super(IOPubHandler, self).on_close() - + # This method can be called twice, once by self.kernel_died and once + # from the WebSocket close event. If the WebSocket connection is + # closed before the ZMQ streams are setup, they could be None. + for channel, stream in self.channels.items(): + if stream is not None and not stream.closed(): + stream.on_recv(None) + # close the socket directly, don't wait for the stream + socket = stream.socket + stream.close() + socket.close() + + self.channels = {} + def _send_status_message(self, status): msg = self.session.msg("status", {'execution_state': status} ) + msg['channel'] = 'iopub' self.write_message(json.dumps(msg, default=date_default)) def on_kernel_restarted(self): @@ -263,18 +261,6 @@ class IOPubHandler(ZMQChannelHandler): def on_restart_failed(self): logging.error("kernel %s restarted failed!", self.kernel_id) self._send_status_message('dead') - - def on_message(self, msg): - """IOPub messages make no sense""" - pass - - -class ShellHandler(ZMQChannelHandler): - channel = 'shell' - - -class StdinHandler(ZMQChannelHandler): - channel = 'stdin' #----------------------------------------------------------------------------- @@ -289,7 +275,5 @@ default_handlers = [ (r"/api/kernels", MainKernelHandler), (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler), (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler), - (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler), - (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler), - (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler) + (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler), ] diff --git a/IPython/html/static/services/kernels/kernel.js b/IPython/html/static/services/kernels/kernel.js index 03c6953..bb566ac 100644 --- a/IPython/html/static/services/kernels/kernel.js +++ b/IPython/html/static/services/kernels/kernel.js @@ -28,12 +28,7 @@ define([ this.id = null; this.name = name; - - this.channels = { - 'shell': null, - 'iopub': null, - 'stdin': null - }; + this.ws = null; this.kernel_service_url = kernel_service_url; this.kernel_url = null; @@ -429,7 +424,7 @@ define([ Kernel.prototype.start_channels = function () { /** - * Start the `shell`and `iopub` channels. + * Start the websocket channels. * Will stop and restart them if they already exist. * * @function start_channels @@ -440,16 +435,12 @@ define([ console.log("Starting WebSockets:", ws_host_url); - var channel_url = function(channel) { - return [ + this.ws = new this.WebSocket([ that.ws_url, - utils.url_join_encode(that.kernel_url, channel), + utils.url_join_encode(that.kernel_url, 'channels'), "?session_id=" + that.session_id - ].join(''); - }; - this.channels.shell = new this.WebSocket(channel_url("shell")); - this.channels.stdin = new this.WebSocket(channel_url("stdin")); - this.channels.iopub = new this.WebSocket(channel_url("iopub")); + ].join('') + ); var already_called_onclose = false; // only alert once var ws_closed_early = function(evt){ @@ -489,28 +480,22 @@ define([ that._ws_closed(ws_host_url, true); }; - for (var c in this.channels) { - this.channels[c].onopen = $.proxy(this._ws_opened, this); - this.channels[c].onclose = ws_closed_early; - this.channels[c].onerror = ws_error; - } + this.ws.onopen = $.proxy(this._ws_opened, this); + this.ws.onclose = ws_closed_early; + this.ws.onerror = ws_error; // switch from early-close to late-close message after 1s setTimeout(function() { - for (var c in that.channels) { - if (that.channels[c] !== null) { - that.channels[c].onclose = ws_closed_late; - } + if (that.ws !== null) { + that.ws.onclose = ws_closed_late; } }, 1000); - this.channels.shell.onmessage = $.proxy(this._handle_shell_reply, this); - this.channels.iopub.onmessage = $.proxy(this._handle_iopub_message, this); - this.channels.stdin.onmessage = $.proxy(this._handle_input_request, this); + this.ws.onmessage = $.proxy(this._handle_ws_message, this); }; Kernel.prototype._ws_opened = function (evt) { /** * Handle a websocket entering the open state, - * signaling that the kernel is connected when all channels are open. + * signaling that the kernel is connected when websocket is open. * * @function _ws_opened */ @@ -522,8 +507,7 @@ define([ Kernel.prototype._ws_closed = function(ws_url, error) { /** - * Handle a websocket entering the closed state. This closes the - * other communication channels if they are open. If the websocket + * Handle a websocket entering the closed state. If the websocket * was not closed due to an error, try to reconnect to the kernel. * * @function _ws_closed @@ -560,27 +544,23 @@ define([ Kernel.prototype.stop_channels = function () { /** - * Close the websocket channels. After successful close, the value - * in `this.channels[channel_name]` will be null. + * Close the websocket. After successful close, the value + * in `this.ws` will be null. * * @function stop_channels */ var that = this; - var close = function (c) { - return function () { - if (that.channels[c] && that.channels[c].readyState === WebSocket.CLOSED) { - that.channels[c] = null; - } - }; + var close = function () { + if (that.ws && that.ws.readyState === WebSocket.CLOSED) { + that.ws = null; + } }; - for (var c in this.channels) { - if ( this.channels[c] !== null ) { - if (this.channels[c].readyState === WebSocket.OPEN) { - this.channels[c].onclose = close(c); - this.channels[c].close(); - } else { - close(c)(); - } + if (this.ws !== null) { + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.onclose = close; + this.ws.close(); + } else { + close(); } } }; @@ -588,20 +568,18 @@ define([ Kernel.prototype.is_connected = function () { /** * Check whether there is a connection to the kernel. This - * function only returns true if all channel objects have been - * created and have a state of WebSocket.OPEN. + * function only returns true if websocket has been + * created and has a state of WebSocket.OPEN. * * @function is_connected * @returns {bool} - whether there is a connection */ - for (var c in this.channels) { - // if any channel is not ready, then we're not connected - if (this.channels[c] === null) { - return false; - } - if (this.channels[c].readyState !== WebSocket.OPEN) { - return false; - } + // if any channel is not ready, then we're not connected + if (this.ws === null) { + return false; + } + if (this.ws.readyState !== WebSocket.OPEN) { + return false; } return true; }; @@ -615,12 +593,7 @@ define([ * @function is_fully_disconnected * @returns {bool} - whether the kernel is fully disconnected */ - for (var c in this.channels) { - if (this.channels[c] === null) { - return true; - } - } - return false; + return (this.ws === null); }; Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) { @@ -633,7 +606,8 @@ define([ throw new Error("kernel is not connected"); } var msg = this._get_msg(msg_type, content, metadata, buffers); - this.channels.shell.send(serialize.serialize(msg)); + msg.channel = 'shell'; + this.ws.send(serialize.serialize(msg)); this.set_callbacks_for_msg(msg.header.msg_id, callbacks); return msg.header.msg_id; }; @@ -784,7 +758,8 @@ define([ }; this.events.trigger('input_reply.Kernel', {kernel: this, content: content}); var msg = this._get_msg("input_reply", content); - this.channels.stdin.send(serialize.serialize(msg)); + msg.channel = 'stdin'; + this.ws.send(serialize.serialize(msg)); return msg.header.msg_id; }; @@ -877,15 +852,28 @@ define([ this.last_msg_callbacks = {}; } }; - - /** - * @function _handle_shell_reply - */ - Kernel.prototype._handle_shell_reply = function (e) { - serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this)); + + Kernel.prototype._handle_ws_message = function (e) { + serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this)); }; - Kernel.prototype._finish_shell_reply = function (reply) { + Kernel.prototype._finish_ws_message = function (msg) { + switch (msg.channel) { + case 'shell': + this._handle_shell_reply(msg); + break; + case 'iopub': + this._handle_iopub_message(msg); + break; + case 'stdin': + this._handle_input_request(msg); + break; + default: + console.error("unrecognized message channel", msg.channel, msg); + } + }; + + Kernel.prototype._handle_shell_reply = function (reply) { this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply}); var content = reply.content; var metadata = reply.metadata; @@ -1030,12 +1018,7 @@ define([ * * @function _handle_iopub_message */ - Kernel.prototype._handle_iopub_message = function (e) { - serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this)); - }; - - - Kernel.prototype._finish_iopub_message = function (msg) { + Kernel.prototype._handle_iopub_message = function (msg) { var handler = this.get_iopub_handler(msg.header.msg_type); if (handler !== undefined) { handler(msg); @@ -1045,12 +1028,7 @@ define([ /** * @function _handle_input_request */ - Kernel.prototype._handle_input_request = function (e) { - serialize.deserialize(e.data, $.proxy(this._finish_input_request, this)); - }; - - - Kernel.prototype._finish_input_request = function (request) { + Kernel.prototype._handle_input_request = function (request) { var header = request.header; var content = request.content; var metadata = request.metadata;