diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 72d6a21..cd8456b 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -304,6 +304,20 @@ class TestClient(ClusterTestCase): """wait for an engine to become idle, according to the Hub""" rc = self.client + # step 1. wait for all requests to be noticed + # timeout 5s, polling every 100ms + msg_ids = set(rc.history) + hub_hist = rc.hub_history() + for i in range(50): + if msg_ids.difference(hub_hist): + time.sleep(0.1) + hub_hist = rc.hub_history() + else: + break + + self.assertEqual(len(msg_ids.difference(hub_hist)), 0) + + # step 2. wait for all requests to be done # timeout 5s, polling every 100ms qs = rc.queue_status() for i in range(50): @@ -407,7 +421,7 @@ class TestClient(ClusterTestCase): # Wait for the Hub to realise the result is done: # This prevents a race condition, where we # might purge a result the Hub still thinks is pending. - time.sleep(0.1) + self._wait_for_idle() rc2 = clientmod.Client(profile='iptest') hist = self.client.hub_history() ahr = rc2.get_result([hist[-1]]) @@ -423,7 +437,7 @@ class TestClient(ClusterTestCase): res = [] for i in range(5): res.append(self.client[:].apply_async(lambda : 1)) - time.sleep(0.1) + self._wait_for_idle() self.client.wait(10) # wait for the results to come back before = len(self.client.results) self.assertEqual(len(self.client.metadata),before) @@ -446,11 +460,11 @@ class TestClient(ClusterTestCase): for i in range(5): self.client[:].apply_sync(lambda : 1) self.client.wait(10) + self._wait_for_idle() self.client.purge_results('all') self.assertEqual(len(self.client.results), 0, msg="Results not empty") self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty") - time.sleep(0.1) - hist = self.client.hub_history()# + hist = self.client.hub_history() self.assertEqual(len(hist), 0, msg="hub history not empty") def test_purge_everything(self): @@ -458,16 +472,17 @@ class TestClient(ClusterTestCase): for i in range(5): self.client[:].apply_sync(lambda : 1) self.client.wait(10) + self._wait_for_idle() self.client.purge_everything() # The client results self.assertEqual(len(self.client.results), 0, msg="Results not empty") self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty") - # the hub results - hist = self.client.hub_history() - self.assertEqual(len(hist), 0, msg="hub history not empty") # The client "bookkeeping" self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty") self.assertEqual(len(self.client.history), 0, msg="client history not empty") + # the hub results + hist = self.client.hub_history() + self.assertEqual(len(hist), 0, msg="hub history not empty") def test_spin_thread(self):