diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index c1165f6..9c8f5a3 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -1163,7 +1163,7 @@ class Hub(SessionFactory): # send the messages for rec in records: header = rec['header'] - msg = self.session.msg(header['msg_type']) + msg = self.session.msg(header['msg_type'], parent=header) msg_id = msg['msg_id'] msg['content'] = rec['content'] @@ -1177,7 +1177,7 @@ class Hub(SessionFactory): resubmitted[rec['msg_id']] = msg_id self.pending.add(msg_id) - msg['buffers'] = [] + msg['buffers'] = rec['buffers'] try: self.db.add_record(msg_id, init_record(msg)) except Exception: diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 87e34db..263c0a4 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -312,6 +312,20 @@ class TestClient(ClusterTestCase): r2 = ahr.get(1) self.assertFalse(r1 == r2) + def test_resubmit_chain(self): + """resubmit resubmitted tasks""" + v = self.client.load_balanced_view() + ar = v.apply_async(lambda x: x, 'x'*1024) + ar.get() + self._wait_for_idle() + ars = [ar] + + for i in range(10): + ar = ars[-1] + ar2 = self.client.resubmit(ar.msg_ids) + + [ ar.get() for ar in ars ] + def test_resubmit_header(self): """resubmit shouldn't clobber the whole header""" def f():