Show More
@@ -452,7 +452,7 b' class Controller(object):' | |||
|
452 | 452 | |
|
453 | 453 | def save_task_result(self, idents, msg): |
|
454 | 454 | """save the result of a completed task.""" |
|
455 |
client_id |
|
|
455 | client_id = idents[0] | |
|
456 | 456 | try: |
|
457 | 457 | msg = self.session.unpack_message(msg, content=False) |
|
458 | 458 | except: |
@@ -461,19 +461,24 b' class Controller(object):' | |||
|
461 | 461 | return |
|
462 | 462 | |
|
463 | 463 | parent = msg['parent_header'] |
|
464 | eid = self.by_ident[engine_uuid] | |
|
465 | 464 | if not parent: |
|
466 | 465 | # print msg |
|
467 |
|
|
|
466 | logger.warn("Task %r had no parent!"%msg) | |
|
468 | 467 | return |
|
469 | 468 | msg_id = parent['msg_id'] |
|
470 | 469 | self.results[msg_id] = msg |
|
471 | if msg_id in self.pending and msg_id in self.tasks[eid]: | |
|
470 | ||
|
471 | header = msg['header'] | |
|
472 | engine_uuid = header.get('engine', None) | |
|
473 | eid = self.by_ident.get(engine_uuid, None) | |
|
474 | ||
|
475 | if msg_id in self.pending: | |
|
472 | 476 | self.pending.pop(msg_id) |
|
473 | 477 | if msg_id in self.mia: |
|
474 | 478 | self.mia.remove(msg_id) |
|
475 | self.completed[eid].append(msg_id) | |
|
476 |
self. |
|
|
479 | if eid is not None and msg_id in self.tasks[eid]: | |
|
480 | self.completed[eid].append(msg_id) | |
|
481 | self.tasks[eid].remove(msg_id) | |
|
477 | 482 | else: |
|
478 | 483 | logger.debug("task::unknown task %s finished"%msg_id) |
|
479 | 484 | |
@@ -539,16 +544,28 b' class Controller(object):' | |||
|
539 | 544 | content.update(self.engine_addrs) |
|
540 | 545 | # check if requesting available IDs: |
|
541 | 546 | if queue in self.by_ident: |
|
542 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |
|
547 | try: | |
|
548 | raise KeyError("queue_id %r in use"%queue) | |
|
549 | except: | |
|
550 | content = wrap_exception() | |
|
543 | 551 | elif heart in self.hearts: # need to check unique hearts? |
|
544 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |
|
552 | try: | |
|
553 | raise KeyError("heart_id %r in use"%heart) | |
|
554 | except: | |
|
555 | content = wrap_exception() | |
|
545 | 556 | else: |
|
546 | 557 | for h, pack in self.incoming_registrations.iteritems(): |
|
547 | 558 | if heart == h: |
|
548 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |
|
559 | try: | |
|
560 | raise KeyError("heart_id %r in use"%heart) | |
|
561 | except: | |
|
562 | content = wrap_exception() | |
|
549 | 563 | break |
|
550 | 564 | elif queue == pack[1]: |
|
551 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |
|
565 | try: | |
|
566 | raise KeyError("queue_id %r in use"%queue) | |
|
567 | except: | |
|
568 | content = wrap_exception() | |
|
552 | 569 | break |
|
553 | 570 | |
|
554 | 571 | msg = self.session.send(self.registrar, "registration_reply", |
@@ -566,7 +583,7 b' class Controller(object):' | |||
|
566 | 583 | dc.start() |
|
567 | 584 | self.incoming_registrations[heart] = (eid,queue,reg,dc) |
|
568 | 585 | else: |
|
569 |
logger.error("registration::registration %i failed: %s"%(eid, content[' |
|
|
586 | logger.error("registration::registration %i failed: %s"%(eid, content['evalue'])) | |
|
570 | 587 | return eid |
|
571 | 588 | |
|
572 | 589 | def unregister_engine(self, ident, msg): |
@@ -688,14 +705,23 b' class Controller(object):' | |||
|
688 | 705 | self.results.pop(msg_id) |
|
689 | 706 | else: |
|
690 | 707 | if msg_id in self.pending: |
|
691 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |
|
708 | try: | |
|
709 | raise IndexError("msg pending: %r"%msg_id) | |
|
710 | except: | |
|
711 | reply = wrap_exception() | |
|
692 | 712 | else: |
|
693 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |
|
713 | try: | |
|
714 | raise IndexError("No such msg: %r"%msg_id) | |
|
715 | except: | |
|
716 | reply = wrap_exception() | |
|
694 | 717 | break |
|
695 | 718 | eids = content.get('engine_ids', []) |
|
696 | 719 | for eid in eids: |
|
697 | 720 | if eid not in self.engines: |
|
698 | reply = dict(status='error', reason="No such engine: %i"%eid) | |
|
721 | try: | |
|
722 | raise IndexError("No such engine: %i"%eid) | |
|
723 | except: | |
|
724 | reply = wrap_exception() | |
|
699 | 725 | break |
|
700 | 726 | msg_ids = self.completed.pop(eid) |
|
701 | 727 | for msg_id in msg_ids: |
@@ -725,8 +751,10 b' class Controller(object):' | |||
|
725 | 751 | if not statusonly: |
|
726 | 752 | content[msg_id] = self.results[msg_id]['content'] |
|
727 | 753 | else: |
|
728 | content = dict(status='error') | |
|
729 |
|
|
|
754 | try: | |
|
755 | raise KeyError('No such message: '+msg_id) | |
|
756 | except: | |
|
757 | content = wrap_exception() | |
|
730 | 758 | break |
|
731 | 759 | self.session.send(self.clientele, "result_reply", content=content, |
|
732 | 760 | parent=msg, ident=client_id) |
@@ -143,6 +143,7 b' class Kernel(object):' | |||
|
143 | 143 | self.control_stream = control_stream |
|
144 | 144 | # self.control_socket = control_stream.socket |
|
145 | 145 | self.reply_stream = reply_stream |
|
146 | self.identity = self.reply_stream.getsockopt(zmq.IDENTITY) | |
|
146 | 147 | self.task_stream = task_stream |
|
147 | 148 | self.pub_stream = pub_stream |
|
148 | 149 | self.client = client |
@@ -158,7 +159,7 b' class Kernel(object):' | |||
|
158 | 159 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: |
|
159 | 160 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
160 | 161 | |
|
161 | for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys(): | |
|
162 | for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys(): | |
|
162 | 163 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
163 | 164 | |
|
164 | 165 | #-------------------- control handlers ----------------------------- |
@@ -214,7 +215,7 b' class Kernel(object):' | |||
|
214 | 215 | print(Message(reply_msg), file=sys.__stdout__) |
|
215 | 216 | |
|
216 | 217 | def kill_request(self, stream, idents, parent): |
|
217 |
"""kill oursel |
|
|
218 | """kill ourself. This should really be handled in an external process""" | |
|
218 | 219 | self.abort_queues() |
|
219 | 220 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
220 | 221 | content = dict(status='ok')) |
@@ -225,6 +226,12 b' class Kernel(object):' | |||
|
225 | 226 | time.sleep(1) |
|
226 | 227 | os.kill(os.getpid(), SIGKILL) |
|
227 | 228 | |
|
229 | def clear_request(self, stream, idents, parent): | |
|
230 | """Clear our namespace.""" | |
|
231 | self.user_ns = {} | |
|
232 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |
|
233 | content = dict(status='ok')) | |
|
234 | ||
|
228 | 235 | def dispatch_control(self, msg): |
|
229 | 236 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
230 | 237 | msg = self.session.unpack_message(msg, content=True, copy=False) |
@@ -330,7 +337,7 b' class Kernel(object):' | |||
|
330 | 337 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
331 | 338 | # self.pub_stream.send(pyin_msg) |
|
332 | 339 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
333 | sub = {'dependencies_met' : True} | |
|
340 | sub = {'dependencies_met' : True, 'engine' : self.identity} | |
|
334 | 341 | try: |
|
335 | 342 | # allow for not overriding displayhook |
|
336 | 343 | if hasattr(sys.displayhook, 'set_parent'): |
@@ -382,7 +389,7 b' class Kernel(object):' | |||
|
382 | 389 | result_buf = [] |
|
383 | 390 | |
|
384 | 391 | if etype is UnmetDependency: |
|
385 |
sub = |
|
|
392 | sub['dependencies_met'] = False | |
|
386 | 393 | else: |
|
387 | 394 | reply_content = {'status' : 'ok'} |
|
388 | 395 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
General Comments 0
You need to be logged in to leave comments.
Login now