##// END OF EJS Templates
use wrap_exception in controller, fix clear on kernel
MinRK -
Show More
@@ -452,7 +452,7 b' class Controller(object):'
452 452
453 453 def save_task_result(self, idents, msg):
454 454 """save the result of a completed task."""
455 client_id, engine_uuid = idents[:2]
455 client_id = idents[0]
456 456 try:
457 457 msg = self.session.unpack_message(msg, content=False)
458 458 except:
@@ -461,17 +461,22 b' class Controller(object):'
461 461 return
462 462
463 463 parent = msg['parent_header']
464 eid = self.by_ident[engine_uuid]
465 464 if not parent:
466 465 # print msg
467 # logger.warn("")
466 logger.warn("Task %r had no parent!"%msg)
468 467 return
469 468 msg_id = parent['msg_id']
470 469 self.results[msg_id] = msg
471 if msg_id in self.pending and msg_id in self.tasks[eid]:
470
471 header = msg['header']
472 engine_uuid = header.get('engine', None)
473 eid = self.by_ident.get(engine_uuid, None)
474
475 if msg_id in self.pending:
472 476 self.pending.pop(msg_id)
473 477 if msg_id in self.mia:
474 478 self.mia.remove(msg_id)
479 if eid is not None and msg_id in self.tasks[eid]:
475 480 self.completed[eid].append(msg_id)
476 481 self.tasks[eid].remove(msg_id)
477 482 else:
@@ -539,16 +544,28 b' class Controller(object):'
539 544 content.update(self.engine_addrs)
540 545 # check if requesting available IDs:
541 546 if queue in self.by_ident:
542 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
547 try:
548 raise KeyError("queue_id %r in use"%queue)
549 except:
550 content = wrap_exception()
543 551 elif heart in self.hearts: # need to check unique hearts?
544 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
552 try:
553 raise KeyError("heart_id %r in use"%heart)
554 except:
555 content = wrap_exception()
545 556 else:
546 557 for h, pack in self.incoming_registrations.iteritems():
547 558 if heart == h:
548 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
559 try:
560 raise KeyError("heart_id %r in use"%heart)
561 except:
562 content = wrap_exception()
549 563 break
550 564 elif queue == pack[1]:
551 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
565 try:
566 raise KeyError("queue_id %r in use"%queue)
567 except:
568 content = wrap_exception()
552 569 break
553 570
554 571 msg = self.session.send(self.registrar, "registration_reply",
@@ -566,7 +583,7 b' class Controller(object):'
566 583 dc.start()
567 584 self.incoming_registrations[heart] = (eid,queue,reg,dc)
568 585 else:
569 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
586 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
570 587 return eid
571 588
572 589 def unregister_engine(self, ident, msg):
@@ -688,14 +705,23 b' class Controller(object):'
688 705 self.results.pop(msg_id)
689 706 else:
690 707 if msg_id in self.pending:
691 reply = dict(status='error', reason="msg pending: %r"%msg_id)
708 try:
709 raise IndexError("msg pending: %r"%msg_id)
710 except:
711 reply = wrap_exception()
692 712 else:
693 reply = dict(status='error', reason="No such msg: %r"%msg_id)
713 try:
714 raise IndexError("No such msg: %r"%msg_id)
715 except:
716 reply = wrap_exception()
694 717 break
695 718 eids = content.get('engine_ids', [])
696 719 for eid in eids:
697 720 if eid not in self.engines:
698 reply = dict(status='error', reason="No such engine: %i"%eid)
721 try:
722 raise IndexError("No such engine: %i"%eid)
723 except:
724 reply = wrap_exception()
699 725 break
700 726 msg_ids = self.completed.pop(eid)
701 727 for msg_id in msg_ids:
@@ -725,8 +751,10 b' class Controller(object):'
725 751 if not statusonly:
726 752 content[msg_id] = self.results[msg_id]['content']
727 753 else:
728 content = dict(status='error')
729 content['reason'] = 'no such message: '+msg_id
754 try:
755 raise KeyError('No such message: '+msg_id)
756 except:
757 content = wrap_exception()
730 758 break
731 759 self.session.send(self.clientele, "result_reply", content=content,
732 760 parent=msg, ident=client_id)
@@ -143,6 +143,7 b' class Kernel(object):'
143 143 self.control_stream = control_stream
144 144 # self.control_socket = control_stream.socket
145 145 self.reply_stream = reply_stream
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
146 147 self.task_stream = task_stream
147 148 self.pub_stream = pub_stream
148 149 self.client = client
@@ -158,7 +159,7 b' class Kernel(object):'
158 159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
159 160 self.queue_handlers[msg_type] = getattr(self, msg_type)
160 161
161 for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
162 163 self.control_handlers[msg_type] = getattr(self, msg_type)
163 164
164 165 #-------------------- control handlers -----------------------------
@@ -214,7 +215,7 b' class Kernel(object):'
214 215 print(Message(reply_msg), file=sys.__stdout__)
215 216
216 217 def kill_request(self, stream, idents, parent):
217 """kill ourselves. This should really be handled in an external process"""
218 """kill ourself. This should really be handled in an external process"""
218 219 self.abort_queues()
219 220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
220 221 content = dict(status='ok'))
@@ -225,6 +226,12 b' class Kernel(object):'
225 226 time.sleep(1)
226 227 os.kill(os.getpid(), SIGKILL)
227 228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
234
228 235 def dispatch_control(self, msg):
229 236 idents,msg = self.session.feed_identities(msg, copy=False)
230 237 msg = self.session.unpack_message(msg, content=True, copy=False)
@@ -330,7 +337,7 b' class Kernel(object):'
330 337 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
331 338 # self.pub_stream.send(pyin_msg)
332 339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
333 sub = {'dependencies_met' : True}
340 sub = {'dependencies_met' : True, 'engine' : self.identity}
334 341 try:
335 342 # allow for not overriding displayhook
336 343 if hasattr(sys.displayhook, 'set_parent'):
@@ -382,7 +389,7 b' class Kernel(object):'
382 389 result_buf = []
383 390
384 391 if etype is UnmetDependency:
385 sub = {'dependencies_met' : False}
392 sub['dependencies_met'] = False
386 393 else:
387 394 reply_content = {'status' : 'ok'}
388 395 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
General Comments 0
You need to be logged in to leave comments. Login now