##// END OF EJS Templates
test new AsyncResult properties
MinRK -
Show More
@@ -1,125 +1,205
1 1 """Tests for asyncresult.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 import time
19 20
20 21 from IPython.parallel.error import TimeoutError
21 22
22 from IPython.parallel import error
23 from IPython.parallel import error, Client
23 24 from IPython.parallel.tests import add_engines
24 25 from .clienttest import ClusterTestCase
25 26
26 27 def setup():
27 28 add_engines(2, total=True)
28 29
29 30 def wait(n):
30 31 import time
31 32 time.sleep(n)
32 33 return n
33 34
34 35 class AsyncResultTest(ClusterTestCase):
35 36
36 37 def test_single_result_view(self):
37 38 """various one-target views get the right value for single_result"""
38 39 eid = self.client.ids[-1]
39 40 ar = self.client[eid].apply_async(lambda : 42)
40 41 self.assertEquals(ar.get(), 42)
41 42 ar = self.client[[eid]].apply_async(lambda : 42)
42 43 self.assertEquals(ar.get(), [42])
43 44 ar = self.client[-1:].apply_async(lambda : 42)
44 45 self.assertEquals(ar.get(), [42])
45 46
46 47 def test_get_after_done(self):
47 48 ar = self.client[-1].apply_async(lambda : 42)
48 49 ar.wait()
49 50 self.assertTrue(ar.ready())
50 51 self.assertEquals(ar.get(), 42)
51 52 self.assertEquals(ar.get(), 42)
52 53
53 54 def test_get_before_done(self):
54 55 ar = self.client[-1].apply_async(wait, 0.1)
55 56 self.assertRaises(TimeoutError, ar.get, 0)
56 57 ar.wait(0)
57 58 self.assertFalse(ar.ready())
58 59 self.assertEquals(ar.get(), 0.1)
59 60
60 61 def test_get_after_error(self):
61 62 ar = self.client[-1].apply_async(lambda : 1/0)
62 63 ar.wait(10)
63 64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
64 65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
66 67
67 68 def test_get_dict(self):
68 69 n = len(self.client)
69 70 ar = self.client[:].apply_async(lambda : 5)
70 71 self.assertEquals(ar.get(), [5]*n)
71 72 d = ar.get_dict()
72 73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
73 74 for eid,r in d.iteritems():
74 75 self.assertEquals(r, 5)
75 76
76 77 def test_list_amr(self):
77 78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
78 79 rlist = list(ar)
79 80
80 81 def test_getattr(self):
81 82 ar = self.client[:].apply_async(wait, 0.5)
82 83 self.assertRaises(AttributeError, lambda : ar._foo)
83 84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
84 85 self.assertRaises(AttributeError, lambda : ar.foo)
85 86 self.assertRaises(AttributeError, lambda : ar.engine_id)
86 87 self.assertFalse(hasattr(ar, '__length_hint__'))
87 88 self.assertFalse(hasattr(ar, 'foo'))
88 89 self.assertFalse(hasattr(ar, 'engine_id'))
89 90 ar.get(5)
90 91 self.assertRaises(AttributeError, lambda : ar._foo)
91 92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
92 93 self.assertRaises(AttributeError, lambda : ar.foo)
93 94 self.assertTrue(isinstance(ar.engine_id, list))
94 95 self.assertEquals(ar.engine_id, ar['engine_id'])
95 96 self.assertFalse(hasattr(ar, '__length_hint__'))
96 97 self.assertFalse(hasattr(ar, 'foo'))
97 98 self.assertTrue(hasattr(ar, 'engine_id'))
98 99
99 100 def test_getitem(self):
100 101 ar = self.client[:].apply_async(wait, 0.5)
101 102 self.assertRaises(TimeoutError, lambda : ar['foo'])
102 103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
103 104 ar.get(5)
104 105 self.assertRaises(KeyError, lambda : ar['foo'])
105 106 self.assertTrue(isinstance(ar['engine_id'], list))
106 107 self.assertEquals(ar.engine_id, ar['engine_id'])
107 108
108 109 def test_single_result(self):
109 110 ar = self.client[-1].apply_async(wait, 0.5)
110 111 self.assertRaises(TimeoutError, lambda : ar['foo'])
111 112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
112 113 self.assertTrue(ar.get(5) == 0.5)
113 114 self.assertTrue(isinstance(ar['engine_id'], int))
114 115 self.assertTrue(isinstance(ar.engine_id, int))
115 116 self.assertEquals(ar.engine_id, ar['engine_id'])
116 117
117 118 def test_abort(self):
118 119 e = self.client[-1]
119 120 ar = e.execute('import time; time.sleep(1)', block=False)
120 121 ar2 = e.apply_async(lambda : 2)
121 122 ar2.abort()
122 123 self.assertRaises(error.TaskAborted, ar2.get)
123 124 ar.get()
125
126 def test_len(self):
127 v = self.client.load_balanced_view()
128 ar = v.map_async(lambda x: x, range(10))
129 self.assertEquals(len(ar), 10)
130 ar = v.apply_async(lambda x: x, range(10))
131 self.assertEquals(len(ar), 1)
132 ar = self.client[:].apply_async(lambda x: x, range(10))
133 self.assertEquals(len(ar), len(self.client.ids))
134
135 def test_wall_time_single(self):
136 v = self.client.load_balanced_view()
137 ar = v.apply_async(time.sleep, 0.25)
138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
139 ar.get(2)
140 self.assertTrue(ar.wall_time < 1.)
141 self.assertTrue(ar.wall_time > 0.2)
142
143 def test_wall_time_multi(self):
144 self.minimum_engines(4)
145 v = self.client[:]
146 ar = v.apply_async(time.sleep, 0.25)
147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
148 ar.get(2)
149 self.assertTrue(ar.wall_time < 1.)
150 self.assertTrue(ar.wall_time > 0.2)
151
152 def test_serial_time_single(self):
153 v = self.client.load_balanced_view()
154 ar = v.apply_async(time.sleep, 0.25)
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 ar.get(2)
157 self.assertTrue(ar.serial_time < 0.5)
158 self.assertTrue(ar.serial_time > 0.2)
159
160 def test_serial_time_multi(self):
161 self.minimum_engines(4)
162 v = self.client[:]
163 ar = v.apply_async(time.sleep, 0.25)
164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
165 ar.get(2)
166 self.assertTrue(ar.serial_time < 2.)
167 self.assertTrue(ar.serial_time > 0.8)
168
169 def test_elapsed_single(self):
170 v = self.client.load_balanced_view()
171 ar = v.apply_async(time.sleep, 0.25)
172 while not ar.ready():
173 time.sleep(0.01)
174 self.assertTrue(ar.elapsed < 0.3)
175 self.assertTrue(ar.elapsed < 0.3)
176 ar.get(2)
177
178 def test_elapsed_multi(self):
179 v = self.client[:]
180 ar = v.apply_async(time.sleep, 0.25)
181 while not ar.ready():
182 time.sleep(0.01)
183 self.assertTrue(ar.elapsed < 0.3)
184 self.assertTrue(ar.elapsed < 0.3)
185 ar.get(2)
186
187 def test_hubresult_timestamps(self):
188 self.minimum_engines(4)
189 v = self.client[:]
190 ar = v.apply_async(time.sleep, 0.25)
191 ar.get(2)
192 rc2 = Client(profile='iptest')
193 # must have try/finally to close second Client, otherwise
194 # will have dangling sockets causing problems
195 try:
196 time.sleep(0.25)
197 hr = rc2.get_result(ar.msg_ids)
198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
199 hr.get(1)
200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
201 self.assertEquals(hr.serial_time, ar.serial_time)
202 finally:
203 rc2.close()
124 204
125 205
General Comments 0
You need to be logged in to leave comments. Login now