##// END OF EJS Templates
Merge pull request #1548 from minrk/ar_sugar...
Merge pull request #1548 from minrk/ar_sugar Add sugar methods/properties to AsyncResult that are generically useful: * `ar.wall_time` = received - submitted * `ar.serial_time` = sum of serial computation time * `ar.elapsed` = time since submission (wall_time if done) * `ar.progress` = (int) number of sub-tasks that have completed * `len(ar)` = # of tasks * `ar.wait_interactive()`: prints progress These are simple methods derived from the metadata/timestamps already created. But I've been persuaded by @wesm's practice of including simple methods that do useful (and/or cool) things. This also required/revealed some minor fixes/cleanup to clear_output in some cases: * dedent base `core.displaypub.clear_output`, so it's actually defined in the class * clear_output publishes `'\r\b'`, so it will clear terminal-like frontends that don't understand full clear_output behavior. * `core.display.clear_output()` still works, even outside an IPython session. Added a new notebook that shows how to use these new methods and how to do simple animations/progress bars using `clear_output()`. Added `Client.spin_thread(interval)` / `stop_spin_thread()` for running spin in a background thread, to keep zmq queue clear. This can be used to ensure that timing information is as accurate as possible (at the cost of having a background thread active).

File last commit:

r6471:66cc8f83
r6485:e0b43119 merge
Show More
test_asyncresult.py
205 lines | 7.4 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Tests for asyncresult.py
Authors:
* Min RK
"""
MinRK
update API after sagedays29...
r3664
#-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
test new AsyncResult properties
r6471 import time
MinRK
update API after sagedays29...
r3664
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel.error import TimeoutError
MinRK
update API after sagedays29...
r3664
MinRK
test new AsyncResult properties
r6471 from IPython.parallel import error, Client
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel.tests import add_engines
MinRK
update API after sagedays29...
r3664 from .clienttest import ClusterTestCase
def setup():
MinRK
expedite IPython.parallel tests...
r6162 add_engines(2, total=True)
MinRK
update API after sagedays29...
r3664
def wait(n):
import time
time.sleep(n)
return n
class AsyncResultTest(ClusterTestCase):
MinRK
fix AsyncResult.abort...
r6426 def test_single_result_view(self):
"""various one-target views get the right value for single_result"""
MinRK
update API after sagedays29...
r3664 eid = self.client.ids[-1]
ar = self.client[eid].apply_async(lambda : 42)
self.assertEquals(ar.get(), 42)
ar = self.client[[eid]].apply_async(lambda : 42)
self.assertEquals(ar.get(), [42])
ar = self.client[-1:].apply_async(lambda : 42)
self.assertEquals(ar.get(), [42])
def test_get_after_done(self):
ar = self.client[-1].apply_async(lambda : 42)
ar.wait()
self.assertTrue(ar.ready())
self.assertEquals(ar.get(), 42)
self.assertEquals(ar.get(), 42)
def test_get_before_done(self):
ar = self.client[-1].apply_async(wait, 0.1)
self.assertRaises(TimeoutError, ar.get, 0)
ar.wait(0)
self.assertFalse(ar.ready())
self.assertEquals(ar.get(), 0.1)
def test_get_after_error(self):
ar = self.client[-1].apply_async(lambda : 1/0)
MinRK
update parallel code for py3k...
r4155 ar.wait(10)
MinRK
update API after sagedays29...
r3664 self.assertRaisesRemote(ZeroDivisionError, ar.get)
self.assertRaisesRemote(ZeroDivisionError, ar.get)
self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
def test_get_dict(self):
n = len(self.client)
ar = self.client[:].apply_async(lambda : 5)
self.assertEquals(ar.get(), [5]*n)
d = ar.get_dict()
self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
for eid,r in d.iteritems():
self.assertEquals(r, 5)
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222
def test_list_amr(self):
ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
rlist = list(ar)
def test_getattr(self):
ar = self.client[:].apply_async(wait, 0.5)
self.assertRaises(AttributeError, lambda : ar._foo)
self.assertRaises(AttributeError, lambda : ar.__length_hint__())
self.assertRaises(AttributeError, lambda : ar.foo)
self.assertRaises(AttributeError, lambda : ar.engine_id)
self.assertFalse(hasattr(ar, '__length_hint__'))
self.assertFalse(hasattr(ar, 'foo'))
self.assertFalse(hasattr(ar, 'engine_id'))
ar.get(5)
self.assertRaises(AttributeError, lambda : ar._foo)
self.assertRaises(AttributeError, lambda : ar.__length_hint__())
self.assertRaises(AttributeError, lambda : ar.foo)
self.assertTrue(isinstance(ar.engine_id, list))
self.assertEquals(ar.engine_id, ar['engine_id'])
self.assertFalse(hasattr(ar, '__length_hint__'))
self.assertFalse(hasattr(ar, 'foo'))
self.assertTrue(hasattr(ar, 'engine_id'))
def test_getitem(self):
ar = self.client[:].apply_async(wait, 0.5)
self.assertRaises(TimeoutError, lambda : ar['foo'])
self.assertRaises(TimeoutError, lambda : ar['engine_id'])
ar.get(5)
self.assertRaises(KeyError, lambda : ar['foo'])
self.assertTrue(isinstance(ar['engine_id'], list))
self.assertEquals(ar.engine_id, ar['engine_id'])
def test_single_result(self):
ar = self.client[-1].apply_async(wait, 0.5)
self.assertRaises(TimeoutError, lambda : ar['foo'])
self.assertRaises(TimeoutError, lambda : ar['engine_id'])
self.assertTrue(ar.get(5) == 0.5)
self.assertTrue(isinstance(ar['engine_id'], int))
self.assertTrue(isinstance(ar.engine_id, int))
self.assertEquals(ar.engine_id, ar['engine_id'])
MinRK
fix AsyncResult.abort...
r6426
def test_abort(self):
e = self.client[-1]
ar = e.execute('import time; time.sleep(1)', block=False)
ar2 = e.apply_async(lambda : 2)
ar2.abort()
self.assertRaises(error.TaskAborted, ar2.get)
ar.get()
MinRK
test new AsyncResult properties
r6471
def test_len(self):
v = self.client.load_balanced_view()
ar = v.map_async(lambda x: x, range(10))
self.assertEquals(len(ar), 10)
ar = v.apply_async(lambda x: x, range(10))
self.assertEquals(len(ar), 1)
ar = self.client[:].apply_async(lambda x: x, range(10))
self.assertEquals(len(ar), len(self.client.ids))
def test_wall_time_single(self):
v = self.client.load_balanced_view()
ar = v.apply_async(time.sleep, 0.25)
self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
ar.get(2)
self.assertTrue(ar.wall_time < 1.)
self.assertTrue(ar.wall_time > 0.2)
def test_wall_time_multi(self):
self.minimum_engines(4)
v = self.client[:]
ar = v.apply_async(time.sleep, 0.25)
self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
ar.get(2)
self.assertTrue(ar.wall_time < 1.)
self.assertTrue(ar.wall_time > 0.2)
def test_serial_time_single(self):
v = self.client.load_balanced_view()
ar = v.apply_async(time.sleep, 0.25)
self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
ar.get(2)
self.assertTrue(ar.serial_time < 0.5)
self.assertTrue(ar.serial_time > 0.2)
def test_serial_time_multi(self):
self.minimum_engines(4)
v = self.client[:]
ar = v.apply_async(time.sleep, 0.25)
self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
ar.get(2)
self.assertTrue(ar.serial_time < 2.)
self.assertTrue(ar.serial_time > 0.8)
def test_elapsed_single(self):
v = self.client.load_balanced_view()
ar = v.apply_async(time.sleep, 0.25)
while not ar.ready():
time.sleep(0.01)
self.assertTrue(ar.elapsed < 0.3)
self.assertTrue(ar.elapsed < 0.3)
ar.get(2)
def test_elapsed_multi(self):
v = self.client[:]
ar = v.apply_async(time.sleep, 0.25)
while not ar.ready():
time.sleep(0.01)
self.assertTrue(ar.elapsed < 0.3)
self.assertTrue(ar.elapsed < 0.3)
ar.get(2)
def test_hubresult_timestamps(self):
self.minimum_engines(4)
v = self.client[:]
ar = v.apply_async(time.sleep, 0.25)
ar.get(2)
rc2 = Client(profile='iptest')
# must have try/finally to close second Client, otherwise
# will have dangling sockets causing problems
try:
time.sleep(0.25)
hr = rc2.get_result(ar.msg_ids)
self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
hr.get(1)
self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
self.assertEquals(hr.serial_time, ar.serial_time)
finally:
rc2.close()
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222
MinRK
update API after sagedays29...
r3664