##// END OF EJS Templates
test resubmit of aborted tasks (#1647)
MinRK -
Show More
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):'
280 280 time.sleep(0.25)
281 281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282 282
283 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
286
287 # timeout 2s, polling every 100ms
288 for i in range(20):
289 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
292 else:
293 break
294
295 # ensure Hub up to date:
296 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
300
301
283 302 def test_resubmit(self):
284 303 def f():
285 304 import random
@@ -288,11 +307,29 b' class TestClient(ClusterTestCase):'
288 307 ar = v.apply_async(f)
289 308 r1 = ar.get(1)
290 309 # give the Hub a chance to notice:
291 time.sleep(0.5)
310 self._wait_for_idle()
292 311 ahr = self.client.resubmit(ar.msg_ids)
293 312 r2 = ahr.get(1)
294 313 self.assertFalse(r1 == r2)
295 314
315 def test_resubmit_aborted(self):
316 def f():
317 import random
318 return random.random()
319 v = self.client.load_balanced_view()
320 # restrict to one engine, so we can put a sleep
321 # ahead of the task, so it will get aborted
322 eid = self.client.ids[-1]
323 v.targets = [eid]
324 sleep = v.apply_async(time.sleep, 0.5)
325 ar = v.apply_async(f)
326 ar.abort()
327 self.assertRaises(error.TaskAborted, ar.get)
328 # Give the Hub a chance to get up to date:
329 self._wait_for_idle()
330 ahr = self.client.resubmit(ar.msg_ids)
331 r2 = ahr.get(1)
332
296 333 def test_resubmit_inflight(self):
297 334 """ensure ValueError on resubmit of inflight task"""
298 335 v = self.client.load_balanced_view()
General Comments 0
You need to be logged in to leave comments. Login now