diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 2075f56..8809ed6 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -42,7 +42,7 @@ from IPython.utils import py3compat from IPython.utils.frame import extract_module_locals from IPython.utils.jsonutil import json_clean from IPython.utils.traitlets import ( - Any, Instance, Float, Dict, CaselessStrEnum, List + Any, Instance, Float, Dict, CaselessStrEnum, List, Set ) from entry_point import base_launch_kernel @@ -84,7 +84,7 @@ class Kernel(Configurable): self.shell.init_user_ns() # Private interface - + # Time to sleep after flushing the stdout/err buffers in each execute # cycle. While this introduces a hard limit on the minimal latency of the # execute cycle, it helps prevent output synchronization problems for @@ -109,6 +109,9 @@ class Kernel(Configurable): # This is a dict of port number that the kernel is listening on. It is set # by record_ports and used by connect_request. _recorded_ports = Dict() + + # set of aborted msg_ids + aborted = Set() @@ -179,6 +182,7 @@ class Kernel(Configurable): def dispatch_message(self, socket, idents, msg, handlers): msg_type = msg['header']['msg_type'] + msg_id = msg['header']['msg_id'] # This assert will raise in versions of zeromq 2.0.7 and lesser. # We now require 2.0.8 or above, so we can uncomment for safety. @@ -191,6 +195,17 @@ class Kernel(Configurable): self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) self.log.debug(' Content: %s\n --->\n ', msg['content']) + # check if request has been aborted + if msg_id in self.aborted: + self.aborted.remove(msg_id) + # is it safe to assume a msg_id will not be resubmitted? + reply_type = msg_type.split('_')[0] + '_reply' + status = {'status' : 'aborted'} + reply_msg = self.session.send(socket, reply_type, subheader=status, + content=status, parent=msg, ident=idents) + return + + # Find and call actual handler for message handler = handlers.get(msg_type, None) if handler is None: @@ -390,7 +405,7 @@ class Kernel(Configurable): self.log.debug("%s", reply_msg) if reply_msg['content']['status'] == u'error': - self._abort_queue() + self._abort_queues() self.session.send(self.iopub_socket, u'status', @@ -548,31 +563,6 @@ class Kernel(Configurable): # Control messages #--------------------------------------------------------------------------- - def abort_queues(self): - for socket in self.shell_sockets: - if socket: - self.abort_queue(socket) - - def abort_queue(self, socket): - while True: - idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True) - if msg is None: - return - - self.log.info("Aborting:") - self.log.info("%s", msg) - msg_type = msg['header']['msg_type'] - reply_type = msg_type.split('_')[0] + '_reply' - # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) - # self.reply_socket.send(ident,zmq.SNDMORE) - # self.reply_socket.send_json(reply_msg) - reply_msg = self.session.send(socket, reply_type, - content={'status' : 'aborted'}, parent=msg, ident=idents) - self.log.debug("%s", reply_msg) - # We need to wait a bit for requests to come in. This can probably - # be set shorter for true asynchronous clients. - time.sleep(0.05) - def abort_request(self, socket, ident, parent): """abort a specifig msg by id""" msg_ids = parent['content'].get('msg_ids', None) @@ -600,28 +590,31 @@ class Kernel(Configurable): # Protected interface #--------------------------------------------------------------------------- - def _abort_queue(self): + def _abort_queues(self): + for socket in self.shell_sockets: + if socket: + self._abort_queue(socket) + + def _abort_queue(self, socket): while True: - try: - ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) - except Exception: - self.log.warn("Invalid Message:", exc_info=True) - continue + idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True) if msg is None: - break - else: - assert ident is not None, \ - "Unexpected missing message part." + return - self.log.debug("Aborting:\n%s", msg) + self.log.info("Aborting:") + self.log.info("%s", msg) msg_type = msg['header']['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.send(self.shell_socket, reply_type, - {'status' : 'aborted'}, msg, ident=ident) + # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) + # self.reply_socket.send(ident,zmq.SNDMORE) + # self.reply_socket.send_json(reply_msg) + reply_msg = self.session.send(socket, reply_type, + content={'status' : 'aborted'}, parent=msg, ident=idents) self.log.debug("%s", reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. - time.sleep(0.1) + time.sleep(0.05) + def _no_raw_input(self): """Raise StdinNotImplentedError if active frontend doesn't support