##// END OF EJS Templates
Commit message (truncated): minor controller logging adjustments...
Author: MinRK
@@ -53,6 +53,7 class Heart(object):
53 53 def start(self):
54 54 return self.device.start()
55 55
56
56 57 class HeartMonitor(LoggingConfigurable):
57 58 """A basic HeartMonitor class
58 59 pingstream: a PUB stream
@@ -92,12 +93,12 class HeartMonitor(LoggingConfigurable):
92 93
93 94 def add_new_heart_handler(self, handler):
94 95 """add a new handler for new hearts"""
95 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
96 self.log.debug("heartbeat::new_heart_handler: %s", handler)
96 97 self._new_handlers.add(handler)
97 98
98 99 def add_heart_failure_handler(self, handler):
99 100 """add a new handler for heart failure"""
100 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
101 self.log.debug("heartbeat::new heart failure handler: %s", handler)
101 102 self._failure_handlers.add(handler)
102 103
103 104 def beat(self):
@@ -107,7 +108,7 class HeartMonitor(LoggingConfigurable):
107 108 toc = time.time()
108 109 self.lifetime += toc-self.tic
109 110 self.tic = toc
110 # self.log.debug("heartbeat::%s"%self.lifetime)
111 self.log.debug("heartbeat::sending %s", self.lifetime)
111 112 goodhearts = self.hearts.intersection(self.responses)
112 113 missed_beats = self.hearts.difference(goodhearts)
113 114 heartfailures = self.on_probation.intersection(missed_beats)
@@ -117,7 +118,7 class HeartMonitor(LoggingConfigurable):
117 118 self.on_probation = missed_beats.intersection(self.hearts)
118 119 self.responses = set()
119 120 # print self.on_probation, self.hearts
120 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
121 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
121 122 self.pingstream.send(asbytes(str(self.lifetime)))
122 123
123 124 def handle_new_heart(self, heart):
@@ -125,7 +126,7 class HeartMonitor(LoggingConfigurable):
125 126 for handler in self._new_handlers:
126 127 handler(heart)
127 128 else:
128 self.log.info("heartbeat::yay, got new heart %s!"%heart)
129 self.log.info("heartbeat::yay, got new heart %s!", heart)
129 130 self.hearts.add(heart)
130 131
131 132 def handle_heart_failure(self, heart):
@@ -134,10 +135,10 class HeartMonitor(LoggingConfigurable):
134 135 try:
135 136 handler(heart)
136 137 except Exception as e:
137 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
138 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
138 139 pass
139 140 else:
140 self.log.info("heartbeat::Heart %s failed :("%heart)
141 self.log.info("heartbeat::Heart %s failed :(", heart)
141 142 self.hearts.remove(heart)
142 143
143 144
@@ -151,11 +152,10 class HeartMonitor(LoggingConfigurable):
151 152 self.responses.add(msg[0])
152 153 elif msg[1] == last:
153 154 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
154 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
155 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
155 156 self.responses.add(msg[0])
156 157 else:
157 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
158 (msg[1],self.lifetime))
158 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
159 159
160 160
161 161 if __name__ == '__main__':
@@ -223,10 +223,10 class HubFactory(RegistrationFactory):
223 223 # Registrar socket
224 224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 225 q.bind(client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
226 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
227 227 if self.client_ip != self.engine_ip:
228 228 q.bind(engine_iface % self.regport)
229 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
229 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
230 230
231 231 ### Engine connections ###
232 232
@@ -282,8 +282,8 class HubFactory(RegistrationFactory):
282 282 'iopub' : client_iface%self.iopub[0],
283 283 'notification': client_iface%self.notifier_port
284 284 }
285 self.log.debug("Hub engine addrs: %s"%self.engine_info)
286 self.log.debug("Hub client addrs: %s"%self.client_info)
285 self.log.debug("Hub engine addrs: %s", self.engine_info)
286 self.log.debug("Hub client addrs: %s", self.client_info)
287 287
288 288 # resubmit stream
289 289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
@@ -457,20 +457,20 class Hub(SessionFactory):
457 457 def dispatch_monitor_traffic(self, msg):
458 458 """all ME and Task queue messages come through here, as well as
459 459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r"%msg[:2])
460 self.log.debug("monitor traffic: %r", msg[:2])
461 461 switch = msg[0]
462 462 try:
463 463 idents, msg = self.session.feed_identities(msg[1:])
464 464 except ValueError:
465 465 idents=[]
466 466 if not idents:
467 self.log.error("Bad Monitor Message: %r"%msg)
467 self.log.error("Bad Monitor Message: %r", msg)
468 468 return
469 469 handler = self.monitor_handlers.get(switch, None)
470 470 if handler is not None:
471 471 handler(idents, msg)
472 472 else:
473 self.log.error("Invalid monitor topic: %r"%switch)
473 self.log.error("Invalid monitor topic: %r", switch)
474 474
475 475
476 476 def dispatch_query(self, msg):
@@ -480,27 +480,27 class Hub(SessionFactory):
480 480 except ValueError:
481 481 idents = []
482 482 if not idents:
483 self.log.error("Bad Query Message: %r"%msg)
483 self.log.error("Bad Query Message: %r", msg)
484 484 return
485 485 client_id = idents[0]
486 486 try:
487 487 msg = self.session.unserialize(msg, content=True)
488 488 except Exception:
489 489 content = error.wrap_exception()
490 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
490 self.log.error("Bad Query Message: %r", msg, exc_info=True)
491 491 self.session.send(self.query, "hub_error", ident=client_id,
492 492 content=content)
493 493 return
494 494 # print client_id, header, parent, content
495 495 #switch on message type:
496 496 msg_type = msg['header']['msg_type']
497 self.log.info("client::client %r requested %r"%(client_id, msg_type))
497 self.log.info("client::client %r requested %r", client_id, msg_type)
498 498 handler = self.query_handlers.get(msg_type, None)
499 499 try:
500 500 assert handler is not None, "Bad Message Type: %r"%msg_type
501 501 except:
502 502 content = error.wrap_exception()
503 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
503 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
504 504 self.session.send(self.query, "hub_error", ident=client_id,
505 505 content=content)
506 506 return
@@ -522,9 +522,9 class Hub(SessionFactory):
522 522 """handler to attach to heartbeater.
523 523 Called when a new heart starts to beat.
524 524 Triggers completion of registration."""
525 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
526 526 if heart not in self.incoming_registrations:
527 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 self.log.info("heartbeat::ignoring new heart: %r", heart)
528 528 else:
529 529 self.finish_registration(heart)
530 530
@@ -533,11 +533,11 class Hub(SessionFactory):
533 533 """handler to attach to heartbeater.
534 534 called when a previously registered heart fails to respond to beat request.
535 535 triggers unregistration"""
536 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
537 537 eid = self.hearts.get(heart, None)
538 538 queue = self.engines[eid].queue
539 if eid is None:
540 self.log.info("heartbeat::ignoring heart failure %r"%heart)
539 if eid is None or self.keytable[eid] in self.dead_engines:
540 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
541 541 else:
542 542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543 543
@@ -545,22 +545,23 class Hub(SessionFactory):
545 545
546 546 def save_queue_request(self, idents, msg):
547 547 if len(idents) < 2:
548 self.log.error("invalid identity prefix: %r"%idents)
548 self.log.error("invalid identity prefix: %r", idents)
549 549 return
550 550 queue_id, client_id = idents[:2]
551 551 try:
552 552 msg = self.session.unserialize(msg)
553 553 except Exception:
554 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
554 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
555 555 return
556 556
557 557 eid = self.by_ident.get(queue_id, None)
558 558 if eid is None:
559 self.log.error("queue::target %r not registered"%queue_id)
560 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
559 self.log.error("queue::target %r not registered", queue_id)
560 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
561 561 return
562 562 record = init_record(msg)
563 563 msg_id = record['msg_id']
564 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
564 565 # Unicode in records
565 566 record['engine_uuid'] = queue_id.decode('ascii')
566 567 record['client_uuid'] = client_id.decode('ascii')
@@ -572,18 +573,18 class Hub(SessionFactory):
572 573 for key,evalue in existing.iteritems():
573 574 rvalue = record.get(key, None)
574 575 if evalue and rvalue and evalue != rvalue:
575 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
576 577 elif evalue and not rvalue:
577 578 record[key] = evalue
578 579 try:
579 580 self.db.update_record(msg_id, record)
580 581 except Exception:
581 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
582 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
582 583 except KeyError:
583 584 try:
584 585 self.db.add_record(msg_id, record)
585 586 except Exception:
586 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
587 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
587 588
588 589
589 590 self.pending.add(msg_id)
@@ -591,20 +592,20 class Hub(SessionFactory):
591 592
592 593 def save_queue_result(self, idents, msg):
593 594 if len(idents) < 2:
594 self.log.error("invalid identity prefix: %r"%idents)
595 self.log.error("invalid identity prefix: %r", idents)
595 596 return
596 597
597 598 client_id, queue_id = idents[:2]
598 599 try:
599 600 msg = self.session.unserialize(msg)
600 601 except Exception:
601 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 queue_id,client_id, msg), exc_info=True)
602 self.log.error("queue::engine %r sent invalid message to %r: %r",
603 queue_id, client_id, msg, exc_info=True)
603 604 return
604 605
605 606 eid = self.by_ident.get(queue_id, None)
606 607 if eid is None:
607 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
608 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
608 609 return
609 610
610 611 parent = msg['parent_header']
@@ -616,10 +617,11 class Hub(SessionFactory):
616 617 self.all_completed.add(msg_id)
617 618 self.queues[eid].remove(msg_id)
618 619 self.completed[eid].append(msg_id)
620 self.log.info("queue::request %r completed on %s", msg_id, eid)
619 621 elif msg_id not in self.all_completed:
620 622 # it could be a result from a dead engine that died before delivering the
621 623 # result
622 self.log.warn("queue:: unknown msg finished %r"%msg_id)
624 self.log.warn("queue:: unknown msg finished %r", msg_id)
623 625 return
624 626 # update record anyway, because the unregistration could have been premature
625 627 rheader = msg['header']
@@ -636,7 +638,7 class Hub(SessionFactory):
636 638 try:
637 639 self.db.update_record(msg_id, result)
638 640 except Exception:
639 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
641 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
640 642
641 643
642 644 #--------------------- Task Queue Traffic ------------------------------
@@ -648,8 +650,8 class Hub(SessionFactory):
648 650 try:
649 651 msg = self.session.unserialize(msg)
650 652 except Exception:
651 self.log.error("task::client %r sent invalid task message: %r"%(
652 client_id, msg), exc_info=True)
653 self.log.error("task::client %r sent invalid task message: %r",
654 client_id, msg, exc_info=True)
653 655 return
654 656 record = init_record(msg)
655 657
@@ -677,20 +679,20 class Hub(SessionFactory):
677 679 continue
678 680 rvalue = record.get(key, None)
679 681 if evalue and rvalue and evalue != rvalue:
680 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
682 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
681 683 elif evalue and not rvalue:
682 684 record[key] = evalue
683 685 try:
684 686 self.db.update_record(msg_id, record)
685 687 except Exception:
686 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
688 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687 689 except KeyError:
688 690 try:
689 691 self.db.add_record(msg_id, record)
690 692 except Exception:
691 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
693 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
692 694 except Exception:
693 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
695 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
694 696
695 697 def save_task_result(self, idents, msg):
696 698 """save the result of a completed task."""
@@ -698,14 +700,14 class Hub(SessionFactory):
698 700 try:
699 701 msg = self.session.unserialize(msg)
700 702 except Exception:
701 self.log.error("task::invalid task result message send to %r: %r"%(
702 client_id, msg), exc_info=True)
703 self.log.error("task::invalid task result message send to %r: %r",
704 client_id, msg, exc_info=True)
703 705 return
704 706
705 707 parent = msg['parent_header']
706 708 if not parent:
707 709 # print msg
708 self.log.warn("Task %r had no parent!"%msg)
710 self.log.warn("Task %r had no parent!", msg)
709 711 return
710 712 msg_id = parent['msg_id']
711 713 if msg_id in self.unassigned:
@@ -716,6 +718,7 class Hub(SessionFactory):
716 718 eid = self.by_ident.get(engine_uuid, None)
717 719
718 720 if msg_id in self.pending:
721 self.log.info("task::task %r finished on %s", msg_id, eid)
719 722 self.pending.remove(msg_id)
720 723 self.all_completed.add(msg_id)
721 724 if eid is not None:
@@ -736,10 +739,10 class Hub(SessionFactory):
736 739 try:
737 740 self.db.update_record(msg_id, result)
738 741 except Exception:
739 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
742 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
740 743
741 744 else:
742 self.log.debug("task::unknown task %r finished"%msg_id)
745 self.log.debug("task::unknown task %r finished", msg_id)
743 746
744 747 def save_task_destination(self, idents, msg):
745 748 try:
@@ -753,7 +756,7 class Hub(SessionFactory):
753 756 engine_uuid = content['engine_id']
754 757 eid = self.by_ident[util.asbytes(engine_uuid)]
755 758
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
759 self.log.info("task::task %r arrived on %r", msg_id, eid)
757 760 if msg_id in self.unassigned:
758 761 self.unassigned.remove(msg_id)
759 762 # else:
@@ -764,7 +767,7 class Hub(SessionFactory):
764 767 try:
765 768 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
766 769 except Exception:
767 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
770 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
768 771
769 772
770 773 def mia_task_request(self, idents, msg):
@@ -787,7 +790,7 class Hub(SessionFactory):
787 790
788 791 parent = msg['parent_header']
789 792 if not parent:
790 self.log.error("iopub::invalid IOPub message: %r"%msg)
793 self.log.error("iopub::invalid IOPub message: %r", msg)
791 794 return
792 795 msg_id = parent['msg_id']
793 796 msg_type = msg['header']['msg_type']
@@ -817,7 +820,7 class Hub(SessionFactory):
817 820 try:
818 821 self.db.update_record(msg_id, d)
819 822 except Exception:
820 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
823 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
821 824
822 825
823 826
@@ -827,7 +830,7 class Hub(SessionFactory):
827 830
828 831 def connection_request(self, client_id, msg):
829 832 """Reply with connection addresses for clients."""
830 self.log.info("client::client %r connected"%client_id)
833 self.log.info("client::client %r connected", client_id)
831 834 content = dict(status='ok')
832 835 content.update(self.client_info)
833 836 jsonable = {}
@@ -852,7 +855,7 class Hub(SessionFactory):
852 855 eid = self._next_id
853 856 # print (eid, queue, reg, heart)
854 857
855 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
858 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
856 859
857 860 content = dict(id=eid,status='ok')
858 861 content.update(self.engine_info)
@@ -862,12 +865,12 class Hub(SessionFactory):
862 865 raise KeyError("queue_id %r in use"%queue)
863 866 except:
864 867 content = error.wrap_exception()
865 self.log.error("queue_id %r in use"%queue, exc_info=True)
868 self.log.error("queue_id %r in use", queue, exc_info=True)
866 869 elif heart in self.hearts: # need to check unique hearts?
867 870 try:
868 871 raise KeyError("heart_id %r in use"%heart)
869 872 except:
870 self.log.error("heart_id %r in use"%heart, exc_info=True)
873 self.log.error("heart_id %r in use", heart, exc_info=True)
871 874 content = error.wrap_exception()
872 875 else:
873 876 for h, pack in self.incoming_registrations.iteritems():
@@ -875,14 +878,14 class Hub(SessionFactory):
875 878 try:
876 879 raise KeyError("heart_id %r in use"%heart)
877 880 except:
878 self.log.error("heart_id %r in use"%heart, exc_info=True)
881 self.log.error("heart_id %r in use", heart, exc_info=True)
879 882 content = error.wrap_exception()
880 883 break
881 884 elif queue == pack[1]:
882 885 try:
883 886 raise KeyError("queue_id %r in use"%queue)
884 887 except:
885 self.log.error("queue_id %r in use"%queue, exc_info=True)
888 self.log.error("queue_id %r in use", queue, exc_info=True)
886 889 content = error.wrap_exception()
887 890 break
888 891
@@ -901,7 +904,7 class Hub(SessionFactory):
901 904 dc.start()
902 905 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
903 906 else:
904 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
907 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
905 908 return eid
906 909
907 910 def unregister_engine(self, ident, msg):
@@ -909,9 +912,9 class Hub(SessionFactory):
909 912 try:
910 913 eid = msg['content']['id']
911 914 except:
912 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
915 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
913 916 return
914 self.log.info("registration::unregister_engine(%r)"%eid)
917 self.log.info("registration::unregister_engine(%r)", eid)
915 918 # print (eid)
916 919 uuid = self.keytable[eid]
917 920 content=dict(id=eid, queue=uuid.decode('ascii'))
@@ -958,7 +961,7 class Hub(SessionFactory):
958 961 try:
959 962 self.db.update_record(msg_id, rec)
960 963 except Exception:
961 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
964 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
962 965
963 966
964 967 def finish_registration(self, heart):
@@ -969,7 +972,7 class Hub(SessionFactory):
969 972 except KeyError:
970 973 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
971 974 return
972 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
975 self.log.info("registration::finished registering engine %i:%r", eid, queue)
973 976 if purge is not None:
974 977 purge.stop()
975 978 control = queue
@@ -985,12 +988,12 class Hub(SessionFactory):
985 988 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
986 989 if self.notifier:
987 990 self.session.send(self.notifier, "registration_notification", content=content)
988 self.log.info("engine::Engine Connected: %i"%eid)
991 self.log.info("engine::Engine Connected: %i", eid)
989 992
990 993 def _purge_stalled_registration(self, heart):
991 994 if heart in self.incoming_registrations:
992 995 eid = self.incoming_registrations.pop(heart)[0]
993 self.log.info("registration::purging stalled registration: %i"%eid)
996 self.log.info("registration::purging stalled registration: %i", eid)
994 997 else:
995 998 pass
996 999
@@ -1137,7 +1140,7 class Hub(SessionFactory):
1137 1140 elif invalid_ids:
1138 1141 msg_id = invalid_ids[0]
1139 1142 try:
1140 raise ValueError("Task %r appears to be inflight"%(msg_id))
1143 raise ValueError("Task %r appears to be inflight" % msg_id)
1141 1144 except Exception:
1142 1145 return finish(error.wrap_exception())
1143 1146
@@ -225,7 +225,7 class TaskScheduler(SessionFactory):
225 225 try:
226 226 handler(asbytes(msg['content']['queue']))
227 227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
228 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
229 229
230 230 def _register_engine(self, uid):
231 231 """New engine with ident `uid` became available."""
General Comments 0
You need to be logged in to leave comments. Log in now.