From d7d4f70ea993f41483a28934469289a000a54822 2011-12-12 21:46:47 From: MinRK Date: 2011-12-12 21:46:47 Subject: [PATCH] minor controller logging adjustments * add debug message when heartbeats go out * don't call unregister_engine more than once for a failed heart * adjust logging calls, to avoid formatting unused messages --- diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index c1db4f0..875f6d8 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -53,6 +53,7 @@ class Heart(object): def start(self): return self.device.start() + class HeartMonitor(LoggingConfigurable): """A basic HeartMonitor class pingstream: a PUB stream @@ -92,12 +93,12 @@ class HeartMonitor(LoggingConfigurable): def add_new_heart_handler(self, handler): """add a new handler for new hearts""" - self.log.debug("heartbeat::new_heart_handler: %s"%handler) + self.log.debug("heartbeat::new_heart_handler: %s", handler) self._new_handlers.add(handler) def add_heart_failure_handler(self, handler): """add a new handler for heart failure""" - self.log.debug("heartbeat::new heart failure handler: %s"%handler) + self.log.debug("heartbeat::new heart failure handler: %s", handler) self._failure_handlers.add(handler) def beat(self): @@ -107,7 +108,7 @@ class HeartMonitor(LoggingConfigurable): toc = time.time() self.lifetime += toc-self.tic self.tic = toc - # self.log.debug("heartbeat::%s"%self.lifetime) + self.log.debug("heartbeat::sending %s", self.lifetime) goodhearts = self.hearts.intersection(self.responses) missed_beats = self.hearts.difference(goodhearts) heartfailures = self.on_probation.intersection(missed_beats) @@ -117,7 +118,7 @@ class HeartMonitor(LoggingConfigurable): self.on_probation = missed_beats.intersection(self.hearts) self.responses = set() # print self.on_probation, self.hearts - # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) + # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) self.pingstream.send(asbytes(str(self.lifetime))) def handle_new_heart(self, heart): @@ -125,7 +126,7 @@ class HeartMonitor(LoggingConfigurable): for handler in self._new_handlers: handler(heart) else: - self.log.info("heartbeat::yay, got new heart %s!"%heart) + self.log.info("heartbeat::yay, got new heart %s!", heart) self.hearts.add(heart) def handle_heart_failure(self, heart): @@ -134,10 +135,10 @@ class HeartMonitor(LoggingConfigurable): try: handler(heart) except Exception as e: - self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) + self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True) pass else: - self.log.info("heartbeat::Heart %s failed :("%heart) + self.log.info("heartbeat::Heart %s failed :(", heart) self.hearts.remove(heart) @@ -151,11 +152,10 @@ class HeartMonitor(LoggingConfigurable): self.responses.add(msg[0]) elif msg[1] == last: delta = time.time()-self.tic + (self.lifetime-self.last_ping) - self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) + self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta) self.responses.add(msg[0]) else: - self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% - (msg[1],self.lifetime)) + self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime) if __name__ == '__main__': diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 0868616..f557214 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -192,7 +192,7 @@ class HubFactory(RegistrationFactory): self._update_monitor_url() def _update_monitor_url(self): - self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) + self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port) def _transport_changed(self, name, old, new): self.engine_transport = new @@ -214,8 +214,8 @@ class HubFactory(RegistrationFactory): def init_hub(self): """construct""" - client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i" - engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i" + client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i" + engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i" ctx = self.context loop = self.loop @@ -223,10 +223,10 @@ class HubFactory(RegistrationFactory): # Registrar socket q = ZMQStream(ctx.socket(zmq.ROUTER), loop) q.bind(client_iface % self.regport) - self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) + self.log.info("Hub listening on %s for registration.", client_iface % self.regport) if self.client_ip != self.engine_ip: q.bind(engine_iface % self.regport) - self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport)) + self.log.info("Hub listening on %s for registration.", engine_iface % self.regport) ### Engine connections ### @@ -282,8 +282,8 @@ class HubFactory(RegistrationFactory): 'iopub' : client_iface%self.iopub[0], 'notification': client_iface%self.notifier_port } - self.log.debug("Hub engine addrs: %s"%self.engine_info) - self.log.debug("Hub client addrs: %s"%self.client_info) + self.log.debug("Hub engine addrs: %s", self.engine_info) + self.log.debug("Hub client addrs: %s", self.client_info) # resubmit stream r = ZMQStream(ctx.socket(zmq.DEALER), loop) @@ -444,7 +444,7 @@ class Hub(SessionFactory): targets = _targets bad_targets = [ t for t in targets if t not in self.ids ] if bad_targets: - raise IndexError("No Such Engine: %r"%bad_targets) + raise IndexError("No Such Engine: %r" % bad_targets) if not targets: raise IndexError("No Engines Registered") return targets @@ -457,20 +457,20 @@ class Hub(SessionFactory): def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" - self.log.debug("monitor traffic: %r"%msg[:2]) + self.log.debug("monitor traffic: %r", msg[:2]) switch = msg[0] try: idents, msg = self.session.feed_identities(msg[1:]) except ValueError: idents=[] if not idents: - self.log.error("Bad Monitor Message: %r"%msg) + self.log.error("Bad Monitor Message: %r", msg) return handler = self.monitor_handlers.get(switch, None) if handler is not None: handler(idents, msg) else: - self.log.error("Invalid monitor topic: %r"%switch) + self.log.error("Invalid monitor topic: %r", switch) def dispatch_query(self, msg): @@ -480,27 +480,27 @@ class Hub(SessionFactory): except ValueError: idents = [] if not idents: - self.log.error("Bad Query Message: %r"%msg) + self.log.error("Bad Query Message: %r", msg) return client_id = idents[0] try: msg = self.session.unserialize(msg, content=True) except Exception: content = error.wrap_exception() - self.log.error("Bad Query Message: %r"%msg, exc_info=True) + self.log.error("Bad Query Message: %r", msg, exc_info=True) self.session.send(self.query, "hub_error", ident=client_id, content=content) return # print client_id, header, parent, content #switch on message type: msg_type = msg['header']['msg_type'] - self.log.info("client::client %r requested %r"%(client_id, msg_type)) + self.log.info("client::client %r requested %r", client_id, msg_type) handler = self.query_handlers.get(msg_type, None) try: - assert handler is not None, "Bad Message Type: %r"%msg_type + assert handler is not None, "Bad Message Type: %r" % msg_type except: content = error.wrap_exception() - self.log.error("Bad Message Type: %r"%msg_type, exc_info=True) + self.log.error("Bad Message Type: %r", msg_type, exc_info=True) self.session.send(self.query, "hub_error", ident=client_id, content=content) return @@ -522,9 +522,9 @@ class Hub(SessionFactory): """handler to attach to heartbeater. Called when a new heart starts to beat. Triggers completion of registration.""" - self.log.debug("heartbeat::handle_new_heart(%r)"%heart) + self.log.debug("heartbeat::handle_new_heart(%r)", heart) if heart not in self.incoming_registrations: - self.log.info("heartbeat::ignoring new heart: %r"%heart) + self.log.info("heartbeat::ignoring new heart: %r", heart) else: self.finish_registration(heart) @@ -533,11 +533,11 @@ class Hub(SessionFactory): """handler to attach to heartbeater. called when a previously registered heart fails to respond to beat request. triggers unregistration""" - self.log.debug("heartbeat::handle_heart_failure(%r)"%heart) + self.log.debug("heartbeat::handle_heart_failure(%r)", heart) eid = self.hearts.get(heart, None) queue = self.engines[eid].queue - if eid is None: - self.log.info("heartbeat::ignoring heart failure %r"%heart) + if eid is None or self.keytable[eid] in self.dead_engines: + self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) else: self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue))) @@ -545,22 +545,23 @@ class Hub(SessionFactory): def save_queue_request(self, idents, msg): if len(idents) < 2: - self.log.error("invalid identity prefix: %r"%idents) + self.log.error("invalid identity prefix: %r", idents) return queue_id, client_id = idents[:2] try: msg = self.session.unserialize(msg) except Exception: - self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) + self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True) return eid = self.by_ident.get(queue_id, None) if eid is None: - self.log.error("queue::target %r not registered"%queue_id) - self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) + self.log.error("queue::target %r not registered", queue_id) + self.log.debug("queue:: valid are: %r", self.by_ident.keys()) return record = init_record(msg) msg_id = record['msg_id'] + self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) # Unicode in records record['engine_uuid'] = queue_id.decode('ascii') record['client_uuid'] = client_id.decode('ascii') @@ -572,18 +573,18 @@ class Hub(SessionFactory): for key,evalue in existing.iteritems(): rvalue = record.get(key, None) if evalue and rvalue and evalue != rvalue: - self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue)) + self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue) elif evalue and not rvalue: record[key] = evalue try: self.db.update_record(msg_id, record) except Exception: - self.log.error("DB Error updating record %r"%msg_id, exc_info=True) + self.log.error("DB Error updating record %r", msg_id, exc_info=True) except KeyError: try: self.db.add_record(msg_id, record) except Exception: - self.log.error("DB Error adding record %r"%msg_id, exc_info=True) + self.log.error("DB Error adding record %r", msg_id, exc_info=True) self.pending.add(msg_id) @@ -591,20 +592,20 @@ class Hub(SessionFactory): def save_queue_result(self, idents, msg): if len(idents) < 2: - self.log.error("invalid identity prefix: %r"%idents) + self.log.error("invalid identity prefix: %r", idents) return client_id, queue_id = idents[:2] try: msg = self.session.unserialize(msg) except Exception: - self.log.error("queue::engine %r sent invalid message to %r: %r"%( - queue_id,client_id, msg), exc_info=True) + self.log.error("queue::engine %r sent invalid message to %r: %r", + queue_id, client_id, msg, exc_info=True) return eid = self.by_ident.get(queue_id, None) if eid is None: - self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) + self.log.error("queue::unknown engine %r is sending a reply: ", queue_id) return parent = msg['parent_header'] @@ -616,10 +617,11 @@ class Hub(SessionFactory): self.all_completed.add(msg_id) self.queues[eid].remove(msg_id) self.completed[eid].append(msg_id) + self.log.info("queue::request %r completed on %s", msg_id, eid) elif msg_id not in self.all_completed: # it could be a result from a dead engine that died before delivering the # result - self.log.warn("queue:: unknown msg finished %r"%msg_id) + self.log.warn("queue:: unknown msg finished %r", msg_id) return # update record anyway, because the unregistration could have been premature rheader = msg['header'] @@ -636,7 +638,7 @@ class Hub(SessionFactory): try: self.db.update_record(msg_id, result) except Exception: - self.log.error("DB Error updating record %r"%msg_id, exc_info=True) + self.log.error("DB Error updating record %r", msg_id, exc_info=True) #--------------------- Task Queue Traffic ------------------------------ @@ -648,8 +650,8 @@ class Hub(SessionFactory): try: msg = self.session.unserialize(msg) except Exception: - self.log.error("task::client %r sent invalid task message: %r"%( - client_id, msg), exc_info=True) + self.log.error("task::client %r sent invalid task message: %r", + client_id, msg, exc_info=True) return record = init_record(msg) @@ -677,20 +679,20 @@ class Hub(SessionFactory): continue rvalue = record.get(key, None) if evalue and rvalue and evalue != rvalue: - self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue)) + self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue) elif evalue and not rvalue: record[key] = evalue try: self.db.update_record(msg_id, record) except Exception: - self.log.error("DB Error updating record %r"%msg_id, exc_info=True) + self.log.error("DB Error updating record %r", msg_id, exc_info=True) except KeyError: try: self.db.add_record(msg_id, record) except Exception: - self.log.error("DB Error adding record %r"%msg_id, exc_info=True) + self.log.error("DB Error adding record %r", msg_id, exc_info=True) except Exception: - self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) + self.log.error("DB Error saving task request %r", msg_id, exc_info=True) def save_task_result(self, idents, msg): """save the result of a completed task.""" @@ -698,14 +700,14 @@ class Hub(SessionFactory): try: msg = self.session.unserialize(msg) except Exception: - self.log.error("task::invalid task result message send to %r: %r"%( - client_id, msg), exc_info=True) + self.log.error("task::invalid task result message send to %r: %r", + client_id, msg, exc_info=True) return parent = msg['parent_header'] if not parent: # print msg - self.log.warn("Task %r had no parent!"%msg) + self.log.warn("Task %r had no parent!", msg) return msg_id = parent['msg_id'] if msg_id in self.unassigned: @@ -716,6 +718,7 @@ class Hub(SessionFactory): eid = self.by_ident.get(engine_uuid, None) if msg_id in self.pending: + self.log.info("task::task %r finished on %s", msg_id, eid) self.pending.remove(msg_id) self.all_completed.add(msg_id) if eid is not None: @@ -736,10 +739,10 @@ class Hub(SessionFactory): try: self.db.update_record(msg_id, result) except Exception: - self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) + self.log.error("DB Error saving task request %r", msg_id, exc_info=True) else: - self.log.debug("task::unknown task %r finished"%msg_id) + self.log.debug("task::unknown task %r finished", msg_id) def save_task_destination(self, idents, msg): try: @@ -753,7 +756,7 @@ class Hub(SessionFactory): engine_uuid = content['engine_id'] eid = self.by_ident[util.asbytes(engine_uuid)] - self.log.info("task::task %r arrived on %r"%(msg_id, eid)) + self.log.info("task::task %r arrived on %r", msg_id, eid) if msg_id in self.unassigned: self.unassigned.remove(msg_id) # else: @@ -764,7 +767,7 @@ class Hub(SessionFactory): try: self.db.update_record(msg_id, dict(engine_uuid=engine_uuid)) except Exception: - self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True) + self.log.error("DB Error saving task destination %r", msg_id, exc_info=True) def mia_task_request(self, idents, msg): @@ -787,7 +790,7 @@ class Hub(SessionFactory): parent = msg['parent_header'] if not parent: - self.log.error("iopub::invalid IOPub message: %r"%msg) + self.log.error("iopub::invalid IOPub message: %r", msg) return msg_id = parent['msg_id'] msg_type = msg['header']['msg_type'] @@ -817,7 +820,7 @@ class Hub(SessionFactory): try: self.db.update_record(msg_id, d) except Exception: - self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True) + self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True) @@ -827,7 +830,7 @@ class Hub(SessionFactory): def connection_request(self, client_id, msg): """Reply with connection addresses for clients.""" - self.log.info("client::client %r connected"%client_id) + self.log.info("client::client %r connected", client_id) content = dict(status='ok') content.update(self.client_info) jsonable = {} @@ -852,37 +855,37 @@ class Hub(SessionFactory): eid = self._next_id # print (eid, queue, reg, heart) - self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) + self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart) content = dict(id=eid,status='ok') content.update(self.engine_info) # check if requesting available IDs: if queue in self.by_ident: try: - raise KeyError("queue_id %r in use"%queue) + raise KeyError("queue_id %r in use" % queue) except: content = error.wrap_exception() - self.log.error("queue_id %r in use"%queue, exc_info=True) + self.log.error("queue_id %r in use", queue, exc_info=True) elif heart in self.hearts: # need to check unique hearts? try: - raise KeyError("heart_id %r in use"%heart) + raise KeyError("heart_id %r in use" % heart) except: - self.log.error("heart_id %r in use"%heart, exc_info=True) + self.log.error("heart_id %r in use", heart, exc_info=True) content = error.wrap_exception() else: for h, pack in self.incoming_registrations.iteritems(): if heart == h: try: - raise KeyError("heart_id %r in use"%heart) + raise KeyError("heart_id %r in use" % heart) except: - self.log.error("heart_id %r in use"%heart, exc_info=True) + self.log.error("heart_id %r in use", heart, exc_info=True) content = error.wrap_exception() break elif queue == pack[1]: try: - raise KeyError("queue_id %r in use"%queue) + raise KeyError("queue_id %r in use" % queue) except: - self.log.error("queue_id %r in use"%queue, exc_info=True) + self.log.error("queue_id %r in use", queue, exc_info=True) content = error.wrap_exception() break @@ -901,7 +904,7 @@ class Hub(SessionFactory): dc.start() self.incoming_registrations[heart] = (eid,queue,reg[0],dc) else: - self.log.error("registration::registration %i failed: %r"%(eid, content['evalue'])) + self.log.error("registration::registration %i failed: %r", eid, content['evalue']) return eid def unregister_engine(self, ident, msg): @@ -909,9 +912,9 @@ class Hub(SessionFactory): try: eid = msg['content']['id'] except: - self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True) + self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True) return - self.log.info("registration::unregister_engine(%r)"%eid) + self.log.info("registration::unregister_engine(%r)", eid) # print (eid) uuid = self.keytable[eid] content=dict(id=eid, queue=uuid.decode('ascii')) @@ -945,7 +948,7 @@ class Hub(SessionFactory): self.pending.remove(msg_id) self.all_completed.add(msg_id) try: - raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id)) + raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id)) except: content = error.wrap_exception() # build a fake header: @@ -958,7 +961,7 @@ class Hub(SessionFactory): try: self.db.update_record(msg_id, rec) except Exception: - self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True) + self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True) def finish_registration(self, heart): @@ -969,7 +972,7 @@ class Hub(SessionFactory): except KeyError: self.log.error("registration::tried to finish nonexistant registration", exc_info=True) return - self.log.info("registration::finished registering engine %i:%r"%(eid,queue)) + self.log.info("registration::finished registering engine %i:%r", eid, queue) if purge is not None: purge.stop() control = queue @@ -985,12 +988,12 @@ class Hub(SessionFactory): content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii')) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) - self.log.info("engine::Engine Connected: %i"%eid) + self.log.info("engine::Engine Connected: %i", eid) def _purge_stalled_registration(self, heart): if heart in self.incoming_registrations: eid = self.incoming_registrations.pop(heart)[0] - self.log.info("registration::purging stalled registration: %i"%eid) + self.log.info("registration::purging stalled registration: %i", eid) else: pass @@ -1077,7 +1080,7 @@ class Hub(SessionFactory): pending = filter(lambda m: m in self.pending, msg_ids) if pending: try: - raise IndexError("msg pending: %r"%pending[0]) + raise IndexError("msg pending: %r" % pending[0]) except: reply = error.wrap_exception() else: @@ -1091,7 +1094,7 @@ class Hub(SessionFactory): for eid in eids: if eid not in self.engines: try: - raise IndexError("No such engine: %i"%eid) + raise IndexError("No such engine: %i" % eid) except: reply = error.wrap_exception() break @@ -1131,13 +1134,13 @@ class Hub(SessionFactory): elif len(records) < len(msg_ids): missing = [ m for m in msg_ids if m not in found_ids ] try: - raise KeyError("No such msg(s): %r"%missing) + raise KeyError("No such msg(s): %r" % missing) except KeyError: return finish(error.wrap_exception()) elif invalid_ids: msg_id = invalid_ids[0] try: - raise ValueError("Task %r appears to be inflight"%(msg_id)) + raise ValueError("Task %r appears to be inflight" % msg_id) except Exception: return finish(error.wrap_exception()) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 90e913e..26c6f96 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -225,7 +225,7 @@ class TaskScheduler(SessionFactory): try: handler(asbytes(msg['content']['queue'])) except Exception: - self.log.error("task::Invalid notification msg: %r",msg) + self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) def _register_engine(self, uid): """New engine with ident `uid` became available."""