##// END OF EJS Templates
test AsyncResult.get_dict with single results and invalid input
MinRK -
Show More
@@ -1,294 +1,311 b''
1 """Tests for asyncresult.py
1 """Tests for asyncresult.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import time
19 import time
20
20
21 import nose.tools as nt
21 import nose.tools as nt
22
22
23 from IPython.utils.io import capture_output
23 from IPython.utils.io import capture_output
24
24
25 from IPython.parallel.error import TimeoutError
25 from IPython.parallel.error import TimeoutError
26 from IPython.parallel import error, Client
26 from IPython.parallel import error, Client
27 from IPython.parallel.tests import add_engines
27 from IPython.parallel.tests import add_engines
28 from .clienttest import ClusterTestCase
28 from .clienttest import ClusterTestCase
29
29
30 def setup():
30 def setup():
31 add_engines(2, total=True)
31 add_engines(2, total=True)
32
32
33 def wait(n):
33 def wait(n):
34 import time
34 import time
35 time.sleep(n)
35 time.sleep(n)
36 return n
36 return n
37
37
38 def echo(x):
39 return x
40
38 class AsyncResultTest(ClusterTestCase):
41 class AsyncResultTest(ClusterTestCase):
39
42
40 def test_single_result_view(self):
43 def test_single_result_view(self):
41 """various one-target views get the right value for single_result"""
44 """various one-target views get the right value for single_result"""
42 eid = self.client.ids[-1]
45 eid = self.client.ids[-1]
43 ar = self.client[eid].apply_async(lambda : 42)
46 ar = self.client[eid].apply_async(lambda : 42)
44 self.assertEqual(ar.get(), 42)
47 self.assertEqual(ar.get(), 42)
45 ar = self.client[[eid]].apply_async(lambda : 42)
48 ar = self.client[[eid]].apply_async(lambda : 42)
46 self.assertEqual(ar.get(), [42])
49 self.assertEqual(ar.get(), [42])
47 ar = self.client[-1:].apply_async(lambda : 42)
50 ar = self.client[-1:].apply_async(lambda : 42)
48 self.assertEqual(ar.get(), [42])
51 self.assertEqual(ar.get(), [42])
49
52
50 def test_get_after_done(self):
53 def test_get_after_done(self):
51 ar = self.client[-1].apply_async(lambda : 42)
54 ar = self.client[-1].apply_async(lambda : 42)
52 ar.wait()
55 ar.wait()
53 self.assertTrue(ar.ready())
56 self.assertTrue(ar.ready())
54 self.assertEqual(ar.get(), 42)
57 self.assertEqual(ar.get(), 42)
55 self.assertEqual(ar.get(), 42)
58 self.assertEqual(ar.get(), 42)
56
59
57 def test_get_before_done(self):
60 def test_get_before_done(self):
58 ar = self.client[-1].apply_async(wait, 0.1)
61 ar = self.client[-1].apply_async(wait, 0.1)
59 self.assertRaises(TimeoutError, ar.get, 0)
62 self.assertRaises(TimeoutError, ar.get, 0)
60 ar.wait(0)
63 ar.wait(0)
61 self.assertFalse(ar.ready())
64 self.assertFalse(ar.ready())
62 self.assertEqual(ar.get(), 0.1)
65 self.assertEqual(ar.get(), 0.1)
63
66
64 def test_get_after_error(self):
67 def test_get_after_error(self):
65 ar = self.client[-1].apply_async(lambda : 1/0)
68 ar = self.client[-1].apply_async(lambda : 1/0)
66 ar.wait(10)
69 ar.wait(10)
67 self.assertRaisesRemote(ZeroDivisionError, ar.get)
70 self.assertRaisesRemote(ZeroDivisionError, ar.get)
68 self.assertRaisesRemote(ZeroDivisionError, ar.get)
71 self.assertRaisesRemote(ZeroDivisionError, ar.get)
69 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
72 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
70
73
71 def test_get_dict(self):
74 def test_get_dict(self):
72 n = len(self.client)
75 n = len(self.client)
73 ar = self.client[:].apply_async(lambda : 5)
76 ar = self.client[:].apply_async(lambda : 5)
74 self.assertEqual(ar.get(), [5]*n)
77 self.assertEqual(ar.get(), [5]*n)
75 d = ar.get_dict()
78 d = ar.get_dict()
76 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
79 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
77 for eid,r in d.iteritems():
80 for eid,r in d.iteritems():
78 self.assertEqual(r, 5)
81 self.assertEqual(r, 5)
79
82
83 def test_get_dict_single(self):
84 view = self.client[-1]
85 for v in (range(5), 5, ('abc', 'def'), 'string'):
86 ar = view.apply_async(echo, v)
87 self.assertEqual(ar.get(), v)
88 d = ar.get_dict()
89 self.assertEqual(d, {view.targets : v})
90
91 def test_get_dict_bad(self):
92 ar = self.client[:].apply_async(lambda : 5)
93 ar2 = self.client[:].apply_async(lambda : 5)
94 ar = self.client.get_result(ar.msg_ids + ar2.msg_ids)
95 self.assertRaises(ValueError, ar.get_dict)
96
80 def test_list_amr(self):
97 def test_list_amr(self):
81 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
98 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
82 rlist = list(ar)
99 rlist = list(ar)
83
100
84 def test_getattr(self):
101 def test_getattr(self):
85 ar = self.client[:].apply_async(wait, 0.5)
102 ar = self.client[:].apply_async(wait, 0.5)
86 self.assertEqual(ar.engine_id, [None] * len(ar))
103 self.assertEqual(ar.engine_id, [None] * len(ar))
87 self.assertRaises(AttributeError, lambda : ar._foo)
104 self.assertRaises(AttributeError, lambda : ar._foo)
88 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
105 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
89 self.assertRaises(AttributeError, lambda : ar.foo)
106 self.assertRaises(AttributeError, lambda : ar.foo)
90 self.assertFalse(hasattr(ar, '__length_hint__'))
107 self.assertFalse(hasattr(ar, '__length_hint__'))
91 self.assertFalse(hasattr(ar, 'foo'))
108 self.assertFalse(hasattr(ar, 'foo'))
92 self.assertTrue(hasattr(ar, 'engine_id'))
109 self.assertTrue(hasattr(ar, 'engine_id'))
93 ar.get(5)
110 ar.get(5)
94 self.assertRaises(AttributeError, lambda : ar._foo)
111 self.assertRaises(AttributeError, lambda : ar._foo)
95 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
112 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
96 self.assertRaises(AttributeError, lambda : ar.foo)
113 self.assertRaises(AttributeError, lambda : ar.foo)
97 self.assertTrue(isinstance(ar.engine_id, list))
114 self.assertTrue(isinstance(ar.engine_id, list))
98 self.assertEqual(ar.engine_id, ar['engine_id'])
115 self.assertEqual(ar.engine_id, ar['engine_id'])
99 self.assertFalse(hasattr(ar, '__length_hint__'))
116 self.assertFalse(hasattr(ar, '__length_hint__'))
100 self.assertFalse(hasattr(ar, 'foo'))
117 self.assertFalse(hasattr(ar, 'foo'))
101 self.assertTrue(hasattr(ar, 'engine_id'))
118 self.assertTrue(hasattr(ar, 'engine_id'))
102
119
103 def test_getitem(self):
120 def test_getitem(self):
104 ar = self.client[:].apply_async(wait, 0.5)
121 ar = self.client[:].apply_async(wait, 0.5)
105 self.assertEqual(ar['engine_id'], [None] * len(ar))
122 self.assertEqual(ar['engine_id'], [None] * len(ar))
106 self.assertRaises(KeyError, lambda : ar['foo'])
123 self.assertRaises(KeyError, lambda : ar['foo'])
107 ar.get(5)
124 ar.get(5)
108 self.assertRaises(KeyError, lambda : ar['foo'])
125 self.assertRaises(KeyError, lambda : ar['foo'])
109 self.assertTrue(isinstance(ar['engine_id'], list))
126 self.assertTrue(isinstance(ar['engine_id'], list))
110 self.assertEqual(ar.engine_id, ar['engine_id'])
127 self.assertEqual(ar.engine_id, ar['engine_id'])
111
128
112 def test_single_result(self):
129 def test_single_result(self):
113 ar = self.client[-1].apply_async(wait, 0.5)
130 ar = self.client[-1].apply_async(wait, 0.5)
114 self.assertRaises(KeyError, lambda : ar['foo'])
131 self.assertRaises(KeyError, lambda : ar['foo'])
115 self.assertEqual(ar['engine_id'], None)
132 self.assertEqual(ar['engine_id'], None)
116 self.assertTrue(ar.get(5) == 0.5)
133 self.assertTrue(ar.get(5) == 0.5)
117 self.assertTrue(isinstance(ar['engine_id'], int))
134 self.assertTrue(isinstance(ar['engine_id'], int))
118 self.assertTrue(isinstance(ar.engine_id, int))
135 self.assertTrue(isinstance(ar.engine_id, int))
119 self.assertEqual(ar.engine_id, ar['engine_id'])
136 self.assertEqual(ar.engine_id, ar['engine_id'])
120
137
121 def test_abort(self):
138 def test_abort(self):
122 e = self.client[-1]
139 e = self.client[-1]
123 ar = e.execute('import time; time.sleep(1)', block=False)
140 ar = e.execute('import time; time.sleep(1)', block=False)
124 ar2 = e.apply_async(lambda : 2)
141 ar2 = e.apply_async(lambda : 2)
125 ar2.abort()
142 ar2.abort()
126 self.assertRaises(error.TaskAborted, ar2.get)
143 self.assertRaises(error.TaskAborted, ar2.get)
127 ar.get()
144 ar.get()
128
145
129 def test_len(self):
146 def test_len(self):
130 v = self.client.load_balanced_view()
147 v = self.client.load_balanced_view()
131 ar = v.map_async(lambda x: x, range(10))
148 ar = v.map_async(lambda x: x, range(10))
132 self.assertEqual(len(ar), 10)
149 self.assertEqual(len(ar), 10)
133 ar = v.apply_async(lambda x: x, range(10))
150 ar = v.apply_async(lambda x: x, range(10))
134 self.assertEqual(len(ar), 1)
151 self.assertEqual(len(ar), 1)
135 ar = self.client[:].apply_async(lambda x: x, range(10))
152 ar = self.client[:].apply_async(lambda x: x, range(10))
136 self.assertEqual(len(ar), len(self.client.ids))
153 self.assertEqual(len(ar), len(self.client.ids))
137
154
138 def test_wall_time_single(self):
155 def test_wall_time_single(self):
139 v = self.client.load_balanced_view()
156 v = self.client.load_balanced_view()
140 ar = v.apply_async(time.sleep, 0.25)
157 ar = v.apply_async(time.sleep, 0.25)
141 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
158 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
142 ar.get(2)
159 ar.get(2)
143 self.assertTrue(ar.wall_time < 1.)
160 self.assertTrue(ar.wall_time < 1.)
144 self.assertTrue(ar.wall_time > 0.2)
161 self.assertTrue(ar.wall_time > 0.2)
145
162
146 def test_wall_time_multi(self):
163 def test_wall_time_multi(self):
147 self.minimum_engines(4)
164 self.minimum_engines(4)
148 v = self.client[:]
165 v = self.client[:]
149 ar = v.apply_async(time.sleep, 0.25)
166 ar = v.apply_async(time.sleep, 0.25)
150 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
167 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
151 ar.get(2)
168 ar.get(2)
152 self.assertTrue(ar.wall_time < 1.)
169 self.assertTrue(ar.wall_time < 1.)
153 self.assertTrue(ar.wall_time > 0.2)
170 self.assertTrue(ar.wall_time > 0.2)
154
171
155 def test_serial_time_single(self):
172 def test_serial_time_single(self):
156 v = self.client.load_balanced_view()
173 v = self.client.load_balanced_view()
157 ar = v.apply_async(time.sleep, 0.25)
174 ar = v.apply_async(time.sleep, 0.25)
158 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
175 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
159 ar.get(2)
176 ar.get(2)
160 self.assertTrue(ar.serial_time < 1.)
177 self.assertTrue(ar.serial_time < 1.)
161 self.assertTrue(ar.serial_time > 0.2)
178 self.assertTrue(ar.serial_time > 0.2)
162
179
163 def test_serial_time_multi(self):
180 def test_serial_time_multi(self):
164 self.minimum_engines(4)
181 self.minimum_engines(4)
165 v = self.client[:]
182 v = self.client[:]
166 ar = v.apply_async(time.sleep, 0.25)
183 ar = v.apply_async(time.sleep, 0.25)
167 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
184 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
168 ar.get(2)
185 ar.get(2)
169 self.assertTrue(ar.serial_time < 2.)
186 self.assertTrue(ar.serial_time < 2.)
170 self.assertTrue(ar.serial_time > 0.8)
187 self.assertTrue(ar.serial_time > 0.8)
171
188
172 def test_elapsed_single(self):
189 def test_elapsed_single(self):
173 v = self.client.load_balanced_view()
190 v = self.client.load_balanced_view()
174 ar = v.apply_async(time.sleep, 0.25)
191 ar = v.apply_async(time.sleep, 0.25)
175 while not ar.ready():
192 while not ar.ready():
176 time.sleep(0.01)
193 time.sleep(0.01)
177 self.assertTrue(ar.elapsed < 1)
194 self.assertTrue(ar.elapsed < 1)
178 self.assertTrue(ar.elapsed < 1)
195 self.assertTrue(ar.elapsed < 1)
179 ar.get(2)
196 ar.get(2)
180
197
181 def test_elapsed_multi(self):
198 def test_elapsed_multi(self):
182 v = self.client[:]
199 v = self.client[:]
183 ar = v.apply_async(time.sleep, 0.25)
200 ar = v.apply_async(time.sleep, 0.25)
184 while not ar.ready():
201 while not ar.ready():
185 time.sleep(0.01)
202 time.sleep(0.01)
186 self.assertTrue(ar.elapsed < 1)
203 self.assertTrue(ar.elapsed < 1)
187 self.assertTrue(ar.elapsed < 1)
204 self.assertTrue(ar.elapsed < 1)
188 ar.get(2)
205 ar.get(2)
189
206
190 def test_hubresult_timestamps(self):
207 def test_hubresult_timestamps(self):
191 self.minimum_engines(4)
208 self.minimum_engines(4)
192 v = self.client[:]
209 v = self.client[:]
193 ar = v.apply_async(time.sleep, 0.25)
210 ar = v.apply_async(time.sleep, 0.25)
194 ar.get(2)
211 ar.get(2)
195 rc2 = Client(profile='iptest')
212 rc2 = Client(profile='iptest')
196 # must have try/finally to close second Client, otherwise
213 # must have try/finally to close second Client, otherwise
197 # will have dangling sockets causing problems
214 # will have dangling sockets causing problems
198 try:
215 try:
199 time.sleep(0.25)
216 time.sleep(0.25)
200 hr = rc2.get_result(ar.msg_ids)
217 hr = rc2.get_result(ar.msg_ids)
201 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
218 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
202 hr.get(1)
219 hr.get(1)
203 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
220 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
204 self.assertEqual(hr.serial_time, ar.serial_time)
221 self.assertEqual(hr.serial_time, ar.serial_time)
205 finally:
222 finally:
206 rc2.close()
223 rc2.close()
207
224
208 def test_display_empty_streams_single(self):
225 def test_display_empty_streams_single(self):
209 """empty stdout/err are not displayed (single result)"""
226 """empty stdout/err are not displayed (single result)"""
210 self.minimum_engines(1)
227 self.minimum_engines(1)
211
228
212 v = self.client[-1]
229 v = self.client[-1]
213 ar = v.execute("print (5555)")
230 ar = v.execute("print (5555)")
214 ar.get(5)
231 ar.get(5)
215 with capture_output() as io:
232 with capture_output() as io:
216 ar.display_outputs()
233 ar.display_outputs()
217 self.assertEqual(io.stderr, '')
234 self.assertEqual(io.stderr, '')
218 self.assertEqual('5555\n', io.stdout)
235 self.assertEqual('5555\n', io.stdout)
219
236
220 ar = v.execute("a=5")
237 ar = v.execute("a=5")
221 ar.get(5)
238 ar.get(5)
222 with capture_output() as io:
239 with capture_output() as io:
223 ar.display_outputs()
240 ar.display_outputs()
224 self.assertEqual(io.stderr, '')
241 self.assertEqual(io.stderr, '')
225 self.assertEqual(io.stdout, '')
242 self.assertEqual(io.stdout, '')
226
243
227 def test_display_empty_streams_type(self):
244 def test_display_empty_streams_type(self):
228 """empty stdout/err are not displayed (groupby type)"""
245 """empty stdout/err are not displayed (groupby type)"""
229 self.minimum_engines(1)
246 self.minimum_engines(1)
230
247
231 v = self.client[:]
248 v = self.client[:]
232 ar = v.execute("print (5555)")
249 ar = v.execute("print (5555)")
233 ar.get(5)
250 ar.get(5)
234 with capture_output() as io:
251 with capture_output() as io:
235 ar.display_outputs()
252 ar.display_outputs()
236 self.assertEqual(io.stderr, '')
253 self.assertEqual(io.stderr, '')
237 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
254 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
238 self.assertFalse('\n\n' in io.stdout, io.stdout)
255 self.assertFalse('\n\n' in io.stdout, io.stdout)
239 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
256 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
240
257
241 ar = v.execute("a=5")
258 ar = v.execute("a=5")
242 ar.get(5)
259 ar.get(5)
243 with capture_output() as io:
260 with capture_output() as io:
244 ar.display_outputs()
261 ar.display_outputs()
245 self.assertEqual(io.stderr, '')
262 self.assertEqual(io.stderr, '')
246 self.assertEqual(io.stdout, '')
263 self.assertEqual(io.stdout, '')
247
264
248 def test_display_empty_streams_engine(self):
265 def test_display_empty_streams_engine(self):
249 """empty stdout/err are not displayed (groupby engine)"""
266 """empty stdout/err are not displayed (groupby engine)"""
250 self.minimum_engines(1)
267 self.minimum_engines(1)
251
268
252 v = self.client[:]
269 v = self.client[:]
253 ar = v.execute("print (5555)")
270 ar = v.execute("print (5555)")
254 ar.get(5)
271 ar.get(5)
255 with capture_output() as io:
272 with capture_output() as io:
256 ar.display_outputs('engine')
273 ar.display_outputs('engine')
257 self.assertEqual(io.stderr, '')
274 self.assertEqual(io.stderr, '')
258 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
275 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
259 self.assertFalse('\n\n' in io.stdout, io.stdout)
276 self.assertFalse('\n\n' in io.stdout, io.stdout)
260 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
277 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
261
278
262 ar = v.execute("a=5")
279 ar = v.execute("a=5")
263 ar.get(5)
280 ar.get(5)
264 with capture_output() as io:
281 with capture_output() as io:
265 ar.display_outputs('engine')
282 ar.display_outputs('engine')
266 self.assertEqual(io.stderr, '')
283 self.assertEqual(io.stderr, '')
267 self.assertEqual(io.stdout, '')
284 self.assertEqual(io.stdout, '')
268
285
269 def test_await_data(self):
286 def test_await_data(self):
270 """asking for ar.data flushes outputs"""
287 """asking for ar.data flushes outputs"""
271 self.minimum_engines(1)
288 self.minimum_engines(1)
272
289
273 v = self.client[-1]
290 v = self.client[-1]
274 ar = v.execute('\n'.join([
291 ar = v.execute('\n'.join([
275 "import time",
292 "import time",
276 "from IPython.kernel.zmq.datapub import publish_data",
293 "from IPython.kernel.zmq.datapub import publish_data",
277 "for i in range(5):",
294 "for i in range(5):",
278 " publish_data(dict(i=i))",
295 " publish_data(dict(i=i))",
279 " time.sleep(0.1)",
296 " time.sleep(0.1)",
280 ]), block=False)
297 ]), block=False)
281 found = set()
298 found = set()
282 tic = time.time()
299 tic = time.time()
283 # timeout after 10s
300 # timeout after 10s
284 while time.time() <= tic + 10:
301 while time.time() <= tic + 10:
285 if ar.data:
302 if ar.data:
286 found.add(ar.data['i'])
303 found.add(ar.data['i'])
287 if ar.data['i'] == 4:
304 if ar.data['i'] == 4:
288 break
305 break
289 time.sleep(0.05)
306 time.sleep(0.05)
290
307
291 ar.get(5)
308 ar.get(5)
292 nt.assert_in(4, found)
309 nt.assert_in(4, found)
293 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
310 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
294
311
General Comments 0
You need to be logged in to leave comments. Login now