diff --git a/IPython/parallel/controller/dictdb.py b/IPython/parallel/controller/dictdb.py index bffcc2a..057d6ee 100644 --- a/IPython/parallel/controller/dictdb.py +++ b/IPython/parallel/controller/dictdb.py @@ -15,10 +15,11 @@ TaskRecords are dicts of the form:: 'header' : dict(header), 'content': dict(content), 'buffers': list(buffers), - 'submitted': datetime, + 'submitted': datetime or None, 'started': datetime or None, 'completed': datetime or None, - 'resubmitted': datetime or None, + 'received': datetime or None, + 'resubmitted': str(uuid) or None, 'result_header' : dict(header) or None, 'result_content' : dict(content) or None, 'result_buffers' : list(buffers) or None, @@ -31,16 +32,7 @@ e.g.: * client's outstanding: client_uuid = uuid && completed is None * MIA: arrived is None (and completed is None) -EngineRecords are dicts of the form:: - - { - 'eid' : int(id), - 'uuid': str(uuid) - } - -This may be extended, but is currently. - -We support a subset of mongodb operators:: +DictDB supports a subset of mongodb operators:: $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists """ @@ -210,12 +202,19 @@ class DictDB(BaseDB): before_count, before, self.size_limit, culled ) + def _check_dates(self, rec): + for key in ('submitted', 'started', 'completed'): + value = rec.get(key, None) + if value is not None and not isinstance(value, datetime): + raise ValueError("%s must be None or datetime, not %r" % (key, value)) + # public API methods: def add_record(self, msg_id, rec): """Add a new Task Record, by msg_id.""" if msg_id in self._records: raise KeyError("Already have msg_id %r"%(msg_id)) + self._check_dates(rec) self._records[msg_id] = rec self._add_bytes(rec) self._maybe_cull() @@ -232,6 +231,7 @@ class DictDB(BaseDB): """Update the data in an existing record.""" if msg_id in self._culled_ids: raise KeyError("Record %r has been culled for size" % msg_id) + self._check_dates(rec) _rec = self._records[msg_id] self._drop_bytes(_rec) _rec.update(rec) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 176edfa..d5d7bd1 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -669,7 +669,7 @@ class Hub(SessionFactory): rheader = msg['header'] md = msg['metadata'] completed = rheader['date'] - started = md.get('started', None) + started = extract_dates(md.get('started', None)) result = { 'result_header' : rheader, 'result_metadata': md, @@ -775,7 +775,7 @@ class Hub(SessionFactory): if msg_id in self.tasks[eid]: self.tasks[eid].remove(msg_id) completed = header['date'] - started = md.get('started', None) + started = extract_dates(md.get('started', None)) result = { 'result_header' : header, 'result_metadata': msg['metadata'], @@ -1194,6 +1194,7 @@ class Hub(SessionFactory): self.db.drop_matching_records(dict(completed={'$ne':None})) except Exception: reply = error.wrap_exception() + self.log.exception("Error dropping records") else: pending = [m for m in msg_ids if (m in self.pending)] if pending: @@ -1201,11 +1202,13 @@ class Hub(SessionFactory): raise IndexError("msg pending: %r" % pending[0]) except: reply = error.wrap_exception() + self.log.exception("Error dropping records") else: try: self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) except Exception: reply = error.wrap_exception() + self.log.exception("Error dropping records") if reply['status'] == 'ok': eids = content.get('engine_ids', []) @@ -1215,12 +1218,14 @@ class Hub(SessionFactory): raise IndexError("No such engine: %i" % eid) except: reply = error.wrap_exception() + self.log.exception("Error dropping records") break uid = self.engines[eid].uuid try: self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) except Exception: reply = error.wrap_exception() + self.log.exception("Error dropping records") break self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) @@ -1248,12 +1253,14 @@ class Hub(SessionFactory): raise RuntimeError("DB appears to be in an inconsistent state." "More matching records were found than should exist") except Exception: + self.log.exception("Failed to resubmit task") return finish(error.wrap_exception()) elif len(records) < len(msg_ids): missing = [ m for m in msg_ids if m not in found_ids ] try: raise KeyError("No such msg(s): %r" % missing) except KeyError: + self.log.exception("Failed to resubmit task") return finish(error.wrap_exception()) elif pending_ids: pass @@ -1343,6 +1350,7 @@ class Hub(SessionFactory): records[rec['msg_id']] = rec except Exception: content = error.wrap_exception() + self.log.exception("Failed to get results") self.session.send(self.query, "result_reply", content=content, parent=msg, ident=client_id) return @@ -1381,6 +1389,7 @@ class Hub(SessionFactory): msg_ids = self.db.get_history() except Exception as e: content = error.wrap_exception() + self.log.exception("Failed to get history") else: content = dict(status='ok', history=msg_ids) @@ -1398,6 +1407,7 @@ class Hub(SessionFactory): records = self.db.find_records(query, keys) except Exception as e: content = error.wrap_exception() + self.log.exception("DB query failed") else: # extract buffers from reply content: if keys is not None: diff --git a/IPython/parallel/tests/__init__.py b/IPython/parallel/tests/__init__.py index 3a7d6c6..188f897 100644 --- a/IPython/parallel/tests/__init__.py +++ b/IPython/parallel/tests/__init__.py @@ -20,7 +20,7 @@ from subprocess import Popen, PIPE, STDOUT import nose from IPython.utils.path import get_ipython_dir -from IPython.parallel import Client +from IPython.parallel import Client, error from IPython.parallel.apps.launcher import (LocalProcessLauncher, ipengine_cmd_argv, ipcontroller_cmd_argv, @@ -53,6 +53,15 @@ class TestProcessLauncher(LocalProcessLauncher): # nose setup/teardown def setup(): + + # show tracebacks for RemoteErrors + class RemoteErrorWithTB(error.RemoteError): + def __str__(self): + s = super(RemoteErrorWithTB, self).__str__() + return '\n'.join([s, self.traceback or '']) + + error.RemoteError = RemoteErrorWithTB + cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest') engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json') client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json') @@ -109,7 +118,10 @@ def add_engines(n=1, profile='iptest', total=False): return eps def teardown(): - time.sleep(1) + try: + time.sleep(1) + except KeyboardInterrupt: + return while launchers: p = launchers.pop() if p.poll() is None: @@ -119,7 +131,10 @@ def teardown(): print(e) pass if p.poll() is None: - time.sleep(.25) + try: + time.sleep(.25) + except KeyboardInterrupt: + return if p.poll() is None: try: print('cleaning up test process...') diff --git a/IPython/parallel/tests/test_db.py b/IPython/parallel/tests/test_db.py index 8e95ad2..a00478d 100644 --- a/IPython/parallel/tests/test_db.py +++ b/IPython/parallel/tests/test_db.py @@ -247,7 +247,7 @@ class TestDictBackend(TaskDBTest, TestCase): self.load_records(1) self.assertEqual(len(self.db.get_history()), 17) - for i in range(100): + for i in range(25): self.load_records(1) self.assertTrue(len(self.db.get_history()) >= 17) self.assertTrue(len(self.db.get_history()) <= 20) diff --git a/IPython/utils/jsonutil.py b/IPython/utils/jsonutil.py index 9fed8d2..68630be 100644 --- a/IPython/utils/jsonutil.py +++ b/IPython/utils/jsonutil.py @@ -34,7 +34,7 @@ next_attr_name = '__next__' if py3compat.PY3 else 'next' # timestamp formats ISO8601 = "%Y-%m-%dT%H:%M:%S.%f" -ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6})Z?([\+\-]\d{2}:?\d{2})?$") +ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?Z?([\+\-]\d{2}:?\d{2})?$") #----------------------------------------------------------------------------- # Classes and functions @@ -75,7 +75,10 @@ def parse_date(s): if m: # FIXME: add actual timezone support # this just drops the timezone info - notz = m.groups()[0] + notz, ms, tz = m.groups() + if not ms: + ms = '.0' + notz = notz + ms return datetime.strptime(notz, ISO8601) return s diff --git a/IPython/utils/tests/test_jsonutil.py b/IPython/utils/tests/test_jsonutil.py index 012b168..398a981 100644 --- a/IPython/utils/tests/test_jsonutil.py +++ b/IPython/utils/tests/test_jsonutil.py @@ -96,8 +96,8 @@ def test_encode_images(): def test_lambda(): jc = json_clean(lambda : 1) - assert isinstance(jc, str) - assert '' in jc + nt.assert_is_instance(jc, str) + nt.assert_in('', jc) json.dumps(jc) def test_extract_dates(): @@ -120,16 +120,18 @@ def test_extract_dates(): nt.assert_equal(dt, ref) def test_parse_ms_precision(): - base = '2013-07-03T16:34:52.' + base = '2013-07-03T16:34:52' digits = '1234567890' + parsed = jsonutil.parse_date(base) + nt.assert_is_instance(parsed, datetime.datetime) for i in range(len(digits)): - ts = base + digits[:i] + ts = base + '.' + digits[:i] parsed = jsonutil.parse_date(ts) if i >= 1 and i <= 6: - assert isinstance(parsed, datetime.datetime) + nt.assert_is_instance(parsed, datetime.datetime) else: - assert isinstance(parsed, str) + nt.assert_is_instance(parsed, str) def test_date_default(): data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow()) @@ -138,7 +140,7 @@ def test_date_default(): nt.assert_equal(jsondata.count("+00"), 1) extracted = jsonutil.extract_dates(json.loads(jsondata)) for dt in extracted.values(): - nt.assert_true(isinstance(dt, datetime.datetime)) + nt.assert_is_instance(dt, datetime.datetime) def test_exception(): bad_dicts = [{1:'number', '1':'string'},