##// END OF EJS Templates
fix subheaders for execute_reply and aborted messages...
MinRK -
Show More
@@ -721,13 +721,16 b' class Hub(SessionFactory):'
721 header = msg['header']
721 header = msg['header']
722 engine_uuid = header.get('engine', None)
722 engine_uuid = header.get('engine', None)
723 eid = self.by_ident.get(engine_uuid, None)
723 eid = self.by_ident.get(engine_uuid, None)
724
725 status = header.get('status', None)
724
726
725 if msg_id in self.pending:
727 if msg_id in self.pending:
726 self.log.info("task::task %r finished on %s", msg_id, eid)
728 self.log.info("task::task %r finished on %s", msg_id, eid)
727 self.pending.remove(msg_id)
729 self.pending.remove(msg_id)
728 self.all_completed.add(msg_id)
730 self.all_completed.add(msg_id)
729 if eid is not None:
731 if eid is not None:
730 self.completed[eid].append(msg_id)
732 if status != 'aborted':
733 self.completed[eid].append(msg_id)
731 if msg_id in self.tasks[eid]:
734 if msg_id in self.tasks[eid]:
732 self.tasks[eid].remove(msg_id)
735 self.tasks[eid].remove(msg_id)
733 completed = header['date']
736 completed = header['date']
@@ -218,7 +218,9 b' class Kernel(Configurable):'
218 # is it safe to assume a msg_id will not be resubmitted?
218 # is it safe to assume a msg_id will not be resubmitted?
219 reply_type = msg_type.split('_')[0] + '_reply'
219 reply_type = msg_type.split('_')[0] + '_reply'
220 status = {'status' : 'aborted'}
220 status = {'status' : 'aborted'}
221 reply_msg = self.session.send(stream, reply_type, subheader=status,
221 sub = {'engine' : self.ident}
222 sub.update(status)
223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
222 content=status, parent=msg, ident=idents)
224 content=status, parent=msg, ident=idents)
223 return
225 return
224
226
@@ -279,7 +281,15 b' class Kernel(Configurable):'
279 #---------------------------------------------------------------------------
281 #---------------------------------------------------------------------------
280 # Kernel request handlers
282 # Kernel request handlers
281 #---------------------------------------------------------------------------
283 #---------------------------------------------------------------------------
282
284
285 def _make_subheader(self):
286 """init subheader dict, for execute/apply_reply"""
287 return {
288 'dependencies_met' : True,
289 'engine' : self.ident,
290 'started': datetime.now(),
291 }
292
283 def _publish_pyin(self, code, parent, execution_count):
293 def _publish_pyin(self, code, parent, execution_count):
284 """Publish the code request on the pyin stream."""
294 """Publish the code request on the pyin stream."""
285
295
@@ -305,6 +315,8 b' class Kernel(Configurable):'
305 self.log.error("Got bad msg: ")
315 self.log.error("Got bad msg: ")
306 self.log.error("%s", parent)
316 self.log.error("%s", parent)
307 return
317 return
318
319 sub = self._make_subheader()
308
320
309 shell = self.shell # we'll need this a lot here
321 shell = self.shell # we'll need this a lot here
310
322
@@ -393,8 +405,16 b' class Kernel(Configurable):'
393
405
394 # Send the reply.
406 # Send the reply.
395 reply_content = json_clean(reply_content)
407 reply_content = json_clean(reply_content)
408
409 sub['status'] = reply_content['status']
410 if reply_content['status'] == 'error' and \
411 reply_content['ename'] == 'UnmetDependency':
412 sub['dependencies_met'] = False
413
396 reply_msg = self.session.send(stream, u'execute_reply',
414 reply_msg = self.session.send(stream, u'execute_reply',
397 reply_content, parent, ident=ident)
415 reply_content, parent, subheader=sub,
416 ident=ident)
417
398 self.log.debug("%s", reply_msg)
418 self.log.debug("%s", reply_msg)
399
419
400 if not silent and reply_msg['content']['status'] == u'error':
420 if not silent and reply_msg['content']['status'] == u'error':
@@ -497,8 +517,7 b' class Kernel(Configurable):'
497 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
517 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
498 # self.iopub_socket.send(pyin_msg)
518 # self.iopub_socket.send(pyin_msg)
499 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
519 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
500 sub = {'dependencies_met' : True, 'engine' : self.ident,
520 sub = self._make_subheader()
501 'started': datetime.now()}
502 try:
521 try:
503 # allow for not overriding displayhook
522 # allow for not overriding displayhook
504 if hasattr(sys.displayhook, 'set_parent'):
523 if hasattr(sys.displayhook, 'set_parent'):
@@ -619,11 +638,12 b' class Kernel(Configurable):'
619 self.log.info("%s", msg)
638 self.log.info("%s", msg)
620 msg_type = msg['header']['msg_type']
639 msg_type = msg['header']['msg_type']
621 reply_type = msg_type.split('_')[0] + '_reply'
640 reply_type = msg_type.split('_')[0] + '_reply'
622 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
641
623 # self.reply_stream.send(ident,zmq.SNDMORE)
642 status = {'status' : 'aborted'}
624 # self.reply_stream.send_json(reply_msg)
643 sub = {'engine' : self.ident}
625 reply_msg = self.session.send(stream, reply_type,
644 sub.update(status)
626 content={'status' : 'aborted'}, parent=msg, ident=idents)
645 reply_msg = self.session.send(stream, reply_type, subheader=sub,
646 content=status, parent=msg, ident=idents)
627 self.log.debug("%s", reply_msg)
647 self.log.debug("%s", reply_msg)
628 # We need to wait a bit for requests to come in. This can probably
648 # We need to wait a bit for requests to come in. This can probably
629 # be set shorter for true asynchronous clients.
649 # be set shorter for true asynchronous clients.
General Comments 0
You need to be logged in to leave comments. Login now