From d7d4f70ea993f41483a28934469289a000a54822 2011-12-12 21:46:47
From: MinRK <benjaminrk@gmail.com>
Date: 2011-12-12 21:46:47
Subject: [PATCH] minor controller logging adjustments

* add debug message when heartbeats go out
* don't call unregister_engine more than once for a failed heart
* adjust logging calls, to avoid formatting unused messages

---

diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py
index c1db4f0..875f6d8 100755
--- a/IPython/parallel/controller/heartmonitor.py
+++ b/IPython/parallel/controller/heartmonitor.py
@@ -53,6 +53,7 @@ class Heart(object):
     def start(self):
         return self.device.start()
 
+
 class HeartMonitor(LoggingConfigurable):
     """A basic HeartMonitor class
     pingstream: a PUB stream
@@ -92,12 +93,12 @@ class HeartMonitor(LoggingConfigurable):
 
     def add_new_heart_handler(self, handler):
         """add a new handler for new hearts"""
-        self.log.debug("heartbeat::new_heart_handler: %s"%handler)
+        self.log.debug("heartbeat::new_heart_handler: %s", handler)
         self._new_handlers.add(handler)
 
     def add_heart_failure_handler(self, handler):
         """add a new handler for heart failure"""
-        self.log.debug("heartbeat::new heart failure handler: %s"%handler)
+        self.log.debug("heartbeat::new heart failure handler: %s", handler)
         self._failure_handlers.add(handler)
 
     def beat(self):
@@ -107,7 +108,7 @@ class HeartMonitor(LoggingConfigurable):
         toc = time.time()
         self.lifetime += toc-self.tic
         self.tic = toc
-        # self.log.debug("heartbeat::%s"%self.lifetime)
+        self.log.debug("heartbeat::sending %s", self.lifetime)
         goodhearts = self.hearts.intersection(self.responses)
         missed_beats = self.hearts.difference(goodhearts)
         heartfailures = self.on_probation.intersection(missed_beats)
@@ -117,7 +118,7 @@ class HeartMonitor(LoggingConfigurable):
         self.on_probation = missed_beats.intersection(self.hearts)
         self.responses = set()
         # print self.on_probation, self.hearts
-        # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
+        # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
         self.pingstream.send(asbytes(str(self.lifetime)))
 
     def handle_new_heart(self, heart):
@@ -125,7 +126,7 @@ class HeartMonitor(LoggingConfigurable):
             for handler in self._new_handlers:
                 handler(heart)
         else:
-            self.log.info("heartbeat::yay, got new heart %s!"%heart)
+            self.log.info("heartbeat::yay, got new heart %s!", heart)
         self.hearts.add(heart)
 
     def handle_heart_failure(self, heart):
@@ -134,10 +135,10 @@ class HeartMonitor(LoggingConfigurable):
                 try:
                     handler(heart)
                 except Exception as e:
-                    self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
+                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
                     pass
         else:
-            self.log.info("heartbeat::Heart %s failed :("%heart)
+            self.log.info("heartbeat::Heart %s failed :(", heart)
         self.hearts.remove(heart)
 
 
@@ -151,11 +152,10 @@ class HeartMonitor(LoggingConfigurable):
             self.responses.add(msg[0])
         elif msg[1] == last:
             delta = time.time()-self.tic + (self.lifetime-self.last_ping)
-            self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
+            self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
             self.responses.add(msg[0])
         else:
-            self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
-            (msg[1],self.lifetime))
+            self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
 
 
 if __name__ == '__main__':
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 0868616..f557214 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -192,7 +192,7 @@ class HubFactory(RegistrationFactory):
         self._update_monitor_url()
 
     def _update_monitor_url(self):
-        self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
+        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
 
     def _transport_changed(self, name, old, new):
         self.engine_transport = new
@@ -214,8 +214,8 @@ class HubFactory(RegistrationFactory):
 
     def init_hub(self):
         """construct"""
-        client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
-        engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
+        client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
+        engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
 
         ctx = self.context
         loop = self.loop
@@ -223,10 +223,10 @@ class HubFactory(RegistrationFactory):
         # Registrar socket
         q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
         q.bind(client_iface % self.regport)
-        self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
+        self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
         if self.client_ip != self.engine_ip:
             q.bind(engine_iface % self.regport)
-            self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
+            self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
 
         ### Engine connections ###
 
@@ -282,8 +282,8 @@ class HubFactory(RegistrationFactory):
             'iopub' : client_iface%self.iopub[0],
             'notification': client_iface%self.notifier_port
             }
-        self.log.debug("Hub engine addrs: %s"%self.engine_info)
-        self.log.debug("Hub client addrs: %s"%self.client_info)
+        self.log.debug("Hub engine addrs: %s", self.engine_info)
+        self.log.debug("Hub client addrs: %s", self.client_info)
 
         # resubmit stream
         r = ZMQStream(ctx.socket(zmq.DEALER), loop)
@@ -444,7 +444,7 @@ class Hub(SessionFactory):
         targets = _targets
         bad_targets = [ t for t in targets if t not in self.ids ]
         if bad_targets:
-            raise IndexError("No Such Engine: %r"%bad_targets)
+            raise IndexError("No Such Engine: %r" % bad_targets)
         if not targets:
             raise IndexError("No Engines Registered")
         return targets
@@ -457,20 +457,20 @@ class Hub(SessionFactory):
     def dispatch_monitor_traffic(self, msg):
         """all ME and Task queue messages come through here, as well as
         IOPub traffic."""
-        self.log.debug("monitor traffic: %r"%msg[:2])
+        self.log.debug("monitor traffic: %r", msg[:2])
         switch = msg[0]
         try:
             idents, msg = self.session.feed_identities(msg[1:])
         except ValueError:
             idents=[]
         if not idents:
-            self.log.error("Bad Monitor Message: %r"%msg)
+            self.log.error("Bad Monitor Message: %r", msg)
             return
         handler = self.monitor_handlers.get(switch, None)
         if handler is not None:
             handler(idents, msg)
         else:
-            self.log.error("Invalid monitor topic: %r"%switch)
+            self.log.error("Invalid monitor topic: %r", switch)
 
 
     def dispatch_query(self, msg):
@@ -480,27 +480,27 @@ class Hub(SessionFactory):
         except ValueError:
             idents = []
         if not idents:
-            self.log.error("Bad Query Message: %r"%msg)
+            self.log.error("Bad Query Message: %r", msg)
             return
         client_id = idents[0]
         try:
             msg = self.session.unserialize(msg, content=True)
         except Exception:
             content = error.wrap_exception()
-            self.log.error("Bad Query Message: %r"%msg, exc_info=True)
+            self.log.error("Bad Query Message: %r", msg, exc_info=True)
             self.session.send(self.query, "hub_error", ident=client_id,
                     content=content)
             return
         # print client_id, header, parent, content
         #switch on message type:
         msg_type = msg['header']['msg_type']
-        self.log.info("client::client %r requested %r"%(client_id, msg_type))
+        self.log.info("client::client %r requested %r", client_id, msg_type)
         handler = self.query_handlers.get(msg_type, None)
         try:
-            assert handler is not None, "Bad Message Type: %r"%msg_type
+            assert handler is not None, "Bad Message Type: %r" % msg_type
         except:
             content = error.wrap_exception()
-            self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
+            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
             self.session.send(self.query, "hub_error", ident=client_id,
                     content=content)
             return
@@ -522,9 +522,9 @@ class Hub(SessionFactory):
         """handler to attach to heartbeater.
         Called when a new heart starts to beat.
         Triggers completion of registration."""
-        self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
+        self.log.debug("heartbeat::handle_new_heart(%r)", heart)
         if heart not in self.incoming_registrations:
-            self.log.info("heartbeat::ignoring new heart: %r"%heart)
+            self.log.info("heartbeat::ignoring new heart: %r", heart)
         else:
             self.finish_registration(heart)
 
@@ -533,11 +533,11 @@ class Hub(SessionFactory):
         """handler to attach to heartbeater.
         called when a previously registered heart fails to respond to beat request.
         triggers unregistration"""
-        self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
+        self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
         eid = self.hearts.get(heart, None)
         queue = self.engines[eid].queue
-        if eid is None:
-            self.log.info("heartbeat::ignoring heart failure %r"%heart)
+        if eid is None or self.keytable[eid] in self.dead_engines:
+            self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
         else:
             self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
 
@@ -545,22 +545,23 @@ class Hub(SessionFactory):
 
     def save_queue_request(self, idents, msg):
         if len(idents) < 2:
-            self.log.error("invalid identity prefix: %r"%idents)
+            self.log.error("invalid identity prefix: %r", idents)
             return
         queue_id, client_id = idents[:2]
         try:
             msg = self.session.unserialize(msg)
         except Exception:
-            self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
+            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
             return
 
         eid = self.by_ident.get(queue_id, None)
         if eid is None:
-            self.log.error("queue::target %r not registered"%queue_id)
-            self.log.debug("queue::    valid are: %r"%(self.by_ident.keys()))
+            self.log.error("queue::target %r not registered", queue_id)
+            self.log.debug("queue::    valid are: %r", self.by_ident.keys())
             return
         record = init_record(msg)
         msg_id = record['msg_id']
+        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
         # Unicode in records
         record['engine_uuid'] = queue_id.decode('ascii')
         record['client_uuid'] = client_id.decode('ascii')
@@ -572,18 +573,18 @@ class Hub(SessionFactory):
             for key,evalue in existing.iteritems():
                 rvalue = record.get(key, None)
                 if evalue and rvalue and evalue != rvalue:
-                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
+                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                 elif evalue and not rvalue:
                     record[key] = evalue
             try:
                 self.db.update_record(msg_id, record)
             except Exception:
-                self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
+                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
         except KeyError:
             try:
                 self.db.add_record(msg_id, record)
             except Exception:
-                self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
+                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
 
 
         self.pending.add(msg_id)
@@ -591,20 +592,20 @@ class Hub(SessionFactory):
 
     def save_queue_result(self, idents, msg):
         if len(idents) < 2:
-            self.log.error("invalid identity prefix: %r"%idents)
+            self.log.error("invalid identity prefix: %r", idents)
             return
 
         client_id, queue_id = idents[:2]
         try:
             msg = self.session.unserialize(msg)
         except Exception:
-            self.log.error("queue::engine %r sent invalid message to %r: %r"%(
-                    queue_id,client_id, msg), exc_info=True)
+            self.log.error("queue::engine %r sent invalid message to %r: %r",
+                    queue_id, client_id, msg, exc_info=True)
             return
 
         eid = self.by_ident.get(queue_id, None)
         if eid is None:
-            self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
+            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
             return
 
         parent = msg['parent_header']
@@ -616,10 +617,11 @@ class Hub(SessionFactory):
             self.all_completed.add(msg_id)
             self.queues[eid].remove(msg_id)
             self.completed[eid].append(msg_id)
+            self.log.info("queue::request %r completed on %s", msg_id, eid)
         elif msg_id not in self.all_completed:
             # it could be a result from a dead engine that died before delivering the
             # result
-            self.log.warn("queue:: unknown msg finished %r"%msg_id)
+            self.log.warn("queue:: unknown msg finished %r", msg_id)
             return
         # update record anyway, because the unregistration could have been premature
         rheader = msg['header']
@@ -636,7 +638,7 @@ class Hub(SessionFactory):
         try:
             self.db.update_record(msg_id, result)
         except Exception:
-            self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
+            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
 
 
     #--------------------- Task Queue Traffic ------------------------------
@@ -648,8 +650,8 @@ class Hub(SessionFactory):
         try:
             msg = self.session.unserialize(msg)
         except Exception:
-            self.log.error("task::client %r sent invalid task message: %r"%(
-                    client_id, msg), exc_info=True)
+            self.log.error("task::client %r sent invalid task message: %r",
+                    client_id, msg, exc_info=True)
             return
         record = init_record(msg)
 
@@ -677,20 +679,20 @@ class Hub(SessionFactory):
                     continue
                 rvalue = record.get(key, None)
                 if evalue and rvalue and evalue != rvalue:
-                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
+                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                 elif evalue and not rvalue:
                     record[key] = evalue
             try:
                 self.db.update_record(msg_id, record)
             except Exception:
-                self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
+                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
         except KeyError:
             try:
                 self.db.add_record(msg_id, record)
             except Exception:
-                self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
+                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
         except Exception:
-            self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
+            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
 
     def save_task_result(self, idents, msg):
         """save the result of a completed task."""
@@ -698,14 +700,14 @@ class Hub(SessionFactory):
         try:
             msg = self.session.unserialize(msg)
         except Exception:
-            self.log.error("task::invalid task result message send to %r: %r"%(
-                    client_id, msg), exc_info=True)
+            self.log.error("task::invalid task result message send to %r: %r",
+                    client_id, msg, exc_info=True)
             return
 
         parent = msg['parent_header']
         if not parent:
             # print msg
-            self.log.warn("Task %r had no parent!"%msg)
+            self.log.warn("Task %r had no parent!", msg)
             return
         msg_id = parent['msg_id']
         if msg_id in self.unassigned:
@@ -716,6 +718,7 @@ class Hub(SessionFactory):
         eid = self.by_ident.get(engine_uuid, None)
 
         if msg_id in self.pending:
+            self.log.info("task::task %r finished on %s", msg_id, eid)
             self.pending.remove(msg_id)
             self.all_completed.add(msg_id)
             if eid is not None:
@@ -736,10 +739,10 @@ class Hub(SessionFactory):
             try:
                 self.db.update_record(msg_id, result)
             except Exception:
-                self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
+                self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
 
         else:
-            self.log.debug("task::unknown task %r finished"%msg_id)
+            self.log.debug("task::unknown task %r finished", msg_id)
 
     def save_task_destination(self, idents, msg):
         try:
@@ -753,7 +756,7 @@ class Hub(SessionFactory):
         engine_uuid = content['engine_id']
         eid = self.by_ident[util.asbytes(engine_uuid)]
 
-        self.log.info("task::task %r arrived on %r"%(msg_id, eid))
+        self.log.info("task::task %r arrived on %r", msg_id, eid)
         if msg_id in self.unassigned:
             self.unassigned.remove(msg_id)
         # else:
@@ -764,7 +767,7 @@ class Hub(SessionFactory):
         try:
             self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
         except Exception:
-            self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
+            self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
 
 
     def mia_task_request(self, idents, msg):
@@ -787,7 +790,7 @@ class Hub(SessionFactory):
 
         parent = msg['parent_header']
         if not parent:
-            self.log.error("iopub::invalid IOPub message: %r"%msg)
+            self.log.error("iopub::invalid IOPub message: %r", msg)
             return
         msg_id = parent['msg_id']
         msg_type = msg['header']['msg_type']
@@ -817,7 +820,7 @@ class Hub(SessionFactory):
         try:
             self.db.update_record(msg_id, d)
         except Exception:
-            self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
+            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
 
 
 
@@ -827,7 +830,7 @@ class Hub(SessionFactory):
 
     def connection_request(self, client_id, msg):
         """Reply with connection addresses for clients."""
-        self.log.info("client::client %r connected"%client_id)
+        self.log.info("client::client %r connected", client_id)
         content = dict(status='ok')
         content.update(self.client_info)
         jsonable = {}
@@ -852,37 +855,37 @@ class Hub(SessionFactory):
         eid = self._next_id
         # print (eid, queue, reg, heart)
 
-        self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
+        self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
 
         content = dict(id=eid,status='ok')
         content.update(self.engine_info)
         # check if requesting available IDs:
         if queue in self.by_ident:
             try:
-                raise KeyError("queue_id %r in use"%queue)
+                raise KeyError("queue_id %r in use" % queue)
             except:
                 content = error.wrap_exception()
-                self.log.error("queue_id %r in use"%queue, exc_info=True)
+                self.log.error("queue_id %r in use", queue, exc_info=True)
         elif heart in self.hearts: # need to check unique hearts?
             try:
-                raise KeyError("heart_id %r in use"%heart)
+                raise KeyError("heart_id %r in use" % heart)
             except:
-                self.log.error("heart_id %r in use"%heart, exc_info=True)
+                self.log.error("heart_id %r in use", heart, exc_info=True)
                 content = error.wrap_exception()
         else:
             for h, pack in self.incoming_registrations.iteritems():
                 if heart == h:
                     try:
-                        raise KeyError("heart_id %r in use"%heart)
+                        raise KeyError("heart_id %r in use" % heart)
                     except:
-                        self.log.error("heart_id %r in use"%heart, exc_info=True)
+                        self.log.error("heart_id %r in use", heart, exc_info=True)
                         content = error.wrap_exception()
                     break
                 elif queue == pack[1]:
                     try:
-                        raise KeyError("queue_id %r in use"%queue)
+                        raise KeyError("queue_id %r in use" % queue)
                     except:
-                        self.log.error("queue_id %r in use"%queue, exc_info=True)
+                        self.log.error("queue_id %r in use", queue, exc_info=True)
                         content = error.wrap_exception()
                     break
 
@@ -901,7 +904,7 @@ class Hub(SessionFactory):
                 dc.start()
                 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
         else:
-            self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
+            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
         return eid
 
     def unregister_engine(self, ident, msg):
@@ -909,9 +912,9 @@ class Hub(SessionFactory):
         try:
             eid = msg['content']['id']
         except:
-            self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
+            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
             return
-        self.log.info("registration::unregister_engine(%r)"%eid)
+        self.log.info("registration::unregister_engine(%r)", eid)
         # print (eid)
         uuid = self.keytable[eid]
         content=dict(id=eid, queue=uuid.decode('ascii'))
@@ -945,7 +948,7 @@ class Hub(SessionFactory):
             self.pending.remove(msg_id)
             self.all_completed.add(msg_id)
             try:
-                raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
+                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
             except:
                 content = error.wrap_exception()
             # build a fake header:
@@ -958,7 +961,7 @@ class Hub(SessionFactory):
             try:
                 self.db.update_record(msg_id, rec)
             except Exception:
-                self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
+                self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
 
 
     def finish_registration(self, heart):
@@ -969,7 +972,7 @@ class Hub(SessionFactory):
         except KeyError:
             self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
             return
-        self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
+        self.log.info("registration::finished registering engine %i:%r", eid, queue)
         if purge is not None:
             purge.stop()
         control = queue
@@ -985,12 +988,12 @@ class Hub(SessionFactory):
         content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
         if self.notifier:
             self.session.send(self.notifier, "registration_notification", content=content)
-        self.log.info("engine::Engine Connected: %i"%eid)
+        self.log.info("engine::Engine Connected: %i", eid)
 
     def _purge_stalled_registration(self, heart):
         if heart in self.incoming_registrations:
             eid = self.incoming_registrations.pop(heart)[0]
-            self.log.info("registration::purging stalled registration: %i"%eid)
+            self.log.info("registration::purging stalled registration: %i", eid)
         else:
             pass
 
@@ -1077,7 +1080,7 @@ class Hub(SessionFactory):
             pending = filter(lambda m: m in self.pending, msg_ids)
             if pending:
                 try:
-                    raise IndexError("msg pending: %r"%pending[0])
+                    raise IndexError("msg pending: %r" % pending[0])
                 except:
                     reply = error.wrap_exception()
             else:
@@ -1091,7 +1094,7 @@ class Hub(SessionFactory):
                 for eid in eids:
                     if eid not in self.engines:
                         try:
-                            raise IndexError("No such engine: %i"%eid)
+                            raise IndexError("No such engine: %i" % eid)
                         except:
                             reply = error.wrap_exception()
                         break
@@ -1131,13 +1134,13 @@ class Hub(SessionFactory):
         elif len(records) < len(msg_ids):
             missing = [ m for m in msg_ids if m not in found_ids ]
             try:
-                raise KeyError("No such msg(s): %r"%missing)
+                raise KeyError("No such msg(s): %r" % missing)
             except KeyError:
                 return finish(error.wrap_exception())
         elif invalid_ids:
             msg_id = invalid_ids[0]
             try:
-                raise ValueError("Task %r appears to be inflight"%(msg_id))
+                raise ValueError("Task %r appears to be inflight" % msg_id)
             except Exception:
                 return finish(error.wrap_exception())
 
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index 90e913e..26c6f96 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -225,7 +225,7 @@ class TaskScheduler(SessionFactory):
             try:
                 handler(asbytes(msg['content']['queue']))
             except Exception:
-                self.log.error("task::Invalid notification msg: %r",msg)
+                self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
 
     def _register_engine(self, uid):
         """New engine with ident `uid` became available."""