diff --git a/IPython/parallel/controller/dictdb.py b/IPython/parallel/controller/dictdb.py index bffcc2a..057d6ee 100644 --- a/IPython/parallel/controller/dictdb.py +++ b/IPython/parallel/controller/dictdb.py @@ -15,10 +15,11 @@ TaskRecords are dicts of the form:: 'header' : dict(header), 'content': dict(content), 'buffers': list(buffers), - 'submitted': datetime, + 'submitted': datetime or None, 'started': datetime or None, 'completed': datetime or None, - 'resubmitted': datetime or None, + 'received': datetime or None, + 'resubmitted': str(uuid) or None, 'result_header' : dict(header) or None, 'result_content' : dict(content) or None, 'result_buffers' : list(buffers) or None, @@ -31,16 +32,7 @@ e.g.: * client's outstanding: client_uuid = uuid && completed is None * MIA: arrived is None (and completed is None) -EngineRecords are dicts of the form:: - - { - 'eid' : int(id), - 'uuid': str(uuid) - } - -This may be extended, but is currently. - -We support a subset of mongodb operators:: +DictDB supports a subset of mongodb operators:: $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists """ @@ -210,12 +202,19 @@ class DictDB(BaseDB): before_count, before, self.size_limit, culled ) + def _check_dates(self, rec): + for key in ('submitted', 'started', 'completed'): + value = rec.get(key, None) + if value is not None and not isinstance(value, datetime): + raise ValueError("%s must be None or datetime, not %r" % (key, value)) + # public API methods: def add_record(self, msg_id, rec): """Add a new Task Record, by msg_id.""" if msg_id in self._records: raise KeyError("Already have msg_id %r"%(msg_id)) + self._check_dates(rec) self._records[msg_id] = rec self._add_bytes(rec) self._maybe_cull() @@ -232,6 +231,7 @@ class DictDB(BaseDB): """Update the data in an existing record.""" if msg_id in self._culled_ids: raise KeyError("Record %r has been culled for size" % msg_id) + self._check_dates(rec) _rec = self._records[msg_id] self._drop_bytes(_rec) _rec.update(rec)