diff --git a/IPython/parallel/hub.py b/IPython/parallel/hub.py
index 2610002..7e66a27 100755
--- a/IPython/parallel/hub.py
+++ b/IPython/parallel/hub.py
@@ -45,6 +45,30 @@ def _printer(*args, **kwargs):
     print (args)
     print (kwargs)
 
+def empty_record():
+    """Return an empty dict with all record keys."""
+    return {
+        'msg_id' : None,
+        'header' : None,
+        'content': None,
+        'buffers': None,
+        'submitted': None,
+        'client_uuid' : None,
+        'engine_uuid' : None,
+        'started': None,
+        'completed': None,
+        'resubmitted': None,
+        'result_header' : None,
+        'result_content' : None,
+        'result_buffers' : None,
+        'queue' : None,
+        'pyin' : None,
+        'pyout': None,
+        'pyerr': None,
+        'stdout': '',
+        'stderr': '',
+    }
+    
 def init_record(msg):
     """Initialize a TaskRecord based on a request."""
     header = msg['header']
@@ -283,6 +307,7 @@ class Hub(LoggingFactory):
     tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
     completed=Dict() # completed msg_ids keyed by engine_id
     all_completed=Set() # completed msg_ids keyed by engine_id
+dead_engines=Set() # uuids of engines that have unregistered/died
     # mia=None
     incoming_registrations=Dict()
     registration_timeout=Int()
@@ -531,9 +556,21 @@ class Hub(LoggingFactory):
         record['client_uuid'] = client_id
         record['queue'] = 'mux'
 
+        try:
+            # it's possible iopub arrived first:
+            existing = self.db.get_record(msg_id)
+            for key,evalue in existing.iteritems():
+                rvalue = record[key]
+                if evalue and rvalue and evalue != rvalue:
+                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+                elif evalue and not rvalue:
+                    record[key] = evalue
+            self.db.update_record(msg_id, record)
+        except KeyError:
+            self.db.add_record(msg_id, record)
+        
         self.pending.add(msg_id)
         self.queues[eid].append(msg_id)
-        self.db.add_record(msg_id, record)
     
     def save_queue_result(self, idents, msg):
         if len(idents) < 2:
@@ -551,7 +588,7 @@ class Hub(LoggingFactory):
         eid = self.by_ident.get(queue_id, None)
         if eid is None:
             self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
-            self.log.debug("queue::       %s"%msg[2:])
+            # self.log.debug("queue::       %s"%msg[2:])
             return
         
         parent = msg['parent_header']
@@ -604,7 +641,18 @@ class Hub(LoggingFactory):
         header = msg['header']
         msg_id = header['msg_id']
         self.pending.add(msg_id)
-        self.db.add_record(msg_id, record)
+        try:
+            # it's possible iopub arrived first:
+            existing = self.db.get_record(msg_id)
+            for key,evalue in existing.iteritems():
+                rvalue = record[key]
+                if evalue and rvalue and evalue != rvalue:
+                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+                elif evalue and not rvalue:
+                    record[key] = evalue
+            self.db.update_record(msg_id, record)
+        except KeyError:
+            self.db.add_record(msg_id, record)
     
     def save_task_result(self, idents, msg):
         """save the result of a completed task."""
@@ -704,9 +752,10 @@ class Hub(LoggingFactory):
         # ensure msg_id is in db
         try:
             rec = self.db.get_record(msg_id)
-        except:
-            self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
-            return
+        except KeyError:
+            rec = empty_record()
+            rec['msg_id'] = msg_id
+            self.db.add_record(msg_id, rec)
         # stream
         d = {}
         if msg_type == 'stream':
@@ -734,7 +783,8 @@ class Hub(LoggingFactory):
         content.update(self.client_info)
         jsonable = {}
         for k,v in self.keytable.iteritems():
-            jsonable[str(k)] = v
+            if v not in self.dead_engines:
+                jsonable[str(k)] = v
         content['engines'] = jsonable
         self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
     
@@ -812,15 +862,20 @@ class Hub(LoggingFactory):
             return
         self.log.info("registration::unregister_engine(%s)"%eid)
         # print (eid)
-        content=dict(id=eid, queue=self.engines[eid].queue)
-        self.ids.remove(eid)
-        uuid = self.keytable.pop(eid)
-        ec = self.engines.pop(eid)
-        self.hearts.pop(ec.heartbeat)
-        self.by_ident.pop(ec.queue)
-        self.completed.pop(eid)
-        self._handle_stranded_msgs(eid, uuid)
-            ############## TODO: HANDLE IT ################
+        uuid = self.keytable[eid]
+        content=dict(id=eid, queue=uuid)
+        self.dead_engines.add(uuid)
+        # self.ids.remove(eid)
+        # uuid = self.keytable.pop(eid)
+        # 
+        # ec = self.engines.pop(eid)
+        # self.hearts.pop(ec.heartbeat)
+        # self.by_ident.pop(ec.queue)
+        # self.completed.pop(eid)
+        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
+        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
+        dc.start()
+        ############## TODO: HANDLE IT ################
         
         if self.notifier:
             self.session.send(self.notifier, "unregistration_notification", content=content)
@@ -833,7 +888,7 @@ class Hub(LoggingFactory):
         that the result failed and later receive the actual result.
         """
         
-        outstanding = self.queues.pop(eid)
+        outstanding = self.queues[eid]
         
         for msg_id in outstanding:
             self.pending.remove(msg_id)
diff --git a/IPython/parallel/launcher.py b/IPython/parallel/launcher.py
index 13c3c78..4069e9b 100644
--- a/IPython/parallel/launcher.py
+++ b/IPython/parallel/launcher.py
@@ -904,7 +904,7 @@ class PBSEngineSetLauncher(PBSLauncher):
 
     def start(self, n, cluster_dir):
         """Start n engines by profile or cluster_dir."""
-        self.log.info('Starting %n engines with PBSEngineSetLauncher: %r' % (n, self.args))
+        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
         return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
 
 #SGE is very similar to PBS
@@ -942,7 +942,7 @@ class SGEEngineSetLauncher(SGELauncher):
 
     def start(self, n, cluster_dir):
         """Start n engines by profile or cluster_dir."""
-        self.log.info('Starting %n engines with SGEEngineSetLauncher: %r' % (n, self.args))
+        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
         return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
 
 
diff --git a/IPython/parallel/sqlitedb.py b/IPython/parallel/sqlitedb.py
index 7de6341..f7ab22b 100644
--- a/IPython/parallel/sqlitedb.py
+++ b/IPython/parallel/sqlitedb.py
@@ -13,6 +13,8 @@ from datetime import datetime
 
 import sqlite3
 
+from zmq.eventloop import ioloop
+
 from IPython.utils.traitlets import CUnicode, CStr, Instance, List
 from .dictdb import BaseDB
 from .util import ISO8601
@@ -114,6 +116,13 @@ class SQLiteDB(BaseDB):
             else:
                 self.location = '.'
         self._init_db()
+        
+        # register db commit as 2s periodic callback
+        # to prevent clogging pipes
+        # assumes we are being run in a zmq ioloop app
+        loop = ioloop.IOLoop.instance()
+        pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
+        pc.start()
     
     def _defaults(self):
         """create an empty record"""
@@ -133,7 +142,9 @@ class SQLiteDB(BaseDB):
         sqlite3.register_converter('bufs', _convert_bufs)
         # connect to the db
         dbfile = os.path.join(self.location, self.filename)
-        self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, cached_statements=16)
+        self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, 
+            # isolation_level = None)#,
+             cached_statements=64)
         # print dir(self._db)
         
         self._db.execute("""CREATE TABLE IF NOT EXISTS %s 
@@ -218,7 +229,7 @@ class SQLiteDB(BaseDB):
         line = self._dict_to_list(d)
         tups = '(%s)'%(','.join(['?']*len(line)))
         self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
-        self._db.commit()
+        # self._db.commit()
     
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
@@ -240,19 +251,19 @@ class SQLiteDB(BaseDB):
         query += ', '.join(sets)
         query += ' WHERE msg_id == %r'%msg_id
         self._db.execute(query, values)
-        self._db.commit()
+        # self._db.commit()
     
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         self._db.execute("""DELETE FROM %s WHERE mgs_id==?"""%self.table, (msg_id,))
-        self._db.commit()
+        # self._db.commit()
     
     def drop_matching_records(self, check):
         """Remove a record from the DB."""
         expr,args = self._render_expression(check)
         query = "DELETE FROM %s WHERE %s"%(self.table, expr)
         self._db.execute(query,args)
-        self._db.commit()
+        # self._db.commit()
         
     def find_records(self, check, id_only=False):
         """Find records matching a query dict."""
diff --git a/docs/examples/newparallel/wave2D/parallelwave-mpi.py b/docs/examples/newparallel/wave2D/parallelwave-mpi.py
index 1a0f793..f747217 100755
--- a/docs/examples/newparallel/wave2D/parallelwave-mpi.py
+++ b/docs/examples/newparallel/wave2D/parallelwave-mpi.py
@@ -28,7 +28,7 @@ import time
 from numpy import exp, zeros, newaxis, sqrt
 
 from IPython.external import argparse
-from IPython.parallel.client import Client, Reference
+from IPython.parallel import Client, Reference
 
 def setup_partitioner(index, num_procs, gnum_cells, parts):
     """create a partitioner in the engine namespace"""
diff --git a/docs/examples/newparallel/wave2D/parallelwave.py b/docs/examples/newparallel/wave2D/parallelwave.py
index 2692d5e..af2f15c 100755
--- a/docs/examples/newparallel/wave2D/parallelwave.py
+++ b/docs/examples/newparallel/wave2D/parallelwave.py
@@ -28,7 +28,7 @@ import time
 from numpy import exp, zeros, newaxis, sqrt
 
 from IPython.external import argparse
-from IPython.parallel.client import Client, Reference
+from IPython.parallel import Client, Reference
 
 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
     """create a partitioner in the engine namespace"""