##// END OF EJS Templates
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
MinRK -
Show More
@@ -198,9 +198,6 b' class HubFactory(RegistrationFactory):'
198 def __init__(self, **kwargs):
198 def __init__(self, **kwargs):
199 super(HubFactory, self).__init__(**kwargs)
199 super(HubFactory, self).__init__(**kwargs)
200 self._update_monitor_url()
200 self._update_monitor_url()
201 # self.on_trait_change(self._sync_ips, 'ip')
202 # self.on_trait_change(self._sync_transports, 'transport')
203 # self.subconstructors.append(self.construct_hub)
204
201
205
202
206 def construct(self):
203 def construct(self):
@@ -449,34 +446,16 b' class Hub(LoggingFactory):'
449 # dispatch methods (1 per stream)
446 # dispatch methods (1 per stream)
450 #-----------------------------------------------------------------------------
447 #-----------------------------------------------------------------------------
451
448
452 # def dispatch_registration_request(self, msg):
453 # """"""
454 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
455 # idents,msg = self.session.feed_identities(msg)
456 # if not idents:
457 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
458 # return
459 # try:
460 # msg = self.session.unpack_message(msg,content=True)
461 # except:
462 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
463 # return
464 #
465 # msg_type = msg['msg_type']
466 # content = msg['content']
467 #
468 # handler = self.query_handlers.get(msg_type, None)
469 # if handler is None:
470 # self.log.error("registration::got bad registration message: %s"%msg)
471 # else:
472 # handler(idents, msg)
473
449
474 def dispatch_monitor_traffic(self, msg):
450 def dispatch_monitor_traffic(self, msg):
475 """all ME and Task queue messages come through here, as well as
451 """all ME and Task queue messages come through here, as well as
476 IOPub traffic."""
452 IOPub traffic."""
477 self.log.debug("monitor traffic: %r"%msg[:2])
453 self.log.debug("monitor traffic: %r"%msg[:2])
478 switch = msg[0]
454 switch = msg[0]
479 idents, msg = self.session.feed_identities(msg[1:])
455 try:
456 idents, msg = self.session.feed_identities(msg[1:])
457 except ValueError:
458 idents=[]
480 if not idents:
459 if not idents:
481 self.log.error("Bad Monitor Message: %r"%msg)
460 self.log.error("Bad Monitor Message: %r"%msg)
482 return
461 return
@@ -557,19 +536,19 b' class Hub(LoggingFactory):'
557
536
558 def save_queue_request(self, idents, msg):
537 def save_queue_request(self, idents, msg):
559 if len(idents) < 2:
538 if len(idents) < 2:
560 self.log.error("invalid identity prefix: %s"%idents)
539 self.log.error("invalid identity prefix: %r"%idents)
561 return
540 return
562 queue_id, client_id = idents[:2]
541 queue_id, client_id = idents[:2]
563 try:
542 try:
564 msg = self.session.unpack_message(msg, content=False)
543 msg = self.session.unpack_message(msg, content=False)
565 except:
544 except Exception:
566 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
545 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
567 return
546 return
568
547
569 eid = self.by_ident.get(queue_id, None)
548 eid = self.by_ident.get(queue_id, None)
570 if eid is None:
549 if eid is None:
571 self.log.error("queue::target %r not registered"%queue_id)
550 self.log.error("queue::target %r not registered"%queue_id)
572 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
551 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
573 return
552 return
574
553
575 header = msg['header']
554 header = msg['header']
@@ -597,21 +576,20 b' class Hub(LoggingFactory):'
597
576
598 def save_queue_result(self, idents, msg):
577 def save_queue_result(self, idents, msg):
599 if len(idents) < 2:
578 if len(idents) < 2:
600 self.log.error("invalid identity prefix: %s"%idents)
579 self.log.error("invalid identity prefix: %r"%idents)
601 return
580 return
602
581
603 client_id, queue_id = idents[:2]
582 client_id, queue_id = idents[:2]
604 try:
583 try:
605 msg = self.session.unpack_message(msg, content=False)
584 msg = self.session.unpack_message(msg, content=False)
606 except:
585 except Exception:
607 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
586 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
608 queue_id,client_id, msg), exc_info=True)
587 queue_id,client_id, msg), exc_info=True)
609 return
588 return
610
589
611 eid = self.by_ident.get(queue_id, None)
590 eid = self.by_ident.get(queue_id, None)
612 if eid is None:
591 if eid is None:
613 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
592 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
614 # self.log.debug("queue:: %s"%msg[2:])
615 return
593 return
616
594
617 parent = msg['parent_header']
595 parent = msg['parent_header']
@@ -626,7 +604,7 b' class Hub(LoggingFactory):'
626 elif msg_id not in self.all_completed:
604 elif msg_id not in self.all_completed:
627 # it could be a result from a dead engine that died before delivering the
605 # it could be a result from a dead engine that died before delivering the
628 # result
606 # result
629 self.log.warn("queue:: unknown msg finished %s"%msg_id)
607 self.log.warn("queue:: unknown msg finished %r"%msg_id)
630 return
608 return
631 # update record anyway, because the unregistration could have been premature
609 # update record anyway, because the unregistration could have been premature
632 rheader = msg['header']
610 rheader = msg['header']
@@ -656,8 +634,8 b' class Hub(LoggingFactory):'
656
634
657 try:
635 try:
658 msg = self.session.unpack_message(msg, content=False)
636 msg = self.session.unpack_message(msg, content=False)
659 except:
637 except Exception:
660 self.log.error("task::client %r sent invalid task message: %s"%(
638 self.log.error("task::client %r sent invalid task message: %r"%(
661 client_id, msg), exc_info=True)
639 client_id, msg), exc_info=True)
662 return
640 return
663 record = init_record(msg)
641 record = init_record(msg)
@@ -700,10 +678,9 b' class Hub(LoggingFactory):'
700 client_id = idents[0]
678 client_id = idents[0]
701 try:
679 try:
702 msg = self.session.unpack_message(msg, content=False)
680 msg = self.session.unpack_message(msg, content=False)
703 except:
681 except Exception:
704 self.log.error("task::invalid task result message send to %r: %s"%(
682 self.log.error("task::invalid task result message send to %r: %r"%(
705 client_id, msg), exc_info=True)
683 client_id, msg), exc_info=True)
706 raise
707 return
684 return
708
685
709 parent = msg['parent_header']
686 parent = msg['parent_header']
@@ -745,12 +722,12 b' class Hub(LoggingFactory):'
745 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
722 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
746
723
747 else:
724 else:
748 self.log.debug("task::unknown task %s finished"%msg_id)
725 self.log.debug("task::unknown task %r finished"%msg_id)
749
726
750 def save_task_destination(self, idents, msg):
727 def save_task_destination(self, idents, msg):
751 try:
728 try:
752 msg = self.session.unpack_message(msg, content=True)
729 msg = self.session.unpack_message(msg, content=True)
753 except:
730 except Exception:
754 self.log.error("task::invalid task tracking message", exc_info=True)
731 self.log.error("task::invalid task tracking message", exc_info=True)
755 return
732 return
756 content = msg['content']
733 content = msg['content']
@@ -759,11 +736,11 b' class Hub(LoggingFactory):'
759 engine_uuid = content['engine_id']
736 engine_uuid = content['engine_id']
760 eid = self.by_ident[engine_uuid]
737 eid = self.by_ident[engine_uuid]
761
738
762 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
739 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
763 if msg_id in self.unassigned:
740 if msg_id in self.unassigned:
764 self.unassigned.remove(msg_id)
741 self.unassigned.remove(msg_id)
765 # else:
742 # else:
766 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
743 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
767
744
768 self.tasks[eid].append(msg_id)
745 self.tasks[eid].append(msg_id)
769 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
746 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -787,13 +764,13 b' class Hub(LoggingFactory):'
787 # print (topics)
764 # print (topics)
788 try:
765 try:
789 msg = self.session.unpack_message(msg, content=True)
766 msg = self.session.unpack_message(msg, content=True)
790 except:
767 except Exception:
791 self.log.error("iopub::invalid IOPub message", exc_info=True)
768 self.log.error("iopub::invalid IOPub message", exc_info=True)
792 return
769 return
793
770
794 parent = msg['parent_header']
771 parent = msg['parent_header']
795 if not parent:
772 if not parent:
796 self.log.error("iopub::invalid IOPub message: %s"%msg)
773 self.log.error("iopub::invalid IOPub message: %r"%msg)
797 return
774 return
798 msg_id = parent['msg_id']
775 msg_id = parent['msg_id']
799 msg_type = msg['msg_type']
776 msg_type = msg['msg_type']
@@ -833,7 +810,7 b' class Hub(LoggingFactory):'
833
810
834 def connection_request(self, client_id, msg):
811 def connection_request(self, client_id, msg):
835 """Reply with connection addresses for clients."""
812 """Reply with connection addresses for clients."""
836 self.log.info("client::client %s connected"%client_id)
813 self.log.info("client::client %r connected"%client_id)
837 content = dict(status='ok')
814 content = dict(status='ok')
838 content.update(self.client_info)
815 content.update(self.client_info)
839 jsonable = {}
816 jsonable = {}
@@ -905,7 +882,7 b' class Hub(LoggingFactory):'
905 dc.start()
882 dc.start()
906 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
883 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
907 else:
884 else:
908 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
885 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
909 return eid
886 return eid
910
887
911 def unregister_engine(self, ident, msg):
888 def unregister_engine(self, ident, msg):
@@ -913,9 +890,9 b' class Hub(LoggingFactory):'
913 try:
890 try:
914 eid = msg['content']['id']
891 eid = msg['content']['id']
915 except:
892 except:
916 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
893 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
917 return
894 return
918 self.log.info("registration::unregister_engine(%s)"%eid)
895 self.log.info("registration::unregister_engine(%r)"%eid)
919 # print (eid)
896 # print (eid)
920 uuid = self.keytable[eid]
897 uuid = self.keytable[eid]
921 content=dict(id=eid, queue=uuid)
898 content=dict(id=eid, queue=uuid)
@@ -1135,7 +1112,7 b' class Hub(LoggingFactory):'
1135 elif len(records) < len(msg_ids):
1112 elif len(records) < len(msg_ids):
1136 missing = [ m for m in msg_ids if m not in found_ids ]
1113 missing = [ m for m in msg_ids if m not in found_ids ]
1137 try:
1114 try:
1138 raise KeyError("No such msg(s): %s"%missing)
1115 raise KeyError("No such msg(s): %r"%missing)
1139 except KeyError:
1116 except KeyError:
1140 return finish(error.wrap_exception())
1117 return finish(error.wrap_exception())
1141 elif invalid_ids:
1118 elif invalid_ids:
@@ -261,7 +261,6 b' class TaskScheduler(SessionFactory):'
261 continue
261 continue
262
262
263 raw_msg = lost[msg_id][0]
263 raw_msg = lost[msg_id][0]
264
265 idents,msg = self.session.feed_identities(raw_msg, copy=False)
264 idents,msg = self.session.feed_identities(raw_msg, copy=False)
266 msg = self.session.unpack_message(msg, copy=False, content=False)
265 msg = self.session.unpack_message(msg, copy=False, content=False)
267 parent = msg['header']
266 parent = msg['header']
@@ -294,9 +293,10 b' class TaskScheduler(SessionFactory):'
294 idents, msg = self.session.feed_identities(raw_msg, copy=False)
293 idents, msg = self.session.feed_identities(raw_msg, copy=False)
295 msg = self.session.unpack_message(msg, content=False, copy=False)
294 msg = self.session.unpack_message(msg, content=False, copy=False)
296 except Exception:
295 except Exception:
297 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
296 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
298 return
297 return
299
298
299
300 # send to monitor
300 # send to monitor
301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
302
302
@@ -497,7 +497,7 b' class TaskScheduler(SessionFactory):'
497 else:
497 else:
498 self.finish_job(idx)
498 self.finish_job(idx)
499 except Exception:
499 except Exception:
500 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
500 self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True)
501 return
501 return
502
502
503 header = msg['header']
503 header = msg['header']
General Comments 0
You need to be logged in to leave comments. Login now