##// END OF EJS Templates
use wrap_exception in controller, fix clear on kernel
MinRK -
Show More
@@ -452,7 +452,7 b' class Controller(object):'
452
452
453 def save_task_result(self, idents, msg):
453 def save_task_result(self, idents, msg):
454 """save the result of a completed task."""
454 """save the result of a completed task."""
455 client_id, engine_uuid = idents[:2]
455 client_id = idents[0]
456 try:
456 try:
457 msg = self.session.unpack_message(msg, content=False)
457 msg = self.session.unpack_message(msg, content=False)
458 except:
458 except:
@@ -461,19 +461,24 b' class Controller(object):'
461 return
461 return
462
462
463 parent = msg['parent_header']
463 parent = msg['parent_header']
464 eid = self.by_ident[engine_uuid]
465 if not parent:
464 if not parent:
466 # print msg
465 # print msg
467 # logger.warn("")
466 logger.warn("Task %r had no parent!"%msg)
468 return
467 return
469 msg_id = parent['msg_id']
468 msg_id = parent['msg_id']
470 self.results[msg_id] = msg
469 self.results[msg_id] = msg
471 if msg_id in self.pending and msg_id in self.tasks[eid]:
470
471 header = msg['header']
472 engine_uuid = header.get('engine', None)
473 eid = self.by_ident.get(engine_uuid, None)
474
475 if msg_id in self.pending:
472 self.pending.pop(msg_id)
476 self.pending.pop(msg_id)
473 if msg_id in self.mia:
477 if msg_id in self.mia:
474 self.mia.remove(msg_id)
478 self.mia.remove(msg_id)
475 self.completed[eid].append(msg_id)
479 if eid is not None and msg_id in self.tasks[eid]:
476 self.tasks[eid].remove(msg_id)
480 self.completed[eid].append(msg_id)
481 self.tasks[eid].remove(msg_id)
477 else:
482 else:
478 logger.debug("task::unknown task %s finished"%msg_id)
483 logger.debug("task::unknown task %s finished"%msg_id)
479
484
@@ -539,16 +544,28 b' class Controller(object):'
539 content.update(self.engine_addrs)
544 content.update(self.engine_addrs)
540 # check if requesting available IDs:
545 # check if requesting available IDs:
541 if queue in self.by_ident:
546 if queue in self.by_ident:
542 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
547 try:
548 raise KeyError("queue_id %r in use"%queue)
549 except:
550 content = wrap_exception()
543 elif heart in self.hearts: # need to check unique hearts?
551 elif heart in self.hearts: # need to check unique hearts?
544 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
552 try:
553 raise KeyError("heart_id %r in use"%heart)
554 except:
555 content = wrap_exception()
545 else:
556 else:
546 for h, pack in self.incoming_registrations.iteritems():
557 for h, pack in self.incoming_registrations.iteritems():
547 if heart == h:
558 if heart == h:
548 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
559 try:
560 raise KeyError("heart_id %r in use"%heart)
561 except:
562 content = wrap_exception()
549 break
563 break
550 elif queue == pack[1]:
564 elif queue == pack[1]:
551 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
565 try:
566 raise KeyError("queue_id %r in use"%queue)
567 except:
568 content = wrap_exception()
552 break
569 break
553
570
554 msg = self.session.send(self.registrar, "registration_reply",
571 msg = self.session.send(self.registrar, "registration_reply",
@@ -566,7 +583,7 b' class Controller(object):'
566 dc.start()
583 dc.start()
567 self.incoming_registrations[heart] = (eid,queue,reg,dc)
584 self.incoming_registrations[heart] = (eid,queue,reg,dc)
568 else:
585 else:
569 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
586 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
570 return eid
587 return eid
571
588
572 def unregister_engine(self, ident, msg):
589 def unregister_engine(self, ident, msg):
@@ -688,14 +705,23 b' class Controller(object):'
688 self.results.pop(msg_id)
705 self.results.pop(msg_id)
689 else:
706 else:
690 if msg_id in self.pending:
707 if msg_id in self.pending:
691 reply = dict(status='error', reason="msg pending: %r"%msg_id)
708 try:
709 raise IndexError("msg pending: %r"%msg_id)
710 except:
711 reply = wrap_exception()
692 else:
712 else:
693 reply = dict(status='error', reason="No such msg: %r"%msg_id)
713 try:
714 raise IndexError("No such msg: %r"%msg_id)
715 except:
716 reply = wrap_exception()
694 break
717 break
695 eids = content.get('engine_ids', [])
718 eids = content.get('engine_ids', [])
696 for eid in eids:
719 for eid in eids:
697 if eid not in self.engines:
720 if eid not in self.engines:
698 reply = dict(status='error', reason="No such engine: %i"%eid)
721 try:
722 raise IndexError("No such engine: %i"%eid)
723 except:
724 reply = wrap_exception()
699 break
725 break
700 msg_ids = self.completed.pop(eid)
726 msg_ids = self.completed.pop(eid)
701 for msg_id in msg_ids:
727 for msg_id in msg_ids:
@@ -725,8 +751,10 b' class Controller(object):'
725 if not statusonly:
751 if not statusonly:
726 content[msg_id] = self.results[msg_id]['content']
752 content[msg_id] = self.results[msg_id]['content']
727 else:
753 else:
728 content = dict(status='error')
754 try:
729 content['reason'] = 'no such message: '+msg_id
755 raise KeyError('No such message: '+msg_id)
756 except:
757 content = wrap_exception()
730 break
758 break
731 self.session.send(self.clientele, "result_reply", content=content,
759 self.session.send(self.clientele, "result_reply", content=content,
732 parent=msg, ident=client_id)
760 parent=msg, ident=client_id)
@@ -143,6 +143,7 b' class Kernel(object):'
143 self.control_stream = control_stream
143 self.control_stream = control_stream
144 # self.control_socket = control_stream.socket
144 # self.control_socket = control_stream.socket
145 self.reply_stream = reply_stream
145 self.reply_stream = reply_stream
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
146 self.task_stream = task_stream
147 self.task_stream = task_stream
147 self.pub_stream = pub_stream
148 self.pub_stream = pub_stream
148 self.client = client
149 self.client = client
@@ -158,7 +159,7 b' class Kernel(object):'
158 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
159 self.queue_handlers[msg_type] = getattr(self, msg_type)
160 self.queue_handlers[msg_type] = getattr(self, msg_type)
160
161
161 for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
162 self.control_handlers[msg_type] = getattr(self, msg_type)
163 self.control_handlers[msg_type] = getattr(self, msg_type)
163
164
164 #-------------------- control handlers -----------------------------
165 #-------------------- control handlers -----------------------------
@@ -214,7 +215,7 b' class Kernel(object):'
214 print(Message(reply_msg), file=sys.__stdout__)
215 print(Message(reply_msg), file=sys.__stdout__)
215
216
216 def kill_request(self, stream, idents, parent):
217 def kill_request(self, stream, idents, parent):
217 """kill ourselves. This should really be handled in an external process"""
218 """kill ourself. This should really be handled in an external process"""
218 self.abort_queues()
219 self.abort_queues()
219 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
220 content = dict(status='ok'))
221 content = dict(status='ok'))
@@ -225,6 +226,12 b' class Kernel(object):'
225 time.sleep(1)
226 time.sleep(1)
226 os.kill(os.getpid(), SIGKILL)
227 os.kill(os.getpid(), SIGKILL)
227
228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
234
228 def dispatch_control(self, msg):
235 def dispatch_control(self, msg):
229 idents,msg = self.session.feed_identities(msg, copy=False)
236 idents,msg = self.session.feed_identities(msg, copy=False)
230 msg = self.session.unpack_message(msg, content=True, copy=False)
237 msg = self.session.unpack_message(msg, content=True, copy=False)
@@ -330,7 +337,7 b' class Kernel(object):'
330 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
337 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
331 # self.pub_stream.send(pyin_msg)
338 # self.pub_stream.send(pyin_msg)
332 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
333 sub = {'dependencies_met' : True}
340 sub = {'dependencies_met' : True, 'engine' : self.identity}
334 try:
341 try:
335 # allow for not overriding displayhook
342 # allow for not overriding displayhook
336 if hasattr(sys.displayhook, 'set_parent'):
343 if hasattr(sys.displayhook, 'set_parent'):
@@ -382,7 +389,7 b' class Kernel(object):'
382 result_buf = []
389 result_buf = []
383
390
384 if etype is UnmetDependency:
391 if etype is UnmetDependency:
385 sub = {'dependencies_met' : False}
392 sub['dependencies_met'] = False
386 else:
393 else:
387 reply_content = {'status' : 'ok'}
394 reply_content = {'status' : 'ok'}
388 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
395 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
General Comments 0
You need to be logged in to leave comments. Login now