Show More
@@ -452,7 +452,7 b' class Controller(object):' | |||||
452 |
|
452 | |||
453 | def save_task_result(self, idents, msg): |
|
453 | def save_task_result(self, idents, msg): | |
454 | """save the result of a completed task.""" |
|
454 | """save the result of a completed task.""" | |
455 |
client_id |
|
455 | client_id = idents[0] | |
456 | try: |
|
456 | try: | |
457 | msg = self.session.unpack_message(msg, content=False) |
|
457 | msg = self.session.unpack_message(msg, content=False) | |
458 | except: |
|
458 | except: | |
@@ -461,19 +461,24 b' class Controller(object):' | |||||
461 | return |
|
461 | return | |
462 |
|
462 | |||
463 | parent = msg['parent_header'] |
|
463 | parent = msg['parent_header'] | |
464 | eid = self.by_ident[engine_uuid] |
|
|||
465 | if not parent: |
|
464 | if not parent: | |
466 | # print msg |
|
465 | # print msg | |
467 |
|
|
466 | logger.warn("Task %r had no parent!"%msg) | |
468 | return |
|
467 | return | |
469 | msg_id = parent['msg_id'] |
|
468 | msg_id = parent['msg_id'] | |
470 | self.results[msg_id] = msg |
|
469 | self.results[msg_id] = msg | |
471 | if msg_id in self.pending and msg_id in self.tasks[eid]: |
|
470 | ||
|
471 | header = msg['header'] | |||
|
472 | engine_uuid = header.get('engine', None) | |||
|
473 | eid = self.by_ident.get(engine_uuid, None) | |||
|
474 | ||||
|
475 | if msg_id in self.pending: | |||
472 | self.pending.pop(msg_id) |
|
476 | self.pending.pop(msg_id) | |
473 | if msg_id in self.mia: |
|
477 | if msg_id in self.mia: | |
474 | self.mia.remove(msg_id) |
|
478 | self.mia.remove(msg_id) | |
475 | self.completed[eid].append(msg_id) |
|
479 | if eid is not None and msg_id in self.tasks[eid]: | |
476 |
self. |
|
480 | self.completed[eid].append(msg_id) | |
|
481 | self.tasks[eid].remove(msg_id) | |||
477 | else: |
|
482 | else: | |
478 | logger.debug("task::unknown task %s finished"%msg_id) |
|
483 | logger.debug("task::unknown task %s finished"%msg_id) | |
479 |
|
484 | |||
@@ -539,16 +544,28 b' class Controller(object):' | |||||
539 | content.update(self.engine_addrs) |
|
544 | content.update(self.engine_addrs) | |
540 | # check if requesting available IDs: |
|
545 | # check if requesting available IDs: | |
541 | if queue in self.by_ident: |
|
546 | if queue in self.by_ident: | |
542 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} |
|
547 | try: | |
|
548 | raise KeyError("queue_id %r in use"%queue) | |||
|
549 | except: | |||
|
550 | content = wrap_exception() | |||
543 | elif heart in self.hearts: # need to check unique hearts? |
|
551 | elif heart in self.hearts: # need to check unique hearts? | |
544 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} |
|
552 | try: | |
|
553 | raise KeyError("heart_id %r in use"%heart) | |||
|
554 | except: | |||
|
555 | content = wrap_exception() | |||
545 | else: |
|
556 | else: | |
546 | for h, pack in self.incoming_registrations.iteritems(): |
|
557 | for h, pack in self.incoming_registrations.iteritems(): | |
547 | if heart == h: |
|
558 | if heart == h: | |
548 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} |
|
559 | try: | |
|
560 | raise KeyError("heart_id %r in use"%heart) | |||
|
561 | except: | |||
|
562 | content = wrap_exception() | |||
549 | break |
|
563 | break | |
550 | elif queue == pack[1]: |
|
564 | elif queue == pack[1]: | |
551 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} |
|
565 | try: | |
|
566 | raise KeyError("queue_id %r in use"%queue) | |||
|
567 | except: | |||
|
568 | content = wrap_exception() | |||
552 | break |
|
569 | break | |
553 |
|
570 | |||
554 | msg = self.session.send(self.registrar, "registration_reply", |
|
571 | msg = self.session.send(self.registrar, "registration_reply", | |
@@ -566,7 +583,7 b' class Controller(object):' | |||||
566 | dc.start() |
|
583 | dc.start() | |
567 | self.incoming_registrations[heart] = (eid,queue,reg,dc) |
|
584 | self.incoming_registrations[heart] = (eid,queue,reg,dc) | |
568 | else: |
|
585 | else: | |
569 |
logger.error("registration::registration %i failed: %s"%(eid, content[' |
|
586 | logger.error("registration::registration %i failed: %s"%(eid, content['evalue'])) | |
570 | return eid |
|
587 | return eid | |
571 |
|
588 | |||
572 | def unregister_engine(self, ident, msg): |
|
589 | def unregister_engine(self, ident, msg): | |
@@ -688,14 +705,23 b' class Controller(object):' | |||||
688 | self.results.pop(msg_id) |
|
705 | self.results.pop(msg_id) | |
689 | else: |
|
706 | else: | |
690 | if msg_id in self.pending: |
|
707 | if msg_id in self.pending: | |
691 | reply = dict(status='error', reason="msg pending: %r"%msg_id) |
|
708 | try: | |
|
709 | raise IndexError("msg pending: %r"%msg_id) | |||
|
710 | except: | |||
|
711 | reply = wrap_exception() | |||
692 | else: |
|
712 | else: | |
693 | reply = dict(status='error', reason="No such msg: %r"%msg_id) |
|
713 | try: | |
|
714 | raise IndexError("No such msg: %r"%msg_id) | |||
|
715 | except: | |||
|
716 | reply = wrap_exception() | |||
694 | break |
|
717 | break | |
695 | eids = content.get('engine_ids', []) |
|
718 | eids = content.get('engine_ids', []) | |
696 | for eid in eids: |
|
719 | for eid in eids: | |
697 | if eid not in self.engines: |
|
720 | if eid not in self.engines: | |
698 | reply = dict(status='error', reason="No such engine: %i"%eid) |
|
721 | try: | |
|
722 | raise IndexError("No such engine: %i"%eid) | |||
|
723 | except: | |||
|
724 | reply = wrap_exception() | |||
699 | break |
|
725 | break | |
700 | msg_ids = self.completed.pop(eid) |
|
726 | msg_ids = self.completed.pop(eid) | |
701 | for msg_id in msg_ids: |
|
727 | for msg_id in msg_ids: | |
@@ -725,8 +751,10 b' class Controller(object):' | |||||
725 | if not statusonly: |
|
751 | if not statusonly: | |
726 | content[msg_id] = self.results[msg_id]['content'] |
|
752 | content[msg_id] = self.results[msg_id]['content'] | |
727 | else: |
|
753 | else: | |
728 | content = dict(status='error') |
|
754 | try: | |
729 |
|
|
755 | raise KeyError('No such message: '+msg_id) | |
|
756 | except: | |||
|
757 | content = wrap_exception() | |||
730 | break |
|
758 | break | |
731 | self.session.send(self.clientele, "result_reply", content=content, |
|
759 | self.session.send(self.clientele, "result_reply", content=content, | |
732 | parent=msg, ident=client_id) |
|
760 | parent=msg, ident=client_id) |
@@ -143,6 +143,7 b' class Kernel(object):' | |||||
143 | self.control_stream = control_stream |
|
143 | self.control_stream = control_stream | |
144 | # self.control_socket = control_stream.socket |
|
144 | # self.control_socket = control_stream.socket | |
145 | self.reply_stream = reply_stream |
|
145 | self.reply_stream = reply_stream | |
|
146 | self.identity = self.reply_stream.getsockopt(zmq.IDENTITY) | |||
146 | self.task_stream = task_stream |
|
147 | self.task_stream = task_stream | |
147 | self.pub_stream = pub_stream |
|
148 | self.pub_stream = pub_stream | |
148 | self.client = client |
|
149 | self.client = client | |
@@ -158,7 +159,7 b' class Kernel(object):' | |||||
158 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: |
|
159 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: | |
159 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
160 | self.queue_handlers[msg_type] = getattr(self, msg_type) | |
160 |
|
161 | |||
161 | for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys(): |
|
162 | for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys(): | |
162 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
163 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
163 |
|
164 | |||
164 | #-------------------- control handlers ----------------------------- |
|
165 | #-------------------- control handlers ----------------------------- | |
@@ -214,7 +215,7 b' class Kernel(object):' | |||||
214 | print(Message(reply_msg), file=sys.__stdout__) |
|
215 | print(Message(reply_msg), file=sys.__stdout__) | |
215 |
|
216 | |||
216 | def kill_request(self, stream, idents, parent): |
|
217 | def kill_request(self, stream, idents, parent): | |
217 |
"""kill oursel |
|
218 | """kill ourself. This should really be handled in an external process""" | |
218 | self.abort_queues() |
|
219 | self.abort_queues() | |
219 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
220 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | |
220 | content = dict(status='ok')) |
|
221 | content = dict(status='ok')) | |
@@ -225,6 +226,12 b' class Kernel(object):' | |||||
225 | time.sleep(1) |
|
226 | time.sleep(1) | |
226 | os.kill(os.getpid(), SIGKILL) |
|
227 | os.kill(os.getpid(), SIGKILL) | |
227 |
|
228 | |||
|
229 | def clear_request(self, stream, idents, parent): | |||
|
230 | """Clear our namespace.""" | |||
|
231 | self.user_ns = {} | |||
|
232 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |||
|
233 | content = dict(status='ok')) | |||
|
234 | ||||
228 | def dispatch_control(self, msg): |
|
235 | def dispatch_control(self, msg): | |
229 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
236 | idents,msg = self.session.feed_identities(msg, copy=False) | |
230 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
237 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
@@ -330,7 +337,7 b' class Kernel(object):' | |||||
330 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
337 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
331 | # self.pub_stream.send(pyin_msg) |
|
338 | # self.pub_stream.send(pyin_msg) | |
332 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
339 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
333 | sub = {'dependencies_met' : True} |
|
340 | sub = {'dependencies_met' : True, 'engine' : self.identity} | |
334 | try: |
|
341 | try: | |
335 | # allow for not overriding displayhook |
|
342 | # allow for not overriding displayhook | |
336 | if hasattr(sys.displayhook, 'set_parent'): |
|
343 | if hasattr(sys.displayhook, 'set_parent'): | |
@@ -382,7 +389,7 b' class Kernel(object):' | |||||
382 | result_buf = [] |
|
389 | result_buf = [] | |
383 |
|
390 | |||
384 | if etype is UnmetDependency: |
|
391 | if etype is UnmetDependency: | |
385 |
sub = |
|
392 | sub['dependencies_met'] = False | |
386 | else: |
|
393 | else: | |
387 | reply_content = {'status' : 'ok'} |
|
394 | reply_content = {'status' : 'ok'} | |
388 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
395 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
General Comments 0
You need to be logged in to leave comments.
Login now