Show More
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):' | |||||
280 | time.sleep(0.25) |
|
280 | time.sleep(0.25) | |
281 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) |
|
281 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) | |
282 |
|
282 | |||
|
283 | def _wait_for_idle(self): | |||
|
284 | """wait for an engine to become idle, according to the Hub""" | |||
|
285 | rc = self.client | |||
|
286 | ||||
|
287 | # timeout 2s, polling every 100ms | |||
|
288 | for i in range(20): | |||
|
289 | qs = rc.queue_status() | |||
|
290 | if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): | |||
|
291 | time.sleep(0.1) | |||
|
292 | else: | |||
|
293 | break | |||
|
294 | ||||
|
295 | # ensure Hub up to date: | |||
|
296 | qs = rc.queue_status() | |||
|
297 | self.assertEquals(qs['unassigned'], 0) | |||
|
298 | for eid in rc.ids: | |||
|
299 | self.assertEquals(qs[eid]['tasks'], 0) | |||
|
300 | ||||
|
301 | ||||
283 | def test_resubmit(self): |
|
302 | def test_resubmit(self): | |
284 | def f(): |
|
303 | def f(): | |
285 | import random |
|
304 | import random | |
@@ -288,11 +307,29 b' class TestClient(ClusterTestCase):' | |||||
288 | ar = v.apply_async(f) |
|
307 | ar = v.apply_async(f) | |
289 | r1 = ar.get(1) |
|
308 | r1 = ar.get(1) | |
290 | # give the Hub a chance to notice: |
|
309 | # give the Hub a chance to notice: | |
291 | time.sleep(0.5) |
|
310 | self._wait_for_idle() | |
292 | ahr = self.client.resubmit(ar.msg_ids) |
|
311 | ahr = self.client.resubmit(ar.msg_ids) | |
293 | r2 = ahr.get(1) |
|
312 | r2 = ahr.get(1) | |
294 | self.assertFalse(r1 == r2) |
|
313 | self.assertFalse(r1 == r2) | |
295 |
|
314 | |||
|
315 | def test_resubmit_aborted(self): | |||
|
316 | def f(): | |||
|
317 | import random | |||
|
318 | return random.random() | |||
|
319 | v = self.client.load_balanced_view() | |||
|
320 | # restrict to one engine, so we can put a sleep | |||
|
321 | # ahead of the task, so it will get aborted | |||
|
322 | eid = self.client.ids[-1] | |||
|
323 | v.targets = [eid] | |||
|
324 | sleep = v.apply_async(time.sleep, 0.5) | |||
|
325 | ar = v.apply_async(f) | |||
|
326 | ar.abort() | |||
|
327 | self.assertRaises(error.TaskAborted, ar.get) | |||
|
328 | # Give the Hub a chance to get up to date: | |||
|
329 | self._wait_for_idle() | |||
|
330 | ahr = self.client.resubmit(ar.msg_ids) | |||
|
331 | r2 = ahr.get(1) | |||
|
332 | ||||
296 | def test_resubmit_inflight(self): |
|
333 | def test_resubmit_inflight(self): | |
297 | """ensure ValueError on resubmit of inflight task""" |
|
334 | """ensure ValueError on resubmit of inflight task""" | |
298 | v = self.client.load_balanced_view() |
|
335 | v = self.client.load_balanced_view() |
General Comments 0
You need to be logged in to leave comments.
Login now