Show More
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):' | |||
|
280 | 280 | time.sleep(0.25) |
|
281 | 281 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) |
|
282 | 282 | |
|
283 | def _wait_for_idle(self): | |
|
284 | """wait for an engine to become idle, according to the Hub""" | |
|
285 | rc = self.client | |
|
286 | ||
|
287 | # timeout 2s, polling every 100ms | |
|
288 | for i in range(20): | |
|
289 | qs = rc.queue_status() | |
|
290 | if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): | |
|
291 | time.sleep(0.1) | |
|
292 | else: | |
|
293 | break | |
|
294 | ||
|
295 | # ensure Hub up to date: | |
|
296 | qs = rc.queue_status() | |
|
297 | self.assertEquals(qs['unassigned'], 0) | |
|
298 | for eid in rc.ids: | |
|
299 | self.assertEquals(qs[eid]['tasks'], 0) | |
|
300 | ||
|
301 | ||
|
283 | 302 | def test_resubmit(self): |
|
284 | 303 | def f(): |
|
285 | 304 | import random |
@@ -288,11 +307,29 b' class TestClient(ClusterTestCase):' | |||
|
288 | 307 | ar = v.apply_async(f) |
|
289 | 308 | r1 = ar.get(1) |
|
290 | 309 | # give the Hub a chance to notice: |
|
291 | time.sleep(0.5) | |
|
310 | self._wait_for_idle() | |
|
292 | 311 | ahr = self.client.resubmit(ar.msg_ids) |
|
293 | 312 | r2 = ahr.get(1) |
|
294 | 313 | self.assertFalse(r1 == r2) |
|
295 | 314 | |
|
315 | def test_resubmit_aborted(self): | |
|
316 | def f(): | |
|
317 | import random | |
|
318 | return random.random() | |
|
319 | v = self.client.load_balanced_view() | |
|
320 | # restrict to one engine, so we can put a sleep | |
|
321 | # ahead of the task, so it will get aborted | |
|
322 | eid = self.client.ids[-1] | |
|
323 | v.targets = [eid] | |
|
324 | sleep = v.apply_async(time.sleep, 0.5) | |
|
325 | ar = v.apply_async(f) | |
|
326 | ar.abort() | |
|
327 | self.assertRaises(error.TaskAborted, ar.get) | |
|
328 | # Give the Hub a chance to get up to date: | |
|
329 | self._wait_for_idle() | |
|
330 | ahr = self.client.resubmit(ar.msg_ids) | |
|
331 | r2 = ahr.get(1) | |
|
332 | ||
|
296 | 333 | def test_resubmit_inflight(self): |
|
297 | 334 | """ensure ValueError on resubmit of inflight task""" |
|
298 | 335 | v = self.client.load_balanced_view() |
General Comments 0
You need to be logged in to leave comments.
Login now