Show More
@@ -198,9 +198,6 b' class HubFactory(RegistrationFactory):' | |||
|
198 | 198 | def __init__(self, **kwargs): |
|
199 | 199 | super(HubFactory, self).__init__(**kwargs) |
|
200 | 200 | self._update_monitor_url() |
|
201 | # self.on_trait_change(self._sync_ips, 'ip') | |
|
202 | # self.on_trait_change(self._sync_transports, 'transport') | |
|
203 | # self.subconstructors.append(self.construct_hub) | |
|
204 | 201 | |
|
205 | 202 | |
|
206 | 203 | def construct(self): |
@@ -449,34 +446,16 b' class Hub(LoggingFactory):' | |||
|
449 | 446 | # dispatch methods (1 per stream) |
|
450 | 447 | #----------------------------------------------------------------------------- |
|
451 | 448 | |
|
452 | # def dispatch_registration_request(self, msg): | |
|
453 | # """""" | |
|
454 | # self.log.debug("registration::dispatch_register_request(%s)"%msg) | |
|
455 | # idents,msg = self.session.feed_identities(msg) | |
|
456 | # if not idents: | |
|
457 | # self.log.error("Bad Query Message: %s"%msg, exc_info=True) | |
|
458 | # return | |
|
459 | # try: | |
|
460 | # msg = self.session.unpack_message(msg,content=True) | |
|
461 | # except: | |
|
462 | # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) | |
|
463 | # return | |
|
464 | # | |
|
465 | # msg_type = msg['msg_type'] | |
|
466 | # content = msg['content'] | |
|
467 | # | |
|
468 | # handler = self.query_handlers.get(msg_type, None) | |
|
469 | # if handler is None: | |
|
470 | # self.log.error("registration::got bad registration message: %s"%msg) | |
|
471 | # else: | |
|
472 | # handler(idents, msg) | |
|
473 | 449 | |
|
474 | 450 | def dispatch_monitor_traffic(self, msg): |
|
475 | 451 | """all ME and Task queue messages come through here, as well as |
|
476 | 452 | IOPub traffic.""" |
|
477 | 453 | self.log.debug("monitor traffic: %r"%msg[:2]) |
|
478 | 454 | switch = msg[0] |
|
479 | idents, msg = self.session.feed_identities(msg[1:]) | |
|
455 | try: | |
|
456 | idents, msg = self.session.feed_identities(msg[1:]) | |
|
457 | except ValueError: | |
|
458 | idents=[] | |
|
480 | 459 | if not idents: |
|
481 | 460 | self.log.error("Bad Monitor Message: %r"%msg) |
|
482 | 461 | return |
@@ -557,19 +536,19 b' class Hub(LoggingFactory):' | |||
|
557 | 536 | |
|
558 | 537 | def save_queue_request(self, idents, msg): |
|
559 | 538 | if len(idents) < 2: |
|
560 |
self.log.error("invalid identity prefix: % |
|
|
539 | self.log.error("invalid identity prefix: %r"%idents) | |
|
561 | 540 | return |
|
562 | 541 | queue_id, client_id = idents[:2] |
|
563 | 542 | try: |
|
564 | 543 | msg = self.session.unpack_message(msg, content=False) |
|
565 | except: | |
|
566 |
self.log.error("queue::client %r sent invalid message to %r: % |
|
|
544 | except Exception: | |
|
545 | self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) | |
|
567 | 546 | return |
|
568 | 547 | |
|
569 | 548 | eid = self.by_ident.get(queue_id, None) |
|
570 | 549 | if eid is None: |
|
571 | 550 | self.log.error("queue::target %r not registered"%queue_id) |
|
572 |
self.log.debug("queue:: valid are: % |
|
|
551 | self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) | |
|
573 | 552 | return |
|
574 | 553 | |
|
575 | 554 | header = msg['header'] |
@@ -597,21 +576,20 b' class Hub(LoggingFactory):' | |||
|
597 | 576 | |
|
598 | 577 | def save_queue_result(self, idents, msg): |
|
599 | 578 | if len(idents) < 2: |
|
600 |
self.log.error("invalid identity prefix: % |
|
|
579 | self.log.error("invalid identity prefix: %r"%idents) | |
|
601 | 580 | return |
|
602 | 581 | |
|
603 | 582 | client_id, queue_id = idents[:2] |
|
604 | 583 | try: |
|
605 | 584 | msg = self.session.unpack_message(msg, content=False) |
|
606 | except: | |
|
607 |
self.log.error("queue::engine %r sent invalid message to %r: % |
|
|
585 | except Exception: | |
|
586 | self.log.error("queue::engine %r sent invalid message to %r: %r"%( | |
|
608 | 587 | queue_id,client_id, msg), exc_info=True) |
|
609 | 588 | return |
|
610 | 589 | |
|
611 | 590 | eid = self.by_ident.get(queue_id, None) |
|
612 | 591 | if eid is None: |
|
613 | 592 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) |
|
614 | # self.log.debug("queue:: %s"%msg[2:]) | |
|
615 | 593 | return |
|
616 | 594 | |
|
617 | 595 | parent = msg['parent_header'] |
@@ -626,7 +604,7 b' class Hub(LoggingFactory):' | |||
|
626 | 604 | elif msg_id not in self.all_completed: |
|
627 | 605 | # it could be a result from a dead engine that died before delivering the |
|
628 | 606 | # result |
|
629 |
self.log.warn("queue:: unknown msg finished % |
|
|
607 | self.log.warn("queue:: unknown msg finished %r"%msg_id) | |
|
630 | 608 | return |
|
631 | 609 | # update record anyway, because the unregistration could have been premature |
|
632 | 610 | rheader = msg['header'] |
@@ -656,8 +634,8 b' class Hub(LoggingFactory):' | |||
|
656 | 634 | |
|
657 | 635 | try: |
|
658 | 636 | msg = self.session.unpack_message(msg, content=False) |
|
659 | except: | |
|
660 |
self.log.error("task::client %r sent invalid task message: % |
|
|
637 | except Exception: | |
|
638 | self.log.error("task::client %r sent invalid task message: %r"%( | |
|
661 | 639 | client_id, msg), exc_info=True) |
|
662 | 640 | return |
|
663 | 641 | record = init_record(msg) |
@@ -700,10 +678,9 b' class Hub(LoggingFactory):' | |||
|
700 | 678 | client_id = idents[0] |
|
701 | 679 | try: |
|
702 | 680 | msg = self.session.unpack_message(msg, content=False) |
|
703 | except: | |
|
704 |
self.log.error("task::invalid task result message send to %r: % |
|
|
681 | except Exception: | |
|
682 | self.log.error("task::invalid task result message send to %r: %r"%( | |
|
705 | 683 | client_id, msg), exc_info=True) |
|
706 | raise | |
|
707 | 684 | return |
|
708 | 685 | |
|
709 | 686 | parent = msg['parent_header'] |
@@ -745,12 +722,12 b' class Hub(LoggingFactory):' | |||
|
745 | 722 | self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) |
|
746 | 723 | |
|
747 | 724 | else: |
|
748 |
self.log.debug("task::unknown task % |
|
|
725 | self.log.debug("task::unknown task %r finished"%msg_id) | |
|
749 | 726 | |
|
750 | 727 | def save_task_destination(self, idents, msg): |
|
751 | 728 | try: |
|
752 | 729 | msg = self.session.unpack_message(msg, content=True) |
|
753 | except: | |
|
730 | except Exception: | |
|
754 | 731 | self.log.error("task::invalid task tracking message", exc_info=True) |
|
755 | 732 | return |
|
756 | 733 | content = msg['content'] |
@@ -759,11 +736,11 b' class Hub(LoggingFactory):' | |||
|
759 | 736 | engine_uuid = content['engine_id'] |
|
760 | 737 | eid = self.by_ident[engine_uuid] |
|
761 | 738 | |
|
762 |
self.log.info("task::task % |
|
|
739 | self.log.info("task::task %r arrived on %r"%(msg_id, eid)) | |
|
763 | 740 | if msg_id in self.unassigned: |
|
764 | 741 | self.unassigned.remove(msg_id) |
|
765 | 742 | # else: |
|
766 |
# self.log.debug("task::task % |
|
|
743 | # self.log.debug("task::task %r not listed as MIA?!"%(msg_id)) | |
|
767 | 744 | |
|
768 | 745 | self.tasks[eid].append(msg_id) |
|
769 | 746 | # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) |
@@ -787,13 +764,13 b' class Hub(LoggingFactory):' | |||
|
787 | 764 | # print (topics) |
|
788 | 765 | try: |
|
789 | 766 | msg = self.session.unpack_message(msg, content=True) |
|
790 | except: | |
|
767 | except Exception: | |
|
791 | 768 | self.log.error("iopub::invalid IOPub message", exc_info=True) |
|
792 | 769 | return |
|
793 | 770 | |
|
794 | 771 | parent = msg['parent_header'] |
|
795 | 772 | if not parent: |
|
796 |
self.log.error("iopub::invalid IOPub message: % |
|
|
773 | self.log.error("iopub::invalid IOPub message: %r"%msg) | |
|
797 | 774 | return |
|
798 | 775 | msg_id = parent['msg_id'] |
|
799 | 776 | msg_type = msg['msg_type'] |
@@ -833,7 +810,7 b' class Hub(LoggingFactory):' | |||
|
833 | 810 | |
|
834 | 811 | def connection_request(self, client_id, msg): |
|
835 | 812 | """Reply with connection addresses for clients.""" |
|
836 |
self.log.info("client::client % |
|
|
813 | self.log.info("client::client %r connected"%client_id) | |
|
837 | 814 | content = dict(status='ok') |
|
838 | 815 | content.update(self.client_info) |
|
839 | 816 | jsonable = {} |
@@ -905,7 +882,7 b' class Hub(LoggingFactory):' | |||
|
905 | 882 | dc.start() |
|
906 | 883 | self.incoming_registrations[heart] = (eid,queue,reg[0],dc) |
|
907 | 884 | else: |
|
908 |
self.log.error("registration::registration %i failed: % |
|
|
885 | self.log.error("registration::registration %i failed: %r"%(eid, content['evalue'])) | |
|
909 | 886 | return eid |
|
910 | 887 | |
|
911 | 888 | def unregister_engine(self, ident, msg): |
@@ -913,9 +890,9 b' class Hub(LoggingFactory):' | |||
|
913 | 890 | try: |
|
914 | 891 | eid = msg['content']['id'] |
|
915 | 892 | except: |
|
916 |
self.log.error("registration::bad engine id for unregistration: % |
|
|
893 | self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True) | |
|
917 | 894 | return |
|
918 |
self.log.info("registration::unregister_engine(% |
|
|
895 | self.log.info("registration::unregister_engine(%r)"%eid) | |
|
919 | 896 | # print (eid) |
|
920 | 897 | uuid = self.keytable[eid] |
|
921 | 898 | content=dict(id=eid, queue=uuid) |
@@ -1135,7 +1112,7 b' class Hub(LoggingFactory):' | |||
|
1135 | 1112 | elif len(records) < len(msg_ids): |
|
1136 | 1113 | missing = [ m for m in msg_ids if m not in found_ids ] |
|
1137 | 1114 | try: |
|
1138 |
raise KeyError("No such msg(s): % |
|
|
1115 | raise KeyError("No such msg(s): %r"%missing) | |
|
1139 | 1116 | except KeyError: |
|
1140 | 1117 | return finish(error.wrap_exception()) |
|
1141 | 1118 | elif invalid_ids: |
@@ -261,7 +261,6 b' class TaskScheduler(SessionFactory):' | |||
|
261 | 261 | continue |
|
262 | 262 | |
|
263 | 263 | raw_msg = lost[msg_id][0] |
|
264 | ||
|
265 | 264 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
266 | 265 | msg = self.session.unpack_message(msg, copy=False, content=False) |
|
267 | 266 | parent = msg['header'] |
@@ -294,9 +293,10 b' class TaskScheduler(SessionFactory):' | |||
|
294 | 293 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
295 | 294 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
296 | 295 | except Exception: |
|
297 |
self.log.error("task::Invaid task |
|
|
296 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) | |
|
298 | 297 | return |
|
299 | 298 | |
|
299 | ||
|
300 | 300 | # send to monitor |
|
301 | 301 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) |
|
302 | 302 | |
@@ -497,7 +497,7 b' class TaskScheduler(SessionFactory):' | |||
|
497 | 497 | else: |
|
498 | 498 | self.finish_job(idx) |
|
499 | 499 | except Exception: |
|
500 |
self.log.error("task::Invaid result: % |
|
|
500 | self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True) | |
|
501 | 501 | return |
|
502 | 502 | |
|
503 | 503 | header = msg['header'] |
General Comments 0
You need to be logged in to leave comments.
Login now