diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index c6e51a8..41ab511 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -721,13 +721,16 @@ class Hub(SessionFactory): header = msg['header'] engine_uuid = header.get('engine', None) eid = self.by_ident.get(engine_uuid, None) + + status = header.get('status', None) if msg_id in self.pending: self.log.info("task::task %r finished on %s", msg_id, eid) self.pending.remove(msg_id) self.all_completed.add(msg_id) if eid is not None: - self.completed[eid].append(msg_id) + if status != 'aborted': + self.completed[eid].append(msg_id) if msg_id in self.tasks[eid]: self.tasks[eid].remove(msg_id) completed = header['date'] diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 2a80610..192d4bd 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -218,7 +218,9 @@ class Kernel(Configurable): # is it safe to assume a msg_id will not be resubmitted? reply_type = msg_type.split('_')[0] + '_reply' status = {'status' : 'aborted'} - reply_msg = self.session.send(stream, reply_type, subheader=status, + sub = {'engine' : self.ident} + sub.update(status) + reply_msg = self.session.send(stream, reply_type, subheader=sub, content=status, parent=msg, ident=idents) return @@ -279,7 +281,15 @@ class Kernel(Configurable): #--------------------------------------------------------------------------- # Kernel request handlers #--------------------------------------------------------------------------- - + + def _make_subheader(self): + """init subheader dict, for execute/apply_reply""" + return { + 'dependencies_met' : True, + 'engine' : self.ident, + 'started': datetime.now(), + } + def _publish_pyin(self, code, parent, execution_count): """Publish the code request on the pyin stream.""" @@ -305,6 +315,8 @@ class Kernel(Configurable): self.log.error("Got bad msg: ") self.log.error("%s", parent) return + + sub = self._make_subheader() shell = self.shell # we'll need this a lot here @@ -393,8 +405,16 @@ class Kernel(Configurable): # Send the reply. reply_content = json_clean(reply_content) + + sub['status'] = reply_content['status'] + if reply_content['status'] == 'error' and \ + reply_content['ename'] == 'UnmetDependency': + sub['dependencies_met'] = False + reply_msg = self.session.send(stream, u'execute_reply', - reply_content, parent, ident=ident) + reply_content, parent, subheader=sub, + ident=ident) + self.log.debug("%s", reply_msg) if not silent and reply_msg['content']['status'] == u'error': @@ -497,8 +517,7 @@ class Kernel(Configurable): # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_socket.send(pyin_msg) # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) - sub = {'dependencies_met' : True, 'engine' : self.ident, - 'started': datetime.now()} + sub = self._make_subheader() try: # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): @@ -619,11 +638,12 @@ class Kernel(Configurable): self.log.info("%s", msg) msg_type = msg['header']['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) - # self.reply_stream.send(ident,zmq.SNDMORE) - # self.reply_stream.send_json(reply_msg) - reply_msg = self.session.send(stream, reply_type, - content={'status' : 'aborted'}, parent=msg, ident=idents) + + status = {'status' : 'aborted'} + sub = {'engine' : self.ident} + sub.update(status) + reply_msg = self.session.send(stream, reply_type, subheader=sub, + content=status, parent=msg, ident=idents) self.log.debug("%s", reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients.