diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index 39fe4cf..dfc3d36 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -16,10 +16,11 @@ Authors: # Imports #------------------------------------------------------------------------------- +import time from IPython.parallel.error import TimeoutError -from IPython.parallel import error +from IPython.parallel import error, Client from IPython.parallel.tests import add_engines from .clienttest import ClusterTestCase @@ -121,5 +122,84 @@ class AsyncResultTest(ClusterTestCase): ar2.abort() self.assertRaises(error.TaskAborted, ar2.get) ar.get() + + def test_len(self): + v = self.client.load_balanced_view() + ar = v.map_async(lambda x: x, range(10)) + self.assertEquals(len(ar), 10) + ar = v.apply_async(lambda x: x, range(10)) + self.assertEquals(len(ar), 1) + ar = self.client[:].apply_async(lambda x: x, range(10)) + self.assertEquals(len(ar), len(self.client.ids)) + + def test_wall_time_single(self): + v = self.client.load_balanced_view() + ar = v.apply_async(time.sleep, 0.25) + self.assertRaises(TimeoutError, getattr, ar, 'wall_time') + ar.get(2) + self.assertTrue(ar.wall_time < 1.) + self.assertTrue(ar.wall_time > 0.2) + + def test_wall_time_multi(self): + self.minimum_engines(4) + v = self.client[:] + ar = v.apply_async(time.sleep, 0.25) + self.assertRaises(TimeoutError, getattr, ar, 'wall_time') + ar.get(2) + self.assertTrue(ar.wall_time < 1.) + self.assertTrue(ar.wall_time > 0.2) + + def test_serial_time_single(self): + v = self.client.load_balanced_view() + ar = v.apply_async(time.sleep, 0.25) + self.assertRaises(TimeoutError, getattr, ar, 'serial_time') + ar.get(2) + self.assertTrue(ar.serial_time < 0.5) + self.assertTrue(ar.serial_time > 0.2) + + def test_serial_time_multi(self): + self.minimum_engines(4) + v = self.client[:] + ar = v.apply_async(time.sleep, 0.25) + self.assertRaises(TimeoutError, getattr, ar, 'serial_time') + ar.get(2) + self.assertTrue(ar.serial_time < 2.) + self.assertTrue(ar.serial_time > 0.8) + + def test_elapsed_single(self): + v = self.client.load_balanced_view() + ar = v.apply_async(time.sleep, 0.25) + while not ar.ready(): + time.sleep(0.01) + self.assertTrue(ar.elapsed < 0.3) + self.assertTrue(ar.elapsed < 0.3) + ar.get(2) + + def test_elapsed_multi(self): + v = self.client[:] + ar = v.apply_async(time.sleep, 0.25) + while not ar.ready(): + time.sleep(0.01) + self.assertTrue(ar.elapsed < 0.3) + self.assertTrue(ar.elapsed < 0.3) + ar.get(2) + + def test_hubresult_timestamps(self): + self.minimum_engines(4) + v = self.client[:] + ar = v.apply_async(time.sleep, 0.25) + ar.get(2) + rc2 = Client(profile='iptest') + # must have try/finally to close second Client, otherwise + # will have dangling sockets causing problems + try: + time.sleep(0.25) + hr = rc2.get_result(ar.msg_ids) + self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed) + hr.get(1) + self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time)) + self.assertEquals(hr.serial_time, ar.serial_time) + finally: + rc2.close()