##// END OF EJS Templates
AsyncResult.__getattr__ shouldn't raise TimeoutError...
MinRK -
Show More
@@ -1,395 +1,396
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import time
19 19
20 20 from zmq import MessageTracker
21 21
22 22 from IPython.external.decorator import decorator
23 23 from IPython.parallel import error
24 24
25 25 #-----------------------------------------------------------------------------
26 26 # Classes
27 27 #-----------------------------------------------------------------------------
28 28
29 29 # global empty tracker that's always done:
30 30 finished_tracker = MessageTracker()
31 31
32 32 @decorator
33 33 def check_ready(f, self, *args, **kwargs):
34 34 """Call spin() to sync state prior to calling the method."""
35 35 self.wait(0)
36 36 if not self._ready:
37 37 raise error.TimeoutError("result not ready")
38 38 return f(self, *args, **kwargs)
39 39
40 40 class AsyncResult(object):
41 41 """Class for representing results of non-blocking calls.
42 42
43 43 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
44 44 """
45 45
46 46 msg_ids = None
47 47 _targets = None
48 48 _tracker = None
49 49 _single_result = False
50 50
51 51 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
52 52 if isinstance(msg_ids, basestring):
53 53 # always a list
54 54 msg_ids = [msg_ids]
55 55 if tracker is None:
56 56 # default to always done
57 57 tracker = finished_tracker
58 58 self._client = client
59 59 self.msg_ids = msg_ids
60 60 self._fname=fname
61 61 self._targets = targets
62 62 self._tracker = tracker
63 63 self._ready = False
64 64 self._success = None
65 self._metadata = None
65 66 if len(msg_ids) == 1:
66 67 self._single_result = not isinstance(targets, (list, tuple))
67 68 else:
68 69 self._single_result = False
69 70
70 71 def __repr__(self):
71 72 if self._ready:
72 73 return "<%s: finished>"%(self.__class__.__name__)
73 74 else:
74 75 return "<%s: %s>"%(self.__class__.__name__,self._fname)
75 76
76 77
77 78 def _reconstruct_result(self, res):
78 79 """Reconstruct our result from actual result list (always a list)
79 80
80 81 Override me in subclasses for turning a list of results
81 82 into the expected form.
82 83 """
83 84 if self._single_result:
84 85 return res[0]
85 86 else:
86 87 return res
87 88
88 89 def get(self, timeout=-1):
89 90 """Return the result when it arrives.
90 91
91 92 If `timeout` is not ``None`` and the result does not arrive within
92 93 `timeout` seconds then ``TimeoutError`` is raised. If the
93 94 remote call raised an exception then that exception will be reraised
94 95 by get() inside a `RemoteError`.
95 96 """
96 97 if not self.ready():
97 98 self.wait(timeout)
98 99
99 100 if self._ready:
100 101 if self._success:
101 102 return self._result
102 103 else:
103 104 raise self._exception
104 105 else:
105 106 raise error.TimeoutError("Result not ready.")
106 107
107 108 def ready(self):
108 109 """Return whether the call has completed."""
109 110 if not self._ready:
110 111 self.wait(0)
111 112 return self._ready
112 113
113 114 def wait(self, timeout=-1):
114 115 """Wait until the result is available or until `timeout` seconds pass.
115 116
116 117 This method always returns None.
117 118 """
118 119 if self._ready:
119 120 return
120 121 self._ready = self._client.wait(self.msg_ids, timeout)
121 122 if self._ready:
122 123 try:
123 124 results = map(self._client.results.get, self.msg_ids)
124 125 self._result = results
125 126 if self._single_result:
126 127 r = results[0]
127 128 if isinstance(r, Exception):
128 129 raise r
129 130 else:
130 131 results = error.collect_exceptions(results, self._fname)
131 132 self._result = self._reconstruct_result(results)
132 133 except Exception, e:
133 134 self._exception = e
134 135 self._success = False
135 136 else:
136 137 self._success = True
137 138 finally:
138 139 self._metadata = map(self._client.metadata.get, self.msg_ids)
139 140
140 141
141 142 def successful(self):
142 143 """Return whether the call completed without raising an exception.
143 144
144 145 Will raise ``AssertionError`` if the result is not ready.
145 146 """
146 147 assert self.ready()
147 148 return self._success
148 149
149 150 #----------------------------------------------------------------
150 151 # Extra methods not in mp.pool.AsyncResult
151 152 #----------------------------------------------------------------
152 153
153 154 def get_dict(self, timeout=-1):
154 155 """Get the results as a dict, keyed by engine_id.
155 156
156 157 timeout behavior is described in `get()`.
157 158 """
158 159
159 160 results = self.get(timeout)
160 161 engine_ids = [ md['engine_id'] for md in self._metadata ]
161 162 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
162 163 maxcount = bycount.count(bycount[-1])
163 164 if maxcount > 1:
164 165 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
165 166 maxcount, bycount[-1]))
166 167
167 168 return dict(zip(engine_ids,results))
168 169
169 170 @property
170 171 def result(self):
171 172 """result property wrapper for `get(timeout=0)`."""
172 173 return self.get()
173 174
174 175 # abbreviated alias:
175 176 r = result
176 177
177 178 @property
178 179 @check_ready
179 180 def metadata(self):
180 181 """property for accessing execution metadata."""
181 182 if self._single_result:
182 183 return self._metadata[0]
183 184 else:
184 185 return self._metadata
185 186
186 187 @property
187 188 def result_dict(self):
188 189 """result property as a dict."""
189 190 return self.get_dict()
190 191
191 192 def __dict__(self):
192 193 return self.get_dict(0)
193 194
194 195 def abort(self):
195 196 """abort my tasks."""
196 197 assert not self.ready(), "Can't abort, I am already done!"
197 198 return self.client.abort(self.msg_ids, targets=self._targets, block=True)
198 199
199 200 @property
200 201 def sent(self):
201 202 """check whether my messages have been sent."""
202 203 return self._tracker.done
203 204
204 205 def wait_for_send(self, timeout=-1):
205 206 """wait for pyzmq send to complete.
206 207
207 208 This is necessary when sending arrays that you intend to edit in-place.
208 209 `timeout` is in seconds, and will raise TimeoutError if it is reached
209 210 before the send completes.
210 211 """
211 212 return self._tracker.wait(timeout)
212 213
213 214 #-------------------------------------
214 215 # dict-access
215 216 #-------------------------------------
216 217
217 218 @check_ready
218 219 def __getitem__(self, key):
219 220 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
220 221 """
221 222 if isinstance(key, int):
222 223 return error.collect_exceptions([self._result[key]], self._fname)[0]
223 224 elif isinstance(key, slice):
224 225 return error.collect_exceptions(self._result[key], self._fname)
225 226 elif isinstance(key, basestring):
226 227 values = [ md[key] for md in self._metadata ]
227 228 if self._single_result:
228 229 return values[0]
229 230 else:
230 231 return values
231 232 else:
232 233 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
233 234
234 @check_ready
235 235 def __getattr__(self, key):
236 236 """getattr maps to getitem for convenient attr access to metadata."""
237 if key not in self._metadata[0].keys():
237 try:
238 return self.__getitem__(key)
239 except (error.TimeoutError, KeyError):
238 240 raise AttributeError("%r object has no attribute %r"%(
239 241 self.__class__.__name__, key))
240 return self.__getitem__(key)
241 242
242 243 # asynchronous iterator:
243 244 def __iter__(self):
244 245 if self._single_result:
245 246 raise TypeError("AsyncResults with a single result are not iterable.")
246 247 try:
247 248 rlist = self.get(0)
248 249 except error.TimeoutError:
249 250 # wait for each result individually
250 251 for msg_id in self.msg_ids:
251 252 ar = AsyncResult(self._client, msg_id, self._fname)
252 253 yield ar.get()
253 254 else:
254 255 # already done
255 256 for r in rlist:
256 257 yield r
257 258
258 259
259 260
260 261 class AsyncMapResult(AsyncResult):
261 262 """Class for representing results of non-blocking gathers.
262 263
263 264 This will properly reconstruct the gather.
264 265
265 266 This class is iterable at any time, and will wait on results as they come.
266 267
267 268 If ordered=False, then the first results to arrive will come first, otherwise
268 269 results will be yielded in the order they were submitted.
269 270
270 271 """
271 272
272 273 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
273 274 AsyncResult.__init__(self, client, msg_ids, fname=fname)
274 275 self._mapObject = mapObject
275 276 self._single_result = False
276 277 self.ordered = ordered
277 278
278 279 def _reconstruct_result(self, res):
279 280 """Perform the gather on the actual results."""
280 281 return self._mapObject.joinPartitions(res)
281 282
282 283 # asynchronous iterator:
283 284 def __iter__(self):
284 285 it = self._ordered_iter if self.ordered else self._unordered_iter
285 286 for r in it():
286 287 yield r
287 288
288 289 # asynchronous ordered iterator:
289 290 def _ordered_iter(self):
290 291 """iterator for results *as they arrive*, preserving submission order."""
291 292 try:
292 293 rlist = self.get(0)
293 294 except error.TimeoutError:
294 295 # wait for each result individually
295 296 for msg_id in self.msg_ids:
296 297 ar = AsyncResult(self._client, msg_id, self._fname)
297 298 rlist = ar.get()
298 299 try:
299 300 for r in rlist:
300 301 yield r
301 302 except TypeError:
302 303 # flattened, not a list
303 304 # this could get broken by flattened data that returns iterables
304 305 # but most calls to map do not expose the `flatten` argument
305 306 yield rlist
306 307 else:
307 308 # already done
308 309 for r in rlist:
309 310 yield r
310 311
311 312 # asynchronous unordered iterator:
312 313 def _unordered_iter(self):
313 314 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
314 315 try:
315 316 rlist = self.get(0)
316 317 except error.TimeoutError:
317 318 pending = set(self.msg_ids)
318 319 while pending:
319 320 try:
320 321 self._client.wait(pending, 1e-3)
321 322 except error.TimeoutError:
322 323 # ignore timeout error, because that only means
323 324 # *some* jobs are outstanding
324 325 pass
325 326 # update ready set with those no longer outstanding:
326 327 ready = pending.difference(self._client.outstanding)
327 328 # update pending to exclude those that are finished
328 329 pending = pending.difference(ready)
329 330 while ready:
330 331 msg_id = ready.pop()
331 332 ar = AsyncResult(self._client, msg_id, self._fname)
332 333 rlist = ar.get()
333 334 try:
334 335 for r in rlist:
335 336 yield r
336 337 except TypeError:
337 338 # flattened, not a list
338 339 # this could get broken by flattened data that returns iterables
339 340 # but most calls to map do not expose the `flatten` argument
340 341 yield rlist
341 342 else:
342 343 # already done
343 344 for r in rlist:
344 345 yield r
345 346
346 347
347 348
348 349 class AsyncHubResult(AsyncResult):
349 350 """Class to wrap pending results that must be requested from the Hub.
350 351
351 352 Note that waiting/polling on these objects requires polling the Hubover the network,
352 353 so use `AsyncHubResult.wait()` sparingly.
353 354 """
354 355
355 356 def wait(self, timeout=-1):
356 357 """wait for result to complete."""
357 358 start = time.time()
358 359 if self._ready:
359 360 return
360 361 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
361 362 local_ready = self._client.wait(local_ids, timeout)
362 363 if local_ready:
363 364 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
364 365 if not remote_ids:
365 366 self._ready = True
366 367 else:
367 368 rdict = self._client.result_status(remote_ids, status_only=False)
368 369 pending = rdict['pending']
369 370 while pending and (timeout < 0 or time.time() < start+timeout):
370 371 rdict = self._client.result_status(remote_ids, status_only=False)
371 372 pending = rdict['pending']
372 373 if pending:
373 374 time.sleep(0.1)
374 375 if not pending:
375 376 self._ready = True
376 377 if self._ready:
377 378 try:
378 379 results = map(self._client.results.get, self.msg_ids)
379 380 self._result = results
380 381 if self._single_result:
381 382 r = results[0]
382 383 if isinstance(r, Exception):
383 384 raise r
384 385 else:
385 386 results = error.collect_exceptions(results, self._fname)
386 387 self._result = self._reconstruct_result(results)
387 388 except Exception, e:
388 389 self._exception = e
389 390 self._success = False
390 391 else:
391 392 self._success = True
392 393 finally:
393 394 self._metadata = map(self._client.metadata.get, self.msg_ids)
394 395
395 396 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
@@ -1,73 +1,115
1 1 """Tests for asyncresult.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19
20 20 from IPython.parallel.error import TimeoutError
21 21
22 22 from IPython.parallel.tests import add_engines
23 23 from .clienttest import ClusterTestCase
24 24
25 25 def setup():
26 26 add_engines(2)
27 27
28 28 def wait(n):
29 29 import time
30 30 time.sleep(n)
31 31 return n
32 32
33 33 class AsyncResultTest(ClusterTestCase):
34 34
35 35 def test_single_result(self):
36 36 eid = self.client.ids[-1]
37 37 ar = self.client[eid].apply_async(lambda : 42)
38 38 self.assertEquals(ar.get(), 42)
39 39 ar = self.client[[eid]].apply_async(lambda : 42)
40 40 self.assertEquals(ar.get(), [42])
41 41 ar = self.client[-1:].apply_async(lambda : 42)
42 42 self.assertEquals(ar.get(), [42])
43 43
44 44 def test_get_after_done(self):
45 45 ar = self.client[-1].apply_async(lambda : 42)
46 46 ar.wait()
47 47 self.assertTrue(ar.ready())
48 48 self.assertEquals(ar.get(), 42)
49 49 self.assertEquals(ar.get(), 42)
50 50
51 51 def test_get_before_done(self):
52 52 ar = self.client[-1].apply_async(wait, 0.1)
53 53 self.assertRaises(TimeoutError, ar.get, 0)
54 54 ar.wait(0)
55 55 self.assertFalse(ar.ready())
56 56 self.assertEquals(ar.get(), 0.1)
57 57
58 58 def test_get_after_error(self):
59 59 ar = self.client[-1].apply_async(lambda : 1/0)
60 60 ar.wait(10)
61 61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
63 63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
64 64
65 65 def test_get_dict(self):
66 66 n = len(self.client)
67 67 ar = self.client[:].apply_async(lambda : 5)
68 68 self.assertEquals(ar.get(), [5]*n)
69 69 d = ar.get_dict()
70 70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
71 71 for eid,r in d.iteritems():
72 72 self.assertEquals(r, 5)
73
74 def test_list_amr(self):
75 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
76 rlist = list(ar)
77
78 def test_getattr(self):
79 ar = self.client[:].apply_async(wait, 0.5)
80 self.assertRaises(AttributeError, lambda : ar._foo)
81 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
82 self.assertRaises(AttributeError, lambda : ar.foo)
83 self.assertRaises(AttributeError, lambda : ar.engine_id)
84 self.assertFalse(hasattr(ar, '__length_hint__'))
85 self.assertFalse(hasattr(ar, 'foo'))
86 self.assertFalse(hasattr(ar, 'engine_id'))
87 ar.get(5)
88 self.assertRaises(AttributeError, lambda : ar._foo)
89 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
90 self.assertRaises(AttributeError, lambda : ar.foo)
91 self.assertTrue(isinstance(ar.engine_id, list))
92 self.assertEquals(ar.engine_id, ar['engine_id'])
93 self.assertFalse(hasattr(ar, '__length_hint__'))
94 self.assertFalse(hasattr(ar, 'foo'))
95 self.assertTrue(hasattr(ar, 'engine_id'))
96
97 def test_getitem(self):
98 ar = self.client[:].apply_async(wait, 0.5)
99 self.assertRaises(TimeoutError, lambda : ar['foo'])
100 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
101 ar.get(5)
102 self.assertRaises(KeyError, lambda : ar['foo'])
103 self.assertTrue(isinstance(ar['engine_id'], list))
104 self.assertEquals(ar.engine_id, ar['engine_id'])
105
106 def test_single_result(self):
107 ar = self.client[-1].apply_async(wait, 0.5)
108 self.assertRaises(TimeoutError, lambda : ar['foo'])
109 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
110 self.assertTrue(ar.get(5) == 0.5)
111 self.assertTrue(isinstance(ar['engine_id'], int))
112 self.assertTrue(isinstance(ar.engine_id, int))
113 self.assertEquals(ar.engine_id, ar['engine_id'])
114
73 115
@@ -1,165 +1,167
1 1 # -*- coding: utf-8 -*-
2 2 """test LoadBalancedView objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21
22 22 import zmq
23 23 from nose import SkipTest
24 24
25 25 from IPython import parallel as pmod
26 26 from IPython.parallel import error
27 27
28 28 from IPython.parallel.tests import add_engines
29 29
30 30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31 31
32 32 def setup():
33 33 add_engines(3)
34 34
35 35 class TestLoadBalancedView(ClusterTestCase):
36 36
37 37 def setUp(self):
38 38 ClusterTestCase.setUp(self)
39 39 self.view = self.client.load_balanced_view()
40 40
41 41 def test_z_crash_task(self):
42 42 """test graceful handling of engine death (balanced)"""
43 43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
44 44 # self.add_engines(1)
45 45 ar = self.view.apply_async(crash)
46 46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 47 eid = ar.engine_id
48 48 tic = time.time()
49 49 while eid in self.client.ids and time.time()-tic < 5:
50 50 time.sleep(.01)
51 51 self.client.spin()
52 52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 53
54 54 def test_map(self):
55 55 def f(x):
56 56 return x**2
57 57 data = range(16)
58 58 r = self.view.map_sync(f, data)
59 59 self.assertEquals(r, map(f, data))
60 60
61 61 def test_map_unordered(self):
62 62 def f(x):
63 63 return x**2
64 64 def slow_f(x):
65 65 import time
66 66 time.sleep(0.05*x)
67 67 return x**2
68 68 data = range(16,0,-1)
69 69 reference = map(f, data)
70 70
71 amr = self.view.map_async(f, data, ordered=False)
71 amr = self.view.map_async(slow_f, data, ordered=False)
72 72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 # check individual elements, retrieved as they come (uses __iter__)
74 astheycame = list(amr)
73 # check individual elements, retrieved as they come
74 # list comprehension uses __iter__
75 astheycame = [ r for r in amr ]
75 76 # Ensure that at least one result came out of order:
76 77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
77 78 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
78 79
79 80 def test_map_ordered(self):
80 81 def f(x):
81 82 return x**2
82 83 def slow_f(x):
83 84 import time
84 85 time.sleep(0.05*x)
85 86 return x**2
86 87 data = range(16,0,-1)
87 88 reference = map(f, data)
88 89
89 amr = self.view.map_async(f, data)
90 amr = self.view.map_async(slow_f, data)
90 91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
91 # check individual elements, retrieved as they come (uses __iter__)
92 # check individual elements, retrieved as they come
93 # list(amr) uses __iter__
92 94 astheycame = list(amr)
93 95 # Ensure that results came in order
94 96 self.assertEquals(astheycame, reference)
95 97 self.assertEquals(amr.result, reference)
96 98
97 99 def test_abort(self):
98 100 view = self.view
99 101 ar = self.client[:].apply_async(time.sleep, .5)
100 102 ar = self.client[:].apply_async(time.sleep, .5)
101 103 time.sleep(0.2)
102 104 ar2 = view.apply_async(lambda : 2)
103 105 ar3 = view.apply_async(lambda : 3)
104 106 view.abort(ar2)
105 107 view.abort(ar3.msg_ids)
106 108 self.assertRaises(error.TaskAborted, ar2.get)
107 109 self.assertRaises(error.TaskAborted, ar3.get)
108 110
109 111 def test_retries(self):
110 112 add_engines(3)
111 113 view = self.view
112 114 view.timeout = 1 # prevent hang if this doesn't behave
113 115 def fail():
114 116 assert False
115 117 for r in range(len(self.client)-1):
116 118 with view.temp_flags(retries=r):
117 119 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
118 120
119 121 with view.temp_flags(retries=len(self.client), timeout=0.25):
120 122 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
121 123
122 124 def test_invalid_dependency(self):
123 125 view = self.view
124 126 with view.temp_flags(after='12345'):
125 127 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
126 128
127 129 def test_impossible_dependency(self):
128 130 if len(self.client) < 2:
129 131 add_engines(2)
130 132 view = self.client.load_balanced_view()
131 133 ar1 = view.apply_async(lambda : 1)
132 134 ar1.get()
133 135 e1 = ar1.engine_id
134 136 e2 = e1
135 137 while e2 == e1:
136 138 ar2 = view.apply_async(lambda : 1)
137 139 ar2.get()
138 140 e2 = ar2.engine_id
139 141
140 142 with view.temp_flags(follow=[ar1, ar2]):
141 143 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
142 144
143 145
144 146 def test_follow(self):
145 147 ar = self.view.apply_async(lambda : 1)
146 148 ar.get()
147 149 ars = []
148 150 first_id = ar.engine_id
149 151
150 152 self.view.follow = ar
151 153 for i in range(5):
152 154 ars.append(self.view.apply_async(lambda : 1))
153 155 self.view.wait(ars)
154 156 for ar in ars:
155 157 self.assertEquals(ar.engine_id, first_id)
156 158
157 159 def test_after(self):
158 160 view = self.view
159 161 ar = view.apply_async(time.sleep, 0.5)
160 162 with view.temp_flags(after=ar):
161 163 ar2 = view.apply_async(lambda : 1)
162 164
163 165 ar.wait()
164 166 ar2.wait()
165 167 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
General Comments 0
You need to be logged in to leave comments. Login now