From fa181ff0df17c855dff3e4dfafb6c3928daf981e 2012-05-13 22:09:41 From: MinRK Date: 2012-05-13 22:09:41 Subject: [PATCH] resubmitted tasks are now wholly separate (new msg_ids) This removes much of the fragility caused by trying to cram a resubmit into the original ID. The `resubmitted` field of the original task's record now stores the msg_id of the resubmitted task, rather than a timestamp. --- diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 3a9bfcd..15a08eb 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -1286,12 +1286,6 @@ class Client(HasTraits): raise TypeError("indices must be str or int, not %r"%id) theids.append(id) - for msg_id in theids: - self.outstanding.discard(msg_id) - if msg_id in self.history: - self.history.remove(msg_id) - self.results.pop(msg_id, None) - self.metadata.pop(msg_id, None) content = dict(msg_ids = theids) self.session.send(self._query_socket, 'resubmit_request', content) @@ -1303,8 +1297,10 @@ class Client(HasTraits): content = msg['content'] if content['status'] != 'ok': raise self._unwrap_exception(content) + mapping = content['resubmitted'] + new_ids = [ mapping[msg_id] for msg_id in theids ] - ar = AsyncHubResult(self, msg_ids=theids) + ar = AsyncHubResult(self, msg_ids=new_ids) if block: ar.wait() diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 622dff8..5f55824 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -1134,7 +1134,7 @@ class Hub(SessionFactory): # validate msg_ids found_ids = [ rec['msg_id'] for rec in records ] - invalid_ids = filter(lambda m: m in self.pending, found_ids) + pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] if len(records) > len(msg_ids): try: raise RuntimeError("DB appears to be in an inconsistent state."
@@ -1147,40 +1147,46 @@ class Hub(SessionFactory): raise KeyError("No such msg(s): %r" % missing) except KeyError: return finish(error.wrap_exception()) - elif invalid_ids: - msg_id = invalid_ids[0] + elif pending_ids: + pass + # no need to raise on resubmit of pending task, now that we + # resubmit under new ID, but do we want to raise anyway? + # msg_id = invalid_ids[0] + # try: + # raise ValueError("Task(s) %r appears to be inflight" % ) + # except Exception: + # return finish(error.wrap_exception()) + + # mapping of original IDs to resubmitted IDs + resubmitted = {} + + # send the messages + for rec in records: + header = rec['header'] + msg = self.session.msg(header['msg_type']) + msg_id = msg['msg_id'] + msg['content'] = rec['content'] + header.update(msg['header']) + msg['header'] = header + + self.session.send(self.resubmit, msg, buffers=rec['buffers']) + + resubmitted[rec['msg_id']] = msg_id + self.pending.add(msg_id) + msg['buffers'] = [] try: - raise ValueError("Task %r appears to be inflight" % msg_id) + self.db.add_record(msg_id, init_record(msg)) except Exception: - return finish(error.wrap_exception()) + self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) - # clear the existing records - now = datetime.now() - rec = empty_record() - map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) - rec['resubmitted'] = now - rec['queue'] = 'task' - rec['client_uuid'] = client_id[0] - try: - for msg_id in msg_ids: - self.all_completed.discard(msg_id) - self.db.update_record(msg_id, rec) - except Exception: - self.log.error('db::db error upating record', exc_info=True) - reply = error.wrap_exception() - else: - # send the messages - for rec in records: - header = rec['header'] - # include resubmitted in header to prevent digest collision - header['resubmitted'] = now - msg = self.session.msg(header['msg_type']) - msg['content'] = rec['content'] - msg['header'] = header - msg['header']['msg_id'] = rec['msg_id'] - 
self.session.send(self.resubmit, msg, buffers=rec['buffers']) - - finish(dict(status='ok')) + finish(dict(status='ok', resubmitted=resubmitted)) + + # store the new IDs in the Task DB + for msg_id, resubmit_id in resubmitted.iteritems(): + try: + self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) + except Exception: + self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) def _extract_record(self, rec): diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index 088ca97..2b017d4 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -138,7 +138,7 @@ class SQLiteDB(BaseDB): 'engine_uuid' : 'text', 'started' : 'timestamp', 'completed' : 'timestamp', - 'resubmitted' : 'timestamp', + 'resubmitted' : 'text', 'received' : 'timestamp', 'result_header' : 'dict text', 'result_content' : 'dict text', @@ -247,7 +247,7 @@ class SQLiteDB(BaseDB): engine_uuid text, started timestamp, completed timestamp, - resubmitted timestamp, + resubmitted text, received timestamp, result_header dict text, result_content dict text, diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 2475cd9..99a0060 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -331,13 +331,14 @@ class TestClient(ClusterTestCase): r2 = ahr.get(1) def test_resubmit_inflight(self): - """ensure ValueError on resubmit of inflight task""" + """resubmit of inflight task""" v = self.client.load_balanced_view() ar = v.apply_async(time.sleep,1) # give the message a chance to arrive time.sleep(0.2) - self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids) + ahr = self.client.resubmit(ar.msg_ids) ar.get(2) + ahr.get(2) def test_resubmit_badkey(self): """ensure KeyError on resubmit of nonexistant task""" diff --git a/docs/source/parallel/parallel_db.txt b/docs/source/parallel/parallel_db.txt index 648223f..a742f57 
100644 --- a/docs/source/parallel/parallel_db.txt +++ b/docs/source/parallel/parallel_db.txt @@ -34,7 +34,7 @@ TaskRecord keys: =============== =============== ============= Key Type Description =============== =============== ============= -msg_id uuid(bytes) The msg ID +msg_id uuid(ascii) The msg ID header dict The request header content dict The request content (likely empty) buffers list(bytes) buffers containing serialized request objects @@ -43,7 +43,7 @@ client_uuid uuid(bytes) IDENT of client's socket engine_uuid uuid(bytes) IDENT of engine's socket started datetime time task began execution on engine completed datetime time task finished execution (success or failure) on engine -resubmitted datetime time of resubmission (if applicable) +resubmitted uuid(ascii) msg_id of resubmitted task (if applicable) result_header dict header for result result_content dict content for result result_buffers list(bytes) buffers containing serialized request objects