diff --git a/IPython/parallel/hub.py b/IPython/parallel/hub.py index 2610002..7e66a27 100755 --- a/IPython/parallel/hub.py +++ b/IPython/parallel/hub.py @@ -45,6 +45,30 @@ def _printer(*args, **kwargs): print (args) print (kwargs) +def empty_record(): + """Return an empty dict with all record keys.""" + return { + 'msg_id' : None, + 'header' : None, + 'content': None, + 'buffers': None, + 'submitted': None, + 'client_uuid' : None, + 'engine_uuid' : None, + 'started': None, + 'completed': None, + 'resubmitted': None, + 'result_header' : None, + 'result_content' : None, + 'result_buffers' : None, + 'queue' : None, + 'pyin' : None, + 'pyout': None, + 'pyerr': None, + 'stdout': '', + 'stderr': '', + } + def init_record(msg): """Initialize a TaskRecord based on a request.""" header = msg['header'] @@ -283,6 +307,7 @@ class Hub(LoggingFactory): tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id completed=Dict() # completed msg_ids keyed by engine_id all_completed=Set() # completed msg_ids keyed by engine_id + dead_engines=Set() # completed msg_ids keyed by engine_id # mia=None incoming_registrations=Dict() registration_timeout=Int() @@ -531,9 +556,21 @@ class Hub(LoggingFactory): record['client_uuid'] = client_id record['queue'] = 'mux' + try: + # it's posible iopub arrived first: + existing = self.db.get_record(msg_id) + for key,evalue in existing.iteritems(): + rvalue = record[key] + if evalue and rvalue and evalue != rvalue: + self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue)) + elif evalue and not rvalue: + record[key] = evalue + self.db.update_record(msg_id, record) + except KeyError: + self.db.add_record(msg_id, record) + self.pending.add(msg_id) self.queues[eid].append(msg_id) - self.db.add_record(msg_id, record) def save_queue_result(self, idents, msg): if len(idents) < 2: @@ -551,7 +588,7 @@ class Hub(LoggingFactory): eid = self.by_ident.get(queue_id, None) if eid is None: self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) - self.log.debug("queue:: %s"%msg[2:]) + # self.log.debug("queue:: %s"%msg[2:]) return parent = msg['parent_header'] @@ -604,7 +641,18 @@ class Hub(LoggingFactory): header = msg['header'] msg_id = header['msg_id'] self.pending.add(msg_id) - self.db.add_record(msg_id, record) + try: + # it's posible iopub arrived first: + existing = self.db.get_record(msg_id) + for key,evalue in existing.iteritems(): + rvalue = record[key] + if evalue and rvalue and evalue != rvalue: + self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue)) + elif evalue and not rvalue: + record[key] = evalue + self.db.update_record(msg_id, record) + except KeyError: + self.db.add_record(msg_id, record) def save_task_result(self, idents, msg): """save the result of a completed task.""" @@ -704,9 +752,10 @@ class Hub(LoggingFactory): # ensure msg_id is in db try: rec = self.db.get_record(msg_id) - except: - self.log.error("iopub::IOPub message has invalid parent", exc_info=True) - return + except KeyError: + rec = empty_record() + rec['msg_id'] = msg_id + self.db.add_record(msg_id, rec) # stream d = {} if msg_type == 'stream': @@ -734,7 +783,8 @@ class Hub(LoggingFactory): content.update(self.client_info) jsonable = {} for k,v in self.keytable.iteritems(): - jsonable[str(k)] = v + if v not in self.dead_engines: + jsonable[str(k)] = v content['engines'] = jsonable self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) @@ -812,15 +862,20 @@ class Hub(LoggingFactory): return self.log.info("registration::unregister_engine(%s)"%eid) # print (eid) - content=dict(id=eid, queue=self.engines[eid].queue) - self.ids.remove(eid) - uuid = self.keytable.pop(eid) - ec = self.engines.pop(eid) - self.hearts.pop(ec.heartbeat) - self.by_ident.pop(ec.queue) - self.completed.pop(eid) - self._handle_stranded_msgs(eid, uuid) - ############## TODO: HANDLE IT ################ + uuid = self.keytable[eid] + content=dict(id=eid, queue=uuid) + self.dead_engines.add(uuid) + # self.ids.remove(eid) + # uuid = self.keytable.pop(eid) + # + # ec = self.engines.pop(eid) + # self.hearts.pop(ec.heartbeat) + # self.by_ident.pop(ec.queue) + # self.completed.pop(eid) + handleit = lambda : self._handle_stranded_msgs(eid, uuid) + dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) + dc.start() + ############## TODO: HANDLE IT ################ if self.notifier: self.session.send(self.notifier, "unregistration_notification", content=content) @@ -833,7 +888,7 @@ class Hub(LoggingFactory): that the result failed and later receive the actual result. """ - outstanding = self.queues.pop(eid) + outstanding = self.queues[eid] for msg_id in outstanding: self.pending.remove(msg_id) diff --git a/IPython/parallel/launcher.py b/IPython/parallel/launcher.py index 13c3c78..4069e9b 100644 --- a/IPython/parallel/launcher.py +++ b/IPython/parallel/launcher.py @@ -904,7 +904,7 @@ class PBSEngineSetLauncher(PBSLauncher): def start(self, n, cluster_dir): """Start n engines by profile or cluster_dir.""" - self.log.info('Starting %n engines with PBSEngineSetLauncher: %r' % (n, self.args)) + self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)) return super(PBSEngineSetLauncher, self).start(n, cluster_dir) #SGE is very similar to PBS @@ -942,7 +942,7 @@ class SGEEngineSetLauncher(SGELauncher): def start(self, n, cluster_dir): """Start n engines by profile or cluster_dir.""" - self.log.info('Starting %n engines with SGEEngineSetLauncher: %r' % (n, self.args)) + self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)) return super(SGEEngineSetLauncher, self).start(n, cluster_dir) diff --git a/IPython/parallel/sqlitedb.py b/IPython/parallel/sqlitedb.py index 7de6341..f7ab22b 100644 --- a/IPython/parallel/sqlitedb.py +++ b/IPython/parallel/sqlitedb.py @@ -13,6 +13,8 @@ from datetime import datetime import sqlite3 +from zmq.eventloop import ioloop + from IPython.utils.traitlets import CUnicode, CStr, Instance, List from .dictdb import BaseDB from .util import ISO8601 @@ -114,6 +116,13 @@ class SQLiteDB(BaseDB): else: self.location = '.' self._init_db() + + # register db commit as 2s periodic callback + # to prevent clogging pipes + # assumes we are being run in a zmq ioloop app + loop = ioloop.IOLoop.instance() + pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop) + pc.start() def _defaults(self): """create an empty record""" @@ -133,7 +142,9 @@ class SQLiteDB(BaseDB): sqlite3.register_converter('bufs', _convert_bufs) # connect to the db dbfile = os.path.join(self.location, self.filename) - self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, cached_statements=16) + self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, + # isolation_level = None)#, + cached_statements=64) # print dir(self._db) self._db.execute("""CREATE TABLE IF NOT EXISTS %s @@ -218,7 +229,7 @@ class SQLiteDB(BaseDB): line = self._dict_to_list(d) tups = '(%s)'%(','.join(['?']*len(line))) self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line) - self._db.commit() + # self._db.commit() def get_record(self, msg_id): """Get a specific Task Record, by msg_id.""" @@ -240,19 +251,19 @@ class SQLiteDB(BaseDB): query += ', '.join(sets) query += ' WHERE msg_id == %r'%msg_id self._db.execute(query, values) - self._db.commit() + # self._db.commit() def drop_record(self, msg_id): """Remove a record from the DB.""" self._db.execute("""DELETE FROM %s WHERE mgs_id==?"""%self.table, (msg_id,)) - self._db.commit() + # self._db.commit() def drop_matching_records(self, check): """Remove a record from the DB.""" expr,args = self._render_expression(check) query = "DELETE FROM %s WHERE %s"%(self.table, expr) self._db.execute(query,args) - self._db.commit() + # self._db.commit() def find_records(self, check, id_only=False): """Find records matching a query dict.""" diff --git a/docs/examples/newparallel/wave2D/parallelwave-mpi.py b/docs/examples/newparallel/wave2D/parallelwave-mpi.py index 1a0f793..f747217 100755 --- a/docs/examples/newparallel/wave2D/parallelwave-mpi.py +++ b/docs/examples/newparallel/wave2D/parallelwave-mpi.py @@ -28,7 +28,7 @@ import time from numpy import exp, zeros, newaxis, sqrt from IPython.external import argparse -from IPython.parallel.client import Client, Reference +from IPython.parallel import Client, Reference def setup_partitioner(index, num_procs, gnum_cells, parts): """create a partitioner in the engine namespace""" diff --git a/docs/examples/newparallel/wave2D/parallelwave.py b/docs/examples/newparallel/wave2D/parallelwave.py index 2692d5e..af2f15c 100755 --- a/docs/examples/newparallel/wave2D/parallelwave.py +++ b/docs/examples/newparallel/wave2D/parallelwave.py @@ -28,7 +28,7 @@ import time from numpy import exp, zeros, newaxis, sqrt from IPython.external import argparse -from IPython.parallel.client import Client, Reference +from IPython.parallel import Client, Reference def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts): """create a partitioner in the engine namespace"""