##// END OF EJS Templates
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
MinRK -
Show More
@@ -198,9 +198,6 b' class HubFactory(RegistrationFactory):'
198 198 def __init__(self, **kwargs):
199 199 super(HubFactory, self).__init__(**kwargs)
200 200 self._update_monitor_url()
201 # self.on_trait_change(self._sync_ips, 'ip')
202 # self.on_trait_change(self._sync_transports, 'transport')
203 # self.subconstructors.append(self.construct_hub)
204 201
205 202
206 203 def construct(self):
@@ -449,34 +446,16 b' class Hub(LoggingFactory):'
449 446 # dispatch methods (1 per stream)
450 447 #-----------------------------------------------------------------------------
451 448
452 # def dispatch_registration_request(self, msg):
453 # """"""
454 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
455 # idents,msg = self.session.feed_identities(msg)
456 # if not idents:
457 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
458 # return
459 # try:
460 # msg = self.session.unpack_message(msg,content=True)
461 # except:
462 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
463 # return
464 #
465 # msg_type = msg['msg_type']
466 # content = msg['content']
467 #
468 # handler = self.query_handlers.get(msg_type, None)
469 # if handler is None:
470 # self.log.error("registration::got bad registration message: %s"%msg)
471 # else:
472 # handler(idents, msg)
473 449
474 450 def dispatch_monitor_traffic(self, msg):
475 451 """all ME and Task queue messages come through here, as well as
476 452 IOPub traffic."""
477 453 self.log.debug("monitor traffic: %r"%msg[:2])
478 454 switch = msg[0]
479 idents, msg = self.session.feed_identities(msg[1:])
455 try:
456 idents, msg = self.session.feed_identities(msg[1:])
457 except ValueError:
458 idents=[]
480 459 if not idents:
481 460 self.log.error("Bad Monitor Message: %r"%msg)
482 461 return
@@ -557,19 +536,19 b' class Hub(LoggingFactory):'
557 536
558 537 def save_queue_request(self, idents, msg):
559 538 if len(idents) < 2:
560 self.log.error("invalid identity prefix: %s"%idents)
539 self.log.error("invalid identity prefix: %r"%idents)
561 540 return
562 541 queue_id, client_id = idents[:2]
563 542 try:
564 543 msg = self.session.unpack_message(msg, content=False)
565 except:
566 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
544 except Exception:
545 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
567 546 return
568 547
569 548 eid = self.by_ident.get(queue_id, None)
570 549 if eid is None:
571 550 self.log.error("queue::target %r not registered"%queue_id)
572 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
551 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
573 552 return
574 553
575 554 header = msg['header']
@@ -597,21 +576,20 b' class Hub(LoggingFactory):'
597 576
598 577 def save_queue_result(self, idents, msg):
599 578 if len(idents) < 2:
600 self.log.error("invalid identity prefix: %s"%idents)
579 self.log.error("invalid identity prefix: %r"%idents)
601 580 return
602 581
603 582 client_id, queue_id = idents[:2]
604 583 try:
605 584 msg = self.session.unpack_message(msg, content=False)
606 except:
607 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
585 except Exception:
586 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
608 587 queue_id,client_id, msg), exc_info=True)
609 588 return
610 589
611 590 eid = self.by_ident.get(queue_id, None)
612 591 if eid is None:
613 592 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
614 # self.log.debug("queue:: %s"%msg[2:])
615 593 return
616 594
617 595 parent = msg['parent_header']
@@ -626,7 +604,7 b' class Hub(LoggingFactory):'
626 604 elif msg_id not in self.all_completed:
627 605 # it could be a result from a dead engine that died before delivering the
628 606 # result
629 self.log.warn("queue:: unknown msg finished %s"%msg_id)
607 self.log.warn("queue:: unknown msg finished %r"%msg_id)
630 608 return
631 609 # update record anyway, because the unregistration could have been premature
632 610 rheader = msg['header']
@@ -656,8 +634,8 b' class Hub(LoggingFactory):'
656 634
657 635 try:
658 636 msg = self.session.unpack_message(msg, content=False)
659 except:
660 self.log.error("task::client %r sent invalid task message: %s"%(
637 except Exception:
638 self.log.error("task::client %r sent invalid task message: %r"%(
661 639 client_id, msg), exc_info=True)
662 640 return
663 641 record = init_record(msg)
@@ -700,10 +678,9 b' class Hub(LoggingFactory):'
700 678 client_id = idents[0]
701 679 try:
702 680 msg = self.session.unpack_message(msg, content=False)
703 except:
704 self.log.error("task::invalid task result message send to %r: %s"%(
681 except Exception:
682 self.log.error("task::invalid task result message send to %r: %r"%(
705 683 client_id, msg), exc_info=True)
706 raise
707 684 return
708 685
709 686 parent = msg['parent_header']
@@ -745,12 +722,12 b' class Hub(LoggingFactory):'
745 722 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
746 723
747 724 else:
748 self.log.debug("task::unknown task %s finished"%msg_id)
725 self.log.debug("task::unknown task %r finished"%msg_id)
749 726
750 727 def save_task_destination(self, idents, msg):
751 728 try:
752 729 msg = self.session.unpack_message(msg, content=True)
753 except:
730 except Exception:
754 731 self.log.error("task::invalid task tracking message", exc_info=True)
755 732 return
756 733 content = msg['content']
@@ -759,11 +736,11 b' class Hub(LoggingFactory):'
759 736 engine_uuid = content['engine_id']
760 737 eid = self.by_ident[engine_uuid]
761 738
762 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
739 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
763 740 if msg_id in self.unassigned:
764 741 self.unassigned.remove(msg_id)
765 742 # else:
766 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
743 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
767 744
768 745 self.tasks[eid].append(msg_id)
769 746 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -787,13 +764,13 b' class Hub(LoggingFactory):'
787 764 # print (topics)
788 765 try:
789 766 msg = self.session.unpack_message(msg, content=True)
790 except:
767 except Exception:
791 768 self.log.error("iopub::invalid IOPub message", exc_info=True)
792 769 return
793 770
794 771 parent = msg['parent_header']
795 772 if not parent:
796 self.log.error("iopub::invalid IOPub message: %s"%msg)
773 self.log.error("iopub::invalid IOPub message: %r"%msg)
797 774 return
798 775 msg_id = parent['msg_id']
799 776 msg_type = msg['msg_type']
@@ -833,7 +810,7 b' class Hub(LoggingFactory):'
833 810
834 811 def connection_request(self, client_id, msg):
835 812 """Reply with connection addresses for clients."""
836 self.log.info("client::client %s connected"%client_id)
813 self.log.info("client::client %r connected"%client_id)
837 814 content = dict(status='ok')
838 815 content.update(self.client_info)
839 816 jsonable = {}
@@ -905,7 +882,7 b' class Hub(LoggingFactory):'
905 882 dc.start()
906 883 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
907 884 else:
908 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
885 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
909 886 return eid
910 887
911 888 def unregister_engine(self, ident, msg):
@@ -913,9 +890,9 b' class Hub(LoggingFactory):'
913 890 try:
914 891 eid = msg['content']['id']
915 892 except:
916 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
893 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
917 894 return
918 self.log.info("registration::unregister_engine(%s)"%eid)
895 self.log.info("registration::unregister_engine(%r)"%eid)
919 896 # print (eid)
920 897 uuid = self.keytable[eid]
921 898 content=dict(id=eid, queue=uuid)
@@ -1135,7 +1112,7 b' class Hub(LoggingFactory):'
1135 1112 elif len(records) < len(msg_ids):
1136 1113 missing = [ m for m in msg_ids if m not in found_ids ]
1137 1114 try:
1138 raise KeyError("No such msg(s): %s"%missing)
1115 raise KeyError("No such msg(s): %r"%missing)
1139 1116 except KeyError:
1140 1117 return finish(error.wrap_exception())
1141 1118 elif invalid_ids:
@@ -261,7 +261,6 b' class TaskScheduler(SessionFactory):'
261 261 continue
262 262
263 263 raw_msg = lost[msg_id][0]
264
265 264 idents,msg = self.session.feed_identities(raw_msg, copy=False)
266 265 msg = self.session.unpack_message(msg, copy=False, content=False)
267 266 parent = msg['header']
@@ -294,9 +293,10 b' class TaskScheduler(SessionFactory):'
294 293 idents, msg = self.session.feed_identities(raw_msg, copy=False)
295 294 msg = self.session.unpack_message(msg, content=False, copy=False)
296 295 except Exception:
297 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
296 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
298 297 return
299 298
299
300 300 # send to monitor
301 301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
302 302
@@ -497,7 +497,7 b' class TaskScheduler(SessionFactory):'
497 497 else:
498 498 self.finish_job(idx)
499 499 except Exception:
500 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
500 self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True)
501 501 return
502 502
503 503 header = msg['header']
General Comments 0
You need to be logged in to leave comments. Login now