diff --git a/IPython/parallel/controller/dictdb.py b/IPython/parallel/controller/dictdb.py
index 0b4a3c7..93fbd29 100644
--- a/IPython/parallel/controller/dictdb.py
+++ b/IPython/parallel/controller/dictdb.py
@@ -46,7 +46,7 @@ We support a subset of mongodb operators:
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------
 
-from copy import copy
+from copy import deepcopy as copy
 from datetime import datetime
 
 from IPython.config.configurable import LoggingConfigurable
@@ -129,7 +129,7 @@ class DictDB(BaseDB):
         d['msg_id'] = rec['msg_id']
         for key in keys:
             d[key] = rec[key]
-        return d
+        return copy(d)
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 892e5e5..c1165f6 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -1166,7 +1166,11 @@ class Hub(SessionFactory):
             msg = self.session.msg(header['msg_type'])
             msg_id = msg['msg_id']
             msg['content'] = rec['content']
-            header.update(msg['header'])
+
+            # use the old header, but update msg_id and timestamp
+            fresh = msg['header']
+            header['msg_id'] = fresh['msg_id']
+            header['date'] = fresh['date']
             msg['header'] = header
 
             self.session.send(self.resubmit, msg, buffers=rec['buffers'])
diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py
index 99a0060..87e34db 100644
--- a/IPython/parallel/tests/test_client.py
+++ b/IPython/parallel/tests/test_client.py
@@ -312,6 +312,28 @@ class TestClient(ClusterTestCase):
         r2 = ahr.get(1)
         self.assertFalse(r1 == r2)
 
+    def test_resubmit_header(self):
+        """resubmit shouldn't clobber the whole header"""
+        def f():
+            import random
+            return random.random()
+        v = self.client.load_balanced_view()
+        v.retries = 1
+        ar = v.apply_async(f)
+        r1 = ar.get(1)
+        # give the Hub a chance to notice:
+        self._wait_for_idle()
+        ahr = self.client.resubmit(ar.msg_ids)
+        ahr.get(1)
+        time.sleep(0.5)
+        records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
+        h1,h2 = [ r['header'] for r in records ]
+        for key in set(h1.keys()).union(set(h2.keys())):
+            if key in ('msg_id', 'date'):
+                self.assertNotEquals(h1[key], h2[key])
+            else:
+                self.assertEquals(h1[key], h2[key])
+
     def test_resubmit_aborted(self):
         def f():
             import random
@@ -384,4 +406,3 @@ class TestClient(ClusterTestCase):
             "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
         )
 
-
diff --git a/IPython/parallel/tests/test_db.py b/IPython/parallel/tests/test_db.py
index 01cc523..e1f2aed 100644
--- a/IPython/parallel/tests/test_db.py
+++ b/IPython/parallel/tests/test_db.py
@@ -197,9 +197,11 @@ class TestDictBackend(TestCase):
         rec = self.db.get_record(msg_id)
         rec.pop('buffers')
         rec['garbage'] = 'hello'
+        rec['header']['msg_id'] = 'fubar'
         rec2 = self.db.get_record(msg_id)
         self.assertTrue('buffers' in rec2)
         self.assertFalse('garbage' in rec2)
+        self.assertEquals(rec2['header']['msg_id'], msg_id)
 
     def test_pop_safe_find(self):
         """editing query results shouldn't affect record [find]"""
@@ -207,19 +209,23 @@ class TestDictBackend(TestCase):
         rec = self.db.find_records({'msg_id' : msg_id})[0]
         rec.pop('buffers')
         rec['garbage'] = 'hello'
+        rec['header']['msg_id'] = 'fubar'
         rec2 = self.db.find_records({'msg_id' : msg_id})[0]
         self.assertTrue('buffers' in rec2)
         self.assertFalse('garbage' in rec2)
+        self.assertEquals(rec2['header']['msg_id'], msg_id)
 
     def test_pop_safe_find_keys(self):
         """editing query results shouldn't affect record [find+keys]"""
         msg_id = self.db.get_history()[-1]
-        rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers'])[0]
+        rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers', 'header'])[0]
         rec.pop('buffers')
         rec['garbage'] = 'hello'
+        rec['header']['msg_id'] = 'fubar'
         rec2 = self.db.find_records({'msg_id' : msg_id})[0]
         self.assertTrue('buffers' in rec2)
         self.assertFalse('garbage' in rec2)
+        self.assertEquals(rec2['header']['msg_id'], msg_id)
 
 
 class TestSQLiteBackend(TestDictBackend):
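
Note (illustrative only, not part of the patch): a minimal sketch of the aliasing bug the dictdb.py change guards against, assuming a record shaped roughly like a DictDB entry. With a shallow copy, the nested 'header' dict is shared with the stored record, so a caller that edits a query result (as the new test_db.py assertions do) would silently corrupt the database's copy; a deepcopy keeps the stored record intact.

from copy import copy, deepcopy

# hypothetical record, shaped like a stored task record with a nested header
stored = {'msg_id': 'abc', 'header': {'msg_id': 'abc', 'msg_type': 'apply_request'}}

shallow = copy(stored)
shallow['header']['msg_id'] = 'fubar'
print(stored['header']['msg_id'])   # 'fubar' -- mutated through the shared nested dict

stored['header']['msg_id'] = 'abc'  # reset before the deepcopy demonstration
deep = deepcopy(stored)
deep['header']['msg_id'] = 'fubar'
print(stored['header']['msg_id'])   # 'abc' -- the stored record is untouched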