|
|
"""Tests for asyncresult.py
|
|
|
|
|
|
Authors:
|
|
|
|
|
|
* Min RK
|
|
|
"""
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Copyright (C) 2011 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
from IPython.parallel.error import TimeoutError
|
|
|
|
|
|
from IPython.parallel import error
|
|
|
from IPython.parallel.tests import add_engines
|
|
|
from .clienttest import ClusterTestCase
|
|
|
|
|
|
def setup():
|
|
|
add_engines(2, total=True)
|
|
|
|
|
|
def wait(n):
|
|
|
import time
|
|
|
time.sleep(n)
|
|
|
return n
|
|
|
|
|
|
class AsyncResultTest(ClusterTestCase):
|
|
|
|
|
|
def test_single_result_view(self):
|
|
|
"""various one-target views get the right value for single_result"""
|
|
|
eid = self.client.ids[-1]
|
|
|
ar = self.client[eid].apply_async(lambda : 42)
|
|
|
self.assertEquals(ar.get(), 42)
|
|
|
ar = self.client[[eid]].apply_async(lambda : 42)
|
|
|
self.assertEquals(ar.get(), [42])
|
|
|
ar = self.client[-1:].apply_async(lambda : 42)
|
|
|
self.assertEquals(ar.get(), [42])
|
|
|
|
|
|
def test_get_after_done(self):
|
|
|
ar = self.client[-1].apply_async(lambda : 42)
|
|
|
ar.wait()
|
|
|
self.assertTrue(ar.ready())
|
|
|
self.assertEquals(ar.get(), 42)
|
|
|
self.assertEquals(ar.get(), 42)
|
|
|
|
|
|
def test_get_before_done(self):
|
|
|
ar = self.client[-1].apply_async(wait, 0.1)
|
|
|
self.assertRaises(TimeoutError, ar.get, 0)
|
|
|
ar.wait(0)
|
|
|
self.assertFalse(ar.ready())
|
|
|
self.assertEquals(ar.get(), 0.1)
|
|
|
|
|
|
def test_get_after_error(self):
|
|
|
ar = self.client[-1].apply_async(lambda : 1/0)
|
|
|
ar.wait(10)
|
|
|
self.assertRaisesRemote(ZeroDivisionError, ar.get)
|
|
|
self.assertRaisesRemote(ZeroDivisionError, ar.get)
|
|
|
self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
|
|
|
|
|
|
def test_get_dict(self):
|
|
|
n = len(self.client)
|
|
|
ar = self.client[:].apply_async(lambda : 5)
|
|
|
self.assertEquals(ar.get(), [5]*n)
|
|
|
d = ar.get_dict()
|
|
|
self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
|
|
|
for eid,r in d.iteritems():
|
|
|
self.assertEquals(r, 5)
|
|
|
|
|
|
def test_list_amr(self):
|
|
|
ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
|
|
|
rlist = list(ar)
|
|
|
|
|
|
def test_getattr(self):
|
|
|
ar = self.client[:].apply_async(wait, 0.5)
|
|
|
self.assertRaises(AttributeError, lambda : ar._foo)
|
|
|
self.assertRaises(AttributeError, lambda : ar.__length_hint__())
|
|
|
self.assertRaises(AttributeError, lambda : ar.foo)
|
|
|
self.assertRaises(AttributeError, lambda : ar.engine_id)
|
|
|
self.assertFalse(hasattr(ar, '__length_hint__'))
|
|
|
self.assertFalse(hasattr(ar, 'foo'))
|
|
|
self.assertFalse(hasattr(ar, 'engine_id'))
|
|
|
ar.get(5)
|
|
|
self.assertRaises(AttributeError, lambda : ar._foo)
|
|
|
self.assertRaises(AttributeError, lambda : ar.__length_hint__())
|
|
|
self.assertRaises(AttributeError, lambda : ar.foo)
|
|
|
self.assertTrue(isinstance(ar.engine_id, list))
|
|
|
self.assertEquals(ar.engine_id, ar['engine_id'])
|
|
|
self.assertFalse(hasattr(ar, '__length_hint__'))
|
|
|
self.assertFalse(hasattr(ar, 'foo'))
|
|
|
self.assertTrue(hasattr(ar, 'engine_id'))
|
|
|
|
|
|
def test_getitem(self):
|
|
|
ar = self.client[:].apply_async(wait, 0.5)
|
|
|
self.assertRaises(TimeoutError, lambda : ar['foo'])
|
|
|
self.assertRaises(TimeoutError, lambda : ar['engine_id'])
|
|
|
ar.get(5)
|
|
|
self.assertRaises(KeyError, lambda : ar['foo'])
|
|
|
self.assertTrue(isinstance(ar['engine_id'], list))
|
|
|
self.assertEquals(ar.engine_id, ar['engine_id'])
|
|
|
|
|
|
def test_single_result(self):
|
|
|
ar = self.client[-1].apply_async(wait, 0.5)
|
|
|
self.assertRaises(TimeoutError, lambda : ar['foo'])
|
|
|
self.assertRaises(TimeoutError, lambda : ar['engine_id'])
|
|
|
self.assertTrue(ar.get(5) == 0.5)
|
|
|
self.assertTrue(isinstance(ar['engine_id'], int))
|
|
|
self.assertTrue(isinstance(ar.engine_id, int))
|
|
|
self.assertEquals(ar.engine_id, ar['engine_id'])
|
|
|
|
|
|
def test_abort(self):
|
|
|
e = self.client[-1]
|
|
|
ar = e.execute('import time; time.sleep(1)', block=False)
|
|
|
ar2 = e.apply_async(lambda : 2)
|
|
|
ar2.abort()
|
|
|
self.assertRaises(error.TaskAborted, ar2.get)
|
|
|
ar.get()
|
|
|
|
|
|
|
|
|
|