##// END OF EJS Templates
better handle aborted/unscheduled tasks
MinRK -
Show More
@@ -307,7 +307,7 b' class Hub(LoggingFactory):'
307 completed=Dict() # completed msg_ids keyed by engine_id
307 completed=Dict() # completed msg_ids keyed by engine_id
308 all_completed=Set() # completed msg_ids keyed by engine_id
308 all_completed=Set() # completed msg_ids keyed by engine_id
309 dead_engines=Set() # completed msg_ids keyed by engine_id
309 dead_engines=Set() # completed msg_ids keyed by engine_id
310 # mia=None
310 unassigned=Set() # set of task msg_ids not yet assigned a destination
311 incoming_registrations=Dict()
311 incoming_registrations=Dict()
312 registration_timeout=Int()
312 registration_timeout=Int()
313 _idcounter=Int(0)
313 _idcounter=Int(0)
@@ -640,6 +640,7 b' class Hub(LoggingFactory):'
640 header = msg['header']
640 header = msg['header']
641 msg_id = header['msg_id']
641 msg_id = header['msg_id']
642 self.pending.add(msg_id)
642 self.pending.add(msg_id)
643 self.unassigned.add(msg_id)
643 try:
644 try:
644 # it's possible iopub arrived first:
645 # it's possible iopub arrived first:
645 existing = self.db.get_record(msg_id)
646 existing = self.db.get_record(msg_id)
@@ -670,6 +671,8 b' class Hub(LoggingFactory):'
670 self.log.warn("Task %r had no parent!"%msg)
671 self.log.warn("Task %r had no parent!"%msg)
671 return
672 return
672 msg_id = parent['msg_id']
673 msg_id = parent['msg_id']
674 if msg_id in self.unassigned:
675 self.unassigned.remove(msg_id)
673
676
674 header = msg['header']
677 header = msg['header']
675 engine_uuid = header.get('engine', None)
678 engine_uuid = header.get('engine', None)
@@ -713,8 +716,8 b' class Hub(LoggingFactory):'
713 eid = self.by_ident[engine_uuid]
716 eid = self.by_ident[engine_uuid]
714
717
715 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
718 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
716 # if msg_id in self.mia:
719 if msg_id in self.unassigned:
717 # self.mia.remove(msg_id)
720 self.unassigned.remove(msg_id)
718 # else:
721 # else:
719 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
722 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
720
723
@@ -1003,7 +1006,8 b' class Hub(LoggingFactory):'
1003 completed = len(completed)
1006 completed = len(completed)
1004 tasks = len(tasks)
1007 tasks = len(tasks)
1005 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1008 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1006 # pending
1009 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1010
1007 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1011 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1008
1012
1009 def purge_results(self, client_id, msg):
1013 def purge_results(self, client_id, msg):
@@ -380,8 +380,9 b' class Kernel(SessionFactory):'
380 self.aborted.remove(msg_id)
380 self.aborted.remove(msg_id)
381 # is it safe to assume a msg_id will not be resubmitted?
381 # is it safe to assume a msg_id will not be resubmitted?
382 reply_type = msg['msg_type'].split('_')[0] + '_reply'
382 reply_type = msg['msg_type'].split('_')[0] + '_reply'
383 reply_msg = self.session.send(stream, reply_type,
383 status = {'status' : 'aborted'}
384 content={'status' : 'aborted'}, parent=msg, ident=idents)
384 reply_msg = self.session.send(stream, reply_type, subheader=status,
385 content=status, parent=msg, ident=idents)
385 return
386 return
386 handler = self.shell_handlers.get(msg['msg_type'], None)
387 handler = self.shell_handlers.get(msg['msg_type'], None)
387 if handler is None:
388 if handler is None:
General Comments 0
You need to be logged in to leave comments. Login now