Show More
@@ -198,9 +198,6 b' class HubFactory(RegistrationFactory):' | |||||
198 | def __init__(self, **kwargs): |
|
198 | def __init__(self, **kwargs): | |
199 | super(HubFactory, self).__init__(**kwargs) |
|
199 | super(HubFactory, self).__init__(**kwargs) | |
200 | self._update_monitor_url() |
|
200 | self._update_monitor_url() | |
201 | # self.on_trait_change(self._sync_ips, 'ip') |
|
|||
202 | # self.on_trait_change(self._sync_transports, 'transport') |
|
|||
203 | # self.subconstructors.append(self.construct_hub) |
|
|||
204 |
|
201 | |||
205 |
|
202 | |||
206 | def construct(self): |
|
203 | def construct(self): | |
@@ -449,34 +446,16 b' class Hub(LoggingFactory):' | |||||
449 | # dispatch methods (1 per stream) |
|
446 | # dispatch methods (1 per stream) | |
450 | #----------------------------------------------------------------------------- |
|
447 | #----------------------------------------------------------------------------- | |
451 |
|
448 | |||
452 | # def dispatch_registration_request(self, msg): |
|
|||
453 | # """""" |
|
|||
454 | # self.log.debug("registration::dispatch_register_request(%s)"%msg) |
|
|||
455 | # idents,msg = self.session.feed_identities(msg) |
|
|||
456 | # if not idents: |
|
|||
457 | # self.log.error("Bad Query Message: %s"%msg, exc_info=True) |
|
|||
458 | # return |
|
|||
459 | # try: |
|
|||
460 | # msg = self.session.unpack_message(msg,content=True) |
|
|||
461 | # except: |
|
|||
462 | # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) |
|
|||
463 | # return |
|
|||
464 | # |
|
|||
465 | # msg_type = msg['msg_type'] |
|
|||
466 | # content = msg['content'] |
|
|||
467 | # |
|
|||
468 | # handler = self.query_handlers.get(msg_type, None) |
|
|||
469 | # if handler is None: |
|
|||
470 | # self.log.error("registration::got bad registration message: %s"%msg) |
|
|||
471 | # else: |
|
|||
472 | # handler(idents, msg) |
|
|||
473 |
|
449 | |||
474 | def dispatch_monitor_traffic(self, msg): |
|
450 | def dispatch_monitor_traffic(self, msg): | |
475 | """all ME and Task queue messages come through here, as well as |
|
451 | """all ME and Task queue messages come through here, as well as | |
476 | IOPub traffic.""" |
|
452 | IOPub traffic.""" | |
477 | self.log.debug("monitor traffic: %r"%msg[:2]) |
|
453 | self.log.debug("monitor traffic: %r"%msg[:2]) | |
478 | switch = msg[0] |
|
454 | switch = msg[0] | |
479 | idents, msg = self.session.feed_identities(msg[1:]) |
|
455 | try: | |
|
456 | idents, msg = self.session.feed_identities(msg[1:]) | |||
|
457 | except ValueError: | |||
|
458 | idents=[] | |||
480 | if not idents: |
|
459 | if not idents: | |
481 | self.log.error("Bad Monitor Message: %r"%msg) |
|
460 | self.log.error("Bad Monitor Message: %r"%msg) | |
482 | return |
|
461 | return | |
@@ -557,19 +536,19 b' class Hub(LoggingFactory):' | |||||
557 |
|
536 | |||
558 | def save_queue_request(self, idents, msg): |
|
537 | def save_queue_request(self, idents, msg): | |
559 | if len(idents) < 2: |
|
538 | if len(idents) < 2: | |
560 |
self.log.error("invalid identity prefix: % |
|
539 | self.log.error("invalid identity prefix: %r"%idents) | |
561 | return |
|
540 | return | |
562 | queue_id, client_id = idents[:2] |
|
541 | queue_id, client_id = idents[:2] | |
563 | try: |
|
542 | try: | |
564 | msg = self.session.unpack_message(msg, content=False) |
|
543 | msg = self.session.unpack_message(msg, content=False) | |
565 | except: |
|
544 | except Exception: | |
566 |
self.log.error("queue::client %r sent invalid message to %r: % |
|
545 | self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) | |
567 | return |
|
546 | return | |
568 |
|
547 | |||
569 | eid = self.by_ident.get(queue_id, None) |
|
548 | eid = self.by_ident.get(queue_id, None) | |
570 | if eid is None: |
|
549 | if eid is None: | |
571 | self.log.error("queue::target %r not registered"%queue_id) |
|
550 | self.log.error("queue::target %r not registered"%queue_id) | |
572 |
self.log.debug("queue:: valid are: % |
|
551 | self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) | |
573 | return |
|
552 | return | |
574 |
|
553 | |||
575 | header = msg['header'] |
|
554 | header = msg['header'] | |
@@ -597,21 +576,20 b' class Hub(LoggingFactory):' | |||||
597 |
|
576 | |||
598 | def save_queue_result(self, idents, msg): |
|
577 | def save_queue_result(self, idents, msg): | |
599 | if len(idents) < 2: |
|
578 | if len(idents) < 2: | |
600 |
self.log.error("invalid identity prefix: % |
|
579 | self.log.error("invalid identity prefix: %r"%idents) | |
601 | return |
|
580 | return | |
602 |
|
581 | |||
603 | client_id, queue_id = idents[:2] |
|
582 | client_id, queue_id = idents[:2] | |
604 | try: |
|
583 | try: | |
605 | msg = self.session.unpack_message(msg, content=False) |
|
584 | msg = self.session.unpack_message(msg, content=False) | |
606 | except: |
|
585 | except Exception: | |
607 |
self.log.error("queue::engine %r sent invalid message to %r: % |
|
586 | self.log.error("queue::engine %r sent invalid message to %r: %r"%( | |
608 | queue_id,client_id, msg), exc_info=True) |
|
587 | queue_id,client_id, msg), exc_info=True) | |
609 | return |
|
588 | return | |
610 |
|
589 | |||
611 | eid = self.by_ident.get(queue_id, None) |
|
590 | eid = self.by_ident.get(queue_id, None) | |
612 | if eid is None: |
|
591 | if eid is None: | |
613 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) |
|
592 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) | |
614 | # self.log.debug("queue:: %s"%msg[2:]) |
|
|||
615 | return |
|
593 | return | |
616 |
|
594 | |||
617 | parent = msg['parent_header'] |
|
595 | parent = msg['parent_header'] | |
@@ -626,7 +604,7 b' class Hub(LoggingFactory):' | |||||
626 | elif msg_id not in self.all_completed: |
|
604 | elif msg_id not in self.all_completed: | |
627 | # it could be a result from a dead engine that died before delivering the |
|
605 | # it could be a result from a dead engine that died before delivering the | |
628 | # result |
|
606 | # result | |
629 |
self.log.warn("queue:: unknown msg finished % |
|
607 | self.log.warn("queue:: unknown msg finished %r"%msg_id) | |
630 | return |
|
608 | return | |
631 | # update record anyway, because the unregistration could have been premature |
|
609 | # update record anyway, because the unregistration could have been premature | |
632 | rheader = msg['header'] |
|
610 | rheader = msg['header'] | |
@@ -656,8 +634,8 b' class Hub(LoggingFactory):' | |||||
656 |
|
634 | |||
657 | try: |
|
635 | try: | |
658 | msg = self.session.unpack_message(msg, content=False) |
|
636 | msg = self.session.unpack_message(msg, content=False) | |
659 | except: |
|
637 | except Exception: | |
660 |
self.log.error("task::client %r sent invalid task message: % |
|
638 | self.log.error("task::client %r sent invalid task message: %r"%( | |
661 | client_id, msg), exc_info=True) |
|
639 | client_id, msg), exc_info=True) | |
662 | return |
|
640 | return | |
663 | record = init_record(msg) |
|
641 | record = init_record(msg) | |
@@ -700,10 +678,9 b' class Hub(LoggingFactory):' | |||||
700 | client_id = idents[0] |
|
678 | client_id = idents[0] | |
701 | try: |
|
679 | try: | |
702 | msg = self.session.unpack_message(msg, content=False) |
|
680 | msg = self.session.unpack_message(msg, content=False) | |
703 | except: |
|
681 | except Exception: | |
704 |
self.log.error("task::invalid task result message send to %r: % |
|
682 | self.log.error("task::invalid task result message send to %r: %r"%( | |
705 | client_id, msg), exc_info=True) |
|
683 | client_id, msg), exc_info=True) | |
706 | raise |
|
|||
707 | return |
|
684 | return | |
708 |
|
685 | |||
709 | parent = msg['parent_header'] |
|
686 | parent = msg['parent_header'] | |
@@ -745,12 +722,12 b' class Hub(LoggingFactory):' | |||||
745 | self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) |
|
722 | self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) | |
746 |
|
723 | |||
747 | else: |
|
724 | else: | |
748 |
self.log.debug("task::unknown task % |
|
725 | self.log.debug("task::unknown task %r finished"%msg_id) | |
749 |
|
726 | |||
750 | def save_task_destination(self, idents, msg): |
|
727 | def save_task_destination(self, idents, msg): | |
751 | try: |
|
728 | try: | |
752 | msg = self.session.unpack_message(msg, content=True) |
|
729 | msg = self.session.unpack_message(msg, content=True) | |
753 | except: |
|
730 | except Exception: | |
754 | self.log.error("task::invalid task tracking message", exc_info=True) |
|
731 | self.log.error("task::invalid task tracking message", exc_info=True) | |
755 | return |
|
732 | return | |
756 | content = msg['content'] |
|
733 | content = msg['content'] | |
@@ -759,11 +736,11 b' class Hub(LoggingFactory):' | |||||
759 | engine_uuid = content['engine_id'] |
|
736 | engine_uuid = content['engine_id'] | |
760 | eid = self.by_ident[engine_uuid] |
|
737 | eid = self.by_ident[engine_uuid] | |
761 |
|
738 | |||
762 |
self.log.info("task::task % |
|
739 | self.log.info("task::task %r arrived on %r"%(msg_id, eid)) | |
763 | if msg_id in self.unassigned: |
|
740 | if msg_id in self.unassigned: | |
764 | self.unassigned.remove(msg_id) |
|
741 | self.unassigned.remove(msg_id) | |
765 | # else: |
|
742 | # else: | |
766 |
# self.log.debug("task::task % |
|
743 | # self.log.debug("task::task %r not listed as MIA?!"%(msg_id)) | |
767 |
|
744 | |||
768 | self.tasks[eid].append(msg_id) |
|
745 | self.tasks[eid].append(msg_id) | |
769 | # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) |
|
746 | # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) | |
@@ -787,13 +764,13 b' class Hub(LoggingFactory):' | |||||
787 | # print (topics) |
|
764 | # print (topics) | |
788 | try: |
|
765 | try: | |
789 | msg = self.session.unpack_message(msg, content=True) |
|
766 | msg = self.session.unpack_message(msg, content=True) | |
790 | except: |
|
767 | except Exception: | |
791 | self.log.error("iopub::invalid IOPub message", exc_info=True) |
|
768 | self.log.error("iopub::invalid IOPub message", exc_info=True) | |
792 | return |
|
769 | return | |
793 |
|
770 | |||
794 | parent = msg['parent_header'] |
|
771 | parent = msg['parent_header'] | |
795 | if not parent: |
|
772 | if not parent: | |
796 |
self.log.error("iopub::invalid IOPub message: % |
|
773 | self.log.error("iopub::invalid IOPub message: %r"%msg) | |
797 | return |
|
774 | return | |
798 | msg_id = parent['msg_id'] |
|
775 | msg_id = parent['msg_id'] | |
799 | msg_type = msg['msg_type'] |
|
776 | msg_type = msg['msg_type'] | |
@@ -833,7 +810,7 b' class Hub(LoggingFactory):' | |||||
833 |
|
810 | |||
834 | def connection_request(self, client_id, msg): |
|
811 | def connection_request(self, client_id, msg): | |
835 | """Reply with connection addresses for clients.""" |
|
812 | """Reply with connection addresses for clients.""" | |
836 |
self.log.info("client::client % |
|
813 | self.log.info("client::client %r connected"%client_id) | |
837 | content = dict(status='ok') |
|
814 | content = dict(status='ok') | |
838 | content.update(self.client_info) |
|
815 | content.update(self.client_info) | |
839 | jsonable = {} |
|
816 | jsonable = {} | |
@@ -905,7 +882,7 b' class Hub(LoggingFactory):' | |||||
905 | dc.start() |
|
882 | dc.start() | |
906 | self.incoming_registrations[heart] = (eid,queue,reg[0],dc) |
|
883 | self.incoming_registrations[heart] = (eid,queue,reg[0],dc) | |
907 | else: |
|
884 | else: | |
908 |
self.log.error("registration::registration %i failed: % |
|
885 | self.log.error("registration::registration %i failed: %r"%(eid, content['evalue'])) | |
909 | return eid |
|
886 | return eid | |
910 |
|
887 | |||
911 | def unregister_engine(self, ident, msg): |
|
888 | def unregister_engine(self, ident, msg): | |
@@ -913,9 +890,9 b' class Hub(LoggingFactory):' | |||||
913 | try: |
|
890 | try: | |
914 | eid = msg['content']['id'] |
|
891 | eid = msg['content']['id'] | |
915 | except: |
|
892 | except: | |
916 |
self.log.error("registration::bad engine id for unregistration: % |
|
893 | self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True) | |
917 | return |
|
894 | return | |
918 |
self.log.info("registration::unregister_engine(% |
|
895 | self.log.info("registration::unregister_engine(%r)"%eid) | |
919 | # print (eid) |
|
896 | # print (eid) | |
920 | uuid = self.keytable[eid] |
|
897 | uuid = self.keytable[eid] | |
921 | content=dict(id=eid, queue=uuid) |
|
898 | content=dict(id=eid, queue=uuid) | |
@@ -1135,7 +1112,7 b' class Hub(LoggingFactory):' | |||||
1135 | elif len(records) < len(msg_ids): |
|
1112 | elif len(records) < len(msg_ids): | |
1136 | missing = [ m for m in msg_ids if m not in found_ids ] |
|
1113 | missing = [ m for m in msg_ids if m not in found_ids ] | |
1137 | try: |
|
1114 | try: | |
1138 |
raise KeyError("No such msg(s): % |
|
1115 | raise KeyError("No such msg(s): %r"%missing) | |
1139 | except KeyError: |
|
1116 | except KeyError: | |
1140 | return finish(error.wrap_exception()) |
|
1117 | return finish(error.wrap_exception()) | |
1141 | elif invalid_ids: |
|
1118 | elif invalid_ids: |
@@ -261,7 +261,6 b' class TaskScheduler(SessionFactory):' | |||||
261 | continue |
|
261 | continue | |
262 |
|
262 | |||
263 | raw_msg = lost[msg_id][0] |
|
263 | raw_msg = lost[msg_id][0] | |
264 |
|
||||
265 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
264 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
266 | msg = self.session.unpack_message(msg, copy=False, content=False) |
|
265 | msg = self.session.unpack_message(msg, copy=False, content=False) | |
267 | parent = msg['header'] |
|
266 | parent = msg['header'] | |
@@ -294,9 +293,10 b' class TaskScheduler(SessionFactory):' | |||||
294 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
293 | idents, msg = self.session.feed_identities(raw_msg, copy=False) | |
295 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
294 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
296 | except Exception: |
|
295 | except Exception: | |
297 |
self.log.error("task::Invaid task |
|
296 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) | |
298 | return |
|
297 | return | |
299 |
|
298 | |||
|
299 | ||||
300 | # send to monitor |
|
300 | # send to monitor | |
301 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) |
|
301 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |
302 |
|
302 | |||
@@ -497,7 +497,7 b' class TaskScheduler(SessionFactory):' | |||||
497 | else: |
|
497 | else: | |
498 | self.finish_job(idx) |
|
498 | self.finish_job(idx) | |
499 | except Exception: |
|
499 | except Exception: | |
500 |
self.log.error("task::Invaid result: % |
|
500 | self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True) | |
501 | return |
|
501 | return | |
502 |
|
502 | |||
503 | header = msg['header'] |
|
503 | header = msg['header'] |
General Comments 0
You need to be logged in to leave comments.
Login now