From 3531213400ae2716a0ae28e2cae6d6398ee7522f 2012-05-13 22:09:38 From: MinRK Date: 2012-05-13 22:09:38 Subject: [PATCH] test resubmit of aborted tasks (#1647) --- diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 48ba9e9..2475cd9 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -280,6 +280,25 @@ class TestClient(ClusterTestCase): time.sleep(0.25) self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) + def _wait_for_idle(self): + """wait for an engine to become idle, according to the Hub""" + rc = self.client + + # timeout 2s, polling every 100ms + for i in range(20): + qs = rc.queue_status() + if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): + time.sleep(0.1) + else: + break + + # ensure Hub up to date: + qs = rc.queue_status() + self.assertEquals(qs['unassigned'], 0) + for eid in rc.ids: + self.assertEquals(qs[eid]['tasks'], 0) + + def test_resubmit(self): def f(): import random @@ -288,11 +307,29 @@ class TestClient(ClusterTestCase): ar = v.apply_async(f) r1 = ar.get(1) # give the Hub a chance to notice: - time.sleep(0.5) + self._wait_for_idle() ahr = self.client.resubmit(ar.msg_ids) r2 = ahr.get(1) self.assertFalse(r1 == r2) + def test_resubmit_aborted(self): + def f(): + import random + return random.random() + v = self.client.load_balanced_view() + # restrict to one engine, so we can put a sleep + # ahead of the task, so it will get aborted + eid = self.client.ids[-1] + v.targets = [eid] + sleep = v.apply_async(time.sleep, 0.5) + ar = v.apply_async(f) + ar.abort() + self.assertRaises(error.TaskAborted, ar.get) + # Give the Hub a chance to get up to date: + self._wait_for_idle() + ahr = self.client.resubmit(ar.msg_ids) + r2 = ahr.get(1) + def test_resubmit_inflight(self): """ensure ValueError on resubmit of inflight task""" v = self.client.load_balanced_view()