From 26fbd2077dcdef4bfbca07b797ccab536f481a71 2011-06-20 23:40:16 From: MinRK Date: 2011-06-20 23:40:16 Subject: [PATCH] cleanup Hub/Scheduler to prevent '%s'% errors --- diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 5b3bdaa..24a5773 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -198,9 +198,6 @@ class HubFactory(RegistrationFactory): def __init__(self, **kwargs): super(HubFactory, self).__init__(**kwargs) self._update_monitor_url() - # self.on_trait_change(self._sync_ips, 'ip') - # self.on_trait_change(self._sync_transports, 'transport') - # self.subconstructors.append(self.construct_hub) def construct(self): @@ -449,34 +446,16 @@ class Hub(LoggingFactory): # dispatch methods (1 per stream) #----------------------------------------------------------------------------- - # def dispatch_registration_request(self, msg): - # """""" - # self.log.debug("registration::dispatch_register_request(%s)"%msg) - # idents,msg = self.session.feed_identities(msg) - # if not idents: - # self.log.error("Bad Query Message: %s"%msg, exc_info=True) - # return - # try: - # msg = self.session.unpack_message(msg,content=True) - # except: - # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) - # return - # - # msg_type = msg['msg_type'] - # content = msg['content'] - # - # handler = self.query_handlers.get(msg_type, None) - # if handler is None: - # self.log.error("registration::got bad registration message: %s"%msg) - # else: - # handler(idents, msg) def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" self.log.debug("monitor traffic: %r"%msg[:2]) switch = msg[0] - idents, msg = self.session.feed_identities(msg[1:]) + try: + idents, msg = self.session.feed_identities(msg[1:]) + except ValueError: + idents=[] if not idents: self.log.error("Bad Monitor Message: %r"%msg) return @@ -557,19 +536,19 @@ class Hub(LoggingFactory): def save_queue_request(self, idents, msg): if len(idents) < 2: - self.log.error("invalid identity prefix: %s"%idents) + self.log.error("invalid identity prefix: %r"%idents) return queue_id, client_id = idents[:2] try: msg = self.session.unpack_message(msg, content=False) - except: - self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True) + except Exception: + self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) return eid = self.by_ident.get(queue_id, None) if eid is None: self.log.error("queue::target %r not registered"%queue_id) - self.log.debug("queue:: valid are: %s"%(self.by_ident.keys())) + self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) return header = msg['header'] @@ -597,21 +576,20 @@ class Hub(LoggingFactory): def save_queue_result(self, idents, msg): if len(idents) < 2: - self.log.error("invalid identity prefix: %s"%idents) + self.log.error("invalid identity prefix: %r"%idents) return client_id, queue_id = idents[:2] try: msg = self.session.unpack_message(msg, content=False) - except: - self.log.error("queue::engine %r sent invalid message to %r: %s"%( + except Exception: + self.log.error("queue::engine %r sent invalid message to %r: %r"%( queue_id,client_id, msg), exc_info=True) return eid = self.by_ident.get(queue_id, None) if eid is None: self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) - # self.log.debug("queue:: %s"%msg[2:]) return parent = msg['parent_header'] @@ -626,7 +604,7 @@ class Hub(LoggingFactory): elif msg_id not in self.all_completed: # it could be a result from a dead engine that died before delivering the # result - self.log.warn("queue:: unknown msg finished %s"%msg_id) + self.log.warn("queue:: unknown msg finished %r"%msg_id) return # update record anyway, because the unregistration could have been premature rheader = msg['header'] @@ -656,8 +634,8 @@ class Hub(LoggingFactory): try: msg = self.session.unpack_message(msg, content=False) - except: - self.log.error("task::client %r sent invalid task message: %s"%( + except Exception: + self.log.error("task::client %r sent invalid task message: %r"%( client_id, msg), exc_info=True) return record = init_record(msg) @@ -700,10 +678,9 @@ class Hub(LoggingFactory): client_id = idents[0] try: msg = self.session.unpack_message(msg, content=False) - except: - self.log.error("task::invalid task result message send to %r: %s"%( + except Exception: + self.log.error("task::invalid task result message send to %r: %r"%( client_id, msg), exc_info=True) - raise return parent = msg['parent_header'] @@ -745,12 +722,12 @@ class Hub(LoggingFactory): self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) else: - self.log.debug("task::unknown task %s finished"%msg_id) + self.log.debug("task::unknown task %r finished"%msg_id) def save_task_destination(self, idents, msg): try: msg = self.session.unpack_message(msg, content=True) - except: + except Exception: self.log.error("task::invalid task tracking message", exc_info=True) return content = msg['content'] @@ -759,11 +736,11 @@ class Hub(LoggingFactory): engine_uuid = content['engine_id'] eid = self.by_ident[engine_uuid] - self.log.info("task::task %s arrived on %s"%(msg_id, eid)) + self.log.info("task::task %r arrived on %r"%(msg_id, eid)) if msg_id in self.unassigned: self.unassigned.remove(msg_id) # else: - # self.log.debug("task::task %s not listed as MIA?!"%(msg_id)) + # self.log.debug("task::task %r not listed as MIA?!"%(msg_id)) self.tasks[eid].append(msg_id) # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) @@ -787,13 +764,13 @@ class Hub(LoggingFactory): # print (topics) try: msg = self.session.unpack_message(msg, content=True) - except: + except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return parent = msg['parent_header'] if not parent: - self.log.error("iopub::invalid IOPub message: %s"%msg) + self.log.error("iopub::invalid IOPub message: %r"%msg) return msg_id = parent['msg_id'] msg_type = msg['msg_type'] @@ -833,7 +810,7 @@ class Hub(LoggingFactory): def connection_request(self, client_id, msg): """Reply with connection addresses for clients.""" - self.log.info("client::client %s connected"%client_id) + self.log.info("client::client %r connected"%client_id) content = dict(status='ok') content.update(self.client_info) jsonable = {} @@ -905,7 +882,7 @@ class Hub(LoggingFactory): dc.start() self.incoming_registrations[heart] = (eid,queue,reg[0],dc) else: - self.log.error("registration::registration %i failed: %s"%(eid, content['evalue'])) + self.log.error("registration::registration %i failed: %r"%(eid, content['evalue'])) return eid def unregister_engine(self, ident, msg): @@ -913,9 +890,9 @@ class Hub(LoggingFactory): try: eid = msg['content']['id'] except: - self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True) + self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True) return - self.log.info("registration::unregister_engine(%s)"%eid) + self.log.info("registration::unregister_engine(%r)"%eid) # print (eid) uuid = self.keytable[eid] content=dict(id=eid, queue=uuid) @@ -1135,7 +1112,7 @@ class Hub(LoggingFactory): elif len(records) < len(msg_ids): missing = [ m for m in msg_ids if m not in found_ids ] try: - raise KeyError("No such msg(s): %s"%missing) + raise KeyError("No such msg(s): %r"%missing) except KeyError: return finish(error.wrap_exception()) elif invalid_ids: diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 6fc6a8c..28f0d42 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -261,7 +261,6 @@ class TaskScheduler(SessionFactory): continue raw_msg = lost[msg_id][0] - idents,msg = self.session.feed_identities(raw_msg, copy=False) msg = self.session.unpack_message(msg, copy=False, content=False) parent = msg['header'] @@ -294,9 +293,10 @@ class TaskScheduler(SessionFactory): idents, msg = self.session.feed_identities(raw_msg, copy=False) msg = self.session.unpack_message(msg, content=False, copy=False) except Exception: - self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True) + self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) return + # send to monitor self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) @@ -497,7 +497,7 @@ class TaskScheduler(SessionFactory): else: self.finish_job(idx) except Exception: - self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) + self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True) return header = msg['header']