test_asyncresult.py
325 lines
| 11.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Tests for asyncresult.py

Authors:

* Min RK
"""
MinRK
|
r3664 | |||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r6471 | import time | ||
MinRK
|
r3664 | |||
MinRK
|
r8140 | import nose.tools as nt | ||
MinRK
|
r7324 | from IPython.utils.io import capture_output | ||
MinRK
|
r3664 | |||
MinRK
|
r7324 | from IPython.parallel.error import TimeoutError | ||
MinRK
|
r6471 | from IPython.parallel import error, Client | ||
MinRK
|
r3666 | from IPython.parallel.tests import add_engines | ||
MinRK
|
r7324 | from .clienttest import ClusterTestCase | ||
MinRK
|
r3664 | |||
def setup():
    """Prepare engines for this test module via ``add_engines(2, total=True)``.

    NOTE(review): presumably collected by the test runner as module-level
    setup, and presumably ``total=True`` means "two engines overall" rather
    than "two more" -- confirm against ``add_engines``.
    """
    add_engines(2, total=True)
MinRK
|
r3664 | |||
def wait(n):
    """Sleep for ``n`` seconds, then hand ``n`` back to the caller.

    The ``time`` machinery is imported inside the body instead of relying on
    a module-level name.
    NOTE(review): presumably so the function stays self-contained when it is
    shipped to an engine by ``apply_async``/``map_async`` -- confirm.
    """
    from time import sleep

    sleep(n)
    return n
MinRK
|
def echo(x):
    """Return the argument unchanged (identity function used as a task)."""
    return x
MinRK
|
r3664 | class AsyncResultTest(ClusterTestCase): | ||
MinRK
|
def test_single_result_view(self):
    """various one-target views get the right value for single_result

    Indexing the client with a bare engine id is expected to yield the bare
    value ``42``; indexing with a one-element list or a one-element slice is
    expected to yield ``[42]``.
    """
    last_id = self.client.ids[-1]
    cases = (
        (last_id, 42),             # integer index
        ([last_id], [42]),         # one-element list
        (slice(-1, None), [42]),   # same key as ``self.client[-1:]``
    )
    for key, expected in cases:
        result = self.client[key].apply_async(lambda: 42)
        self.assertEqual(result.get(), expected)
MinRK
|
r3664 | |||
def test_get_after_done(self):
    """A completed result reports ready and can be fetched more than once."""
    result = self.client[-1].apply_async(lambda: 42)
    result.wait()
    self.assertTrue(result.ready())
    # Fetch twice: the second get() must return the same value as the first.
    for _ in range(2):
        self.assertEqual(result.get(), 42)
MinRK
|
r3664 | |||
def test_get_before_done(self):
    """``get(0)`` on an unfinished task raises; a blocking ``get`` succeeds.

    ``TimeoutError`` here is the class imported from
    ``IPython.parallel.error`` at the top of this file.
    """
    pending = self.client[-1].apply_async(wait, 0.1)
    with self.assertRaises(TimeoutError):
        pending.get(0)
    # After a zero-timeout wait the task is still expected to be unfinished.
    pending.wait(0)
    self.assertFalse(pending.ready())
    # The blocking get() yields what ``wait`` returned on the engine.
    self.assertEqual(pending.get(), 0.1)
MinRK
|
r3664 | |||
def test_get_after_error(self):
    """A failed task raises its remote error on every retrieval attempt."""
    failed = self.client[-1].apply_async(lambda: 1/0)
    failed.wait(10)
    # get() twice, then get_dict(): each is expected to raise the remote
    # ZeroDivisionError.
    for fetch in (failed.get, failed.get, failed.get_dict):
        self.assertRaisesRemote(ZeroDivisionError, fetch)
def test_get_dict(self):
    """``get_dict()`` is keyed by engine id and holds each engine's result.

    One task is run on every engine. The list result must contain one ``5``
    per engine, the dict form must be keyed by exactly ``self.client.ids``,
    and every value in it must be ``5``.
    """
    n = len(self.client)
    ar = self.client[:].apply_async(lambda: 5)
    self.assertEqual(ar.get(), [5] * n)
    d = ar.get_dict()
    self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
    # Only the values are inspected; ``values()`` works on both Python 2
    # and Python 3 dicts.
    for r in d.values():
        self.assertEqual(r, 5)
MinRK
|
r5222 | |||
MinRK
|
def test_get_dict_single(self):
    """For a one-engine view, ``get_dict()`` equals ``{view.targets: value}``.

    Several payload shapes are echoed through the engine: a range, an int,
    a tuple and a string.
    """
    view = self.client[-1]
    payloads = (range(5), 5, ('abc', 'def'), 'string')
    for payload in payloads:
        ar = view.apply_async(echo, payload)
        self.assertEqual(ar.get(), payload)
        by_engine = ar.get_dict()
        self.assertEqual(by_engine, {view.targets: payload})
def test_get_dict_bad(self):
    """``get_dict()`` raises ValueError on a result merged from two runs.

    NOTE(review): presumably because each engine then contributes more than
    one message, so no single value per engine exists -- confirm against
    ``AsyncResult.get_dict``.
    """
    first = self.client[:].apply_async(lambda: 5)
    second = self.client[:].apply_async(lambda: 5)
    merged_ids = first.msg_ids + second.msg_ids
    merged = self.client.get_result(merged_ids)
    with self.assertRaises(ValueError):
        merged.get_dict()
MinRK
|
def test_list_amr(self):
    """``list()`` over a ``map_async`` result collects one value per input.

    ``wait`` returns its argument, so mapping it over five ``0.1`` inputs is
    expected to produce five ``0.1`` results.
    """
    inputs = [0.1] * 5
    ar = self.client.load_balanced_view().map_async(wait, inputs)
    rlist = list(ar)
    # Check what iteration produced, not just that it ran without raising.
    self.assertEqual(rlist, inputs)
def test_getattr(self):
    """Attribute access: ``engine_id`` resolves, unknown names raise.

    The unknown-name and ``hasattr`` checks are made once right after
    submission and again after ``get`` has returned.
    """
    ar = self.client[:].apply_async(wait, 0.5)

    def assert_unknown_names_raise():
        # Private-looking, dunder and plain unknown names must all raise.
        probes = (
            lambda: ar._foo,
            lambda: ar.__length_hint__(),
            lambda: ar.foo,
        )
        for probe in probes:
            self.assertRaises(AttributeError, probe)

    def assert_hasattr_view():
        self.assertFalse(hasattr(ar, '__length_hint__'))
        self.assertFalse(hasattr(ar, 'foo'))
        self.assertTrue(hasattr(ar, 'engine_id'))

    # Right after submission every engine_id entry is expected to be None.
    self.assertEqual(ar.engine_id, [None] * len(ar))
    assert_unknown_names_raise()
    assert_hasattr_view()

    ar.get(5)

    assert_unknown_names_raise()
    self.assertTrue(isinstance(ar.engine_id, list))
    self.assertEqual(ar.engine_id, ar['engine_id'])
    assert_hasattr_view()
def test_getitem(self):
    """Item access: ``'engine_id'`` resolves, an unknown key raises KeyError.

    Checked both right after submission and after ``get`` has returned.
    """
    ar = self.client[:].apply_async(wait, 0.5)

    # Right after submission every engine_id entry is expected to be None.
    self.assertEqual(ar['engine_id'], [None] * len(ar))
    with self.assertRaises(KeyError):
        ar['foo']

    ar.get(5)

    with self.assertRaises(KeyError):
        ar['foo']
    engine_ids = ar['engine_id']
    self.assertTrue(isinstance(engine_ids, list))
    self.assertEqual(ar.engine_id, ar['engine_id'])
MinRK
|
r5222 | |||
def test_single_result(self):
    """A one-engine result exposes scalar metadata rather than lists.

    Right after submission ``ar['engine_id']`` is expected to be ``None``;
    once ``get`` has returned ``0.5`` it is expected to be an ``int``,
    through both item and attribute access.
    """
    ar = self.client[-1].apply_async(wait, 0.5)
    self.assertRaises(KeyError, lambda: ar['foo'])
    self.assertEqual(ar['engine_id'], None)
    # assertEqual/assertIsInstance report the offending value on failure,
    # which a bare assertTrue(...) does not.
    self.assertEqual(ar.get(5), 0.5)
    self.assertIsInstance(ar['engine_id'], int)
    self.assertIsInstance(ar.engine_id, int)
    self.assertEqual(ar.engine_id, ar['engine_id'])
MinRK
|
r6426 | |||
def test_abort(self):
    """Aborting a task makes its ``get`` raise ``error.TaskAborted``."""
    engine = self.client[-1]
    # NOTE(review): presumably the one-second sleep keeps the engine busy so
    # the next task is still queued when abort() is called -- confirm.
    blocker = engine.execute('import time; time.sleep(1)', block=False)
    aborted = engine.apply_async(lambda: 2)
    aborted.abort()
    with self.assertRaises(error.TaskAborted):
        aborted.get()
    # Let the sleeping task finish before the test ends.
    blocker.get()
MinRK
|
r6471 | |||
def test_len(self):
    """``len()`` of a result matches the number of tasks behind it.

    A ten-item map gives 10, a single apply gives 1, and an apply on the
    all-engines view gives one entry per engine id.
    """
    balanced = self.client.load_balanced_view()
    ten = range(10)

    mapped = balanced.map_async(lambda x: x, ten)
    self.assertEqual(len(mapped), 10)

    applied = balanced.apply_async(lambda x: x, ten)
    self.assertEqual(len(applied), 1)

    everywhere = self.client[:].apply_async(lambda x: x, ten)
    self.assertEqual(len(everywhere), len(self.client.ids))
MinRK
|
r6471 | |||
def test_wall_time_single(self):
    """``wall_time`` raises while pending and is plausible once finished.

    A 0.25 s sleep is submitted to the load-balanced view; after it
    completes, ``wall_time`` must lie between 0.2 and 1 second.
    """
    v = self.client.load_balanced_view()
    ar = v.apply_async(time.sleep, 0.25)
    # Reading the attribute before completion is expected to raise.
    self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
    ar.get(2)
    # assertLess/assertGreater include the measured value in the failure
    # message, which matters for timing-sensitive checks.
    self.assertLess(ar.wall_time, 1.)
    self.assertGreater(ar.wall_time, 0.2)
def test_wall_time_multi(self):
    """``wall_time`` for an all-engines task stays near one task's duration.

    With at least four engines each sleeping 0.25 s, ``wall_time`` must lie
    between 0.2 and 1 second.
    """
    self.minimum_engines(4)
    v = self.client[:]
    ar = v.apply_async(time.sleep, 0.25)
    # Reading the attribute before completion is expected to raise.
    self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
    ar.get(2)
    # assertLess/assertGreater include the measured value in the failure
    # message, which matters for timing-sensitive checks.
    self.assertLess(ar.wall_time, 1.)
    self.assertGreater(ar.wall_time, 0.2)
def test_serial_time_single(self):
    """``serial_time`` raises while pending and is plausible once finished.

    A 0.25 s sleep is submitted to the load-balanced view; after it
    completes, ``serial_time`` must lie between 0.2 and 1 second.
    """
    v = self.client.load_balanced_view()
    ar = v.apply_async(time.sleep, 0.25)
    # Reading the attribute before completion is expected to raise.
    self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
    ar.get(2)
    # assertLess/assertGreater include the measured value in the failure
    # message, which matters for timing-sensitive checks.
    self.assertLess(ar.serial_time, 1.)
    self.assertGreater(ar.serial_time, 0.2)
def test_serial_time_multi(self):
    """``serial_time`` for an all-engines task scales with the engine count.

    Every engine sleeps 0.25 s. The accepted window is 0.2 s to 0.5 s per
    engine, which for exactly four engines is the 0.8 s to 2.0 s window
    this test has always used.
    """
    self.minimum_engines(4)
    v = self.client[:]
    ar = v.apply_async(time.sleep, 0.25)
    # Reading the attribute before completion is expected to raise.
    self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
    ar.get(2)
    # minimum_engines(4) allows more than four engines, so the bounds are
    # derived from the view size instead of being fixed for four.
    n_engines = len(v)
    self.assertLess(ar.serial_time, 0.5 * n_engines)
    self.assertGreater(ar.serial_time, 0.2 * n_engines)
def test_elapsed_single(self):
    """``elapsed`` stays under one second for a 0.25 s load-balanced task."""
    view = self.client.load_balanced_view()
    ar = view.apply_async(time.sleep, 0.25)
    # NOTE(review): indentation was lost in the extracted source; the first
    # of the two identical checks is taken to sit inside the polling loop
    # and the second after it -- confirm against the original file.
    while True:
        if ar.ready():
            break
        time.sleep(0.01)
        self.assertTrue(ar.elapsed < 1)
    self.assertTrue(ar.elapsed < 1)
    ar.get(2)
def test_elapsed_multi(self):
    """``elapsed`` stays under one second for a 0.25 s all-engines task."""
    view = self.client[:]
    ar = view.apply_async(time.sleep, 0.25)
    # NOTE(review): indentation was lost in the extracted source; the first
    # of the two identical checks is taken to sit inside the polling loop
    # and the second after it -- confirm against the original file.
    while True:
        if ar.ready():
            break
        time.sleep(0.01)
        self.assertTrue(ar.elapsed < 1)
    self.assertTrue(ar.elapsed < 1)
    ar.get(2)
def test_hubresult_timestamps(self):
    """Timing data fetched through a second client matches the original.

    A finished all-engines task is looked up again by ``msg_ids`` from a
    fresh ``Client``; its ``elapsed`` must be positive, its ``wall_time``
    at most 0.2 s above the original's, and its ``serial_time`` equal.
    """
    self.minimum_engines(4)
    ar = self.client[:].apply_async(time.sleep, 0.25)
    ar.get(2)
    second_client = Client(profile='iptest')
    # The second Client must be closed no matter what happens; otherwise
    # dangling sockets are left behind and cause problems.
    try:
        time.sleep(0.25)
        hub_result = second_client.get_result(ar.msg_ids)
        self.assertTrue(
            hub_result.elapsed > 0.,
            "got bad elapsed: %s" % hub_result.elapsed,
        )
        hub_result.get(1)
        self.assertTrue(
            hub_result.wall_time < ar.wall_time + 0.2,
            "got bad wall_time: %s > %s" % (hub_result.wall_time, ar.wall_time),
        )
        self.assertEqual(hub_result.serial_time, ar.serial_time)
    finally:
        second_client.close()
MinRK
|
r5222 | |||
MinRK
|
def test_display_empty_streams_single(self):
    """empty stdout/err are not displayed (single result)"""
    self.minimum_engines(1)
    view = self.client[-1]

    def displayed(code):
        # Run ``code`` on the engine, then capture what display_outputs()
        # writes to stdout/stderr.
        ar = view.execute(code)
        ar.get(5)
        with capture_output() as captured:
            ar.display_outputs()
        return captured

    # Code that prints: only the printed text shows up, nothing on stderr.
    shown = displayed("print (5555)")
    self.assertEqual(shown.stderr, '')
    self.assertEqual('5555\n', shown.stdout)

    # Code that prints nothing: both streams stay empty.
    shown = displayed("a=5")
    self.assertEqual(shown.stderr, '')
    self.assertEqual(shown.stdout, '')
MinRK
|
r7129 | |||
def test_display_empty_streams_type(self):
    """empty stdout/err are not displayed (groupby type)"""
    self.minimum_engines(1)
    view = self.client[:]

    def displayed(code):
        # Run ``code`` on all engines, then capture what display_outputs()
        # (called with no arguments) writes to stdout/stderr.
        ar = view.execute(code)
        ar.get(5)
        with capture_output() as captured:
            ar.display_outputs()
        return captured

    # Code that prints: one '[stdout:' header and one '5555' per engine,
    # no blank-line gaps, nothing on stderr.
    shown = displayed("print (5555)")
    self.assertEqual(shown.stderr, '')
    self.assertEqual(shown.stdout.count('5555'), len(view), shown.stdout)
    self.assertFalse('\n\n' in shown.stdout, shown.stdout)
    self.assertEqual(shown.stdout.count('[stdout:'), len(view), shown.stdout)

    # Code that prints nothing: both streams stay empty.
    shown = displayed("a=5")
    self.assertEqual(shown.stderr, '')
    self.assertEqual(shown.stdout, '')
MinRK
|
r7129 | |||
def test_display_empty_streams_engine(self):
    """empty stdout/err are not displayed (groupby engine)"""
    self.minimum_engines(1)
    view = self.client[:]

    def displayed(code):
        # Run ``code`` on all engines, then capture what
        # display_outputs('engine') writes to stdout/stderr.
        ar = view.execute(code)
        ar.get(5)
        with capture_output() as captured:
            ar.display_outputs('engine')
        return captured

    # Code that prints: one '[stdout:' header and one '5555' per engine,
    # no blank-line gaps, nothing on stderr.
    shown = displayed("print (5555)")
    self.assertEqual(shown.stderr, '')
    self.assertEqual(shown.stdout.count('5555'), len(view), shown.stdout)
    self.assertFalse('\n\n' in shown.stdout, shown.stdout)
    self.assertEqual(shown.stdout.count('[stdout:'), len(view), shown.stdout)

    # Code that prints nothing: both streams stay empty.
    shown = displayed("a=5")
    self.assertEqual(shown.stderr, '')
    self.assertEqual(shown.stdout, '')
MinRK
|
r8140 | |||
def test_await_data(self):
    """asking for ar.data flushes outputs

    The engine publishes ``dict(i=i)`` five times, 0.1 s apart. The test
    polls ``ar.data`` for up to ten seconds and must observe the final value
    (4) as well as at least one earlier value.
    """
    self.minimum_engines(1)

    v = self.client[-1]
    ar = v.execute('\n'.join([
        "import time",
        "from IPython.kernel.zmq.datapub import publish_data",
        "for i in range(5):",
        " publish_data(dict(i=i))",
        " time.sleep(0.1)",
    ]), block=False)
    found = set()
    tic = time.time()
    # timeout after 10s
    while time.time() <= tic + 10:
        # Read ar.data exactly once per poll. Each access may pick up newly
        # published data, so separate reads could record one value in
        # ``found`` and then compare a different, newer one against 4.
        data = ar.data
        if data:
            i = data['i']
            found.add(i)
            if i == 4:
                break
        time.sleep(0.05)
    ar.get(5)
    nt.assert_in(4, found)
    self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
MinRK
|
r11415 | |||
def test_not_single_result(self):
    """Views built from 'all', None or an id list return list results.

    ``client._build_targets`` is temporarily replaced by a wrapper that
    truncates its output to the first engine, so every view below resolves
    to one engine; each ``get`` must still return ``[5]`` rather than a
    bare ``5``.
    """
    save_build = self.client._build_targets

    def single_engine(*a, **kw):
        # Delegate to the real builder, then keep only the first engine.
        idents, targets = save_build(*a, **kw)
        return idents[:1], targets[:1]

    ids = single_engine('all')[1]
    self.client._build_targets = single_engine
    try:
        for targets in ('all', None, ids):
            dv = self.client.direct_view(targets=targets)
            ar = dv.apply_async(lambda: 5)
            self.assertEqual(ar.get(10), [5])
    finally:
        # Always undo the monkeypatch, even when an assertion or call
        # above fails, so the client is not left with the wrapper installed.
        self.client._build_targets = save_build
MinRK
|
r3664 | |||