##// END OF EJS Templates
use per-timeout callback, rather than audit for timeouts
MinRK -
Show More
@@ -217,8 +217,6 b' class TaskScheduler(SessionFactory):'
217 217 all_done = Set() # set of all finished tasks=union(completed,failed)
218 218 all_ids = Set() # set of all submitted task IDs
219 219
220 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
221
222 220 ident = CBytes() # ZMQ identity. This should just be self.session.session
223 221 # but ensure Bytes
224 222 def _ident_default(self):
@@ -236,9 +234,7 b' class TaskScheduler(SessionFactory):'
236 234 unregistration_notification = self._unregister_engine
237 235 )
238 236 self.notifier_stream.on_recv(self.dispatch_notification)
239 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
240 self.auditor.start()
241 self.log.info("Scheduler started [%s]"%self.scheme_name)
237 self.log.info("Scheduler started [%s]" % self.scheme_name)
242 238
243 239 def resume_receiving(self):
244 240 """Resume accepting jobs."""
@@ -438,15 +434,16 b' class TaskScheduler(SessionFactory):'
438 434 # turn timeouts into datetime objects:
439 435 timeout = md.get('timeout', None)
440 436 if timeout:
441 # cast to float, because jsonlib returns floats as decimal.Decimal,
442 # which timedelta does not accept
443 timeout = datetime.now() + timedelta(0,float(timeout),0)
437 timeout = time.time() + float(timeout)
444 438
445 439 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
446 440 header=header, targets=targets, after=after, follow=follow,
447 441 timeout=timeout, metadata=md,
448 442 )
449
443 if timeout:
444 # schedule timeout callback
445 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
446
450 447 # validate and reduce dependencies:
451 448 for dep in after,follow:
452 449 if not dep: # empty dependency
@@ -470,21 +467,25 b' class TaskScheduler(SessionFactory):'
470 467 else:
471 468 self.save_unmet(job)
472 469
473 def audit_timeouts(self):
474 """Audit all waiting tasks for expired timeouts."""
475 now = datetime.now()
476 for msg_id in self.queue_map.keys():
477 # must recheck, in case one failure cascaded to another:
478 if msg_id in self.queue_map:
479 job = self.queue_map[msg_id]
480 if job.timeout and job.timeout < now:
481 self.fail_unreachable(msg_id, error.TaskTimeout)
def job_timeout(self, job):
    """Callback for a job's timeout.

    Scheduled at submission time via ``self.loop.add_timeout`` with
    ``job.timeout`` as the deadline.  The job may or may not have been
    run at this point; it is only failed if it is still waiting in
    ``self.queue_map``.

    Parameters
    ----------
    job : Job
        The job whose deadline has been reached.  ``job.timeout`` is an
        absolute ``time.time()``-style timestamp (seconds), and
        ``job.msg_id`` is the key used in ``self.queue_map``.
    """
    # Take a single timestamp so the comparison and the logged value agree.
    now = time.time()
    if job.timeout >= (now + 1):
        # The deadline is still more than a second away: the callback
        # fired early.  Only report it; the handling below still runs.
        self.log.warn("task %s timeout fired prematurely: %s > %s",
            job.msg_id, job.timeout, now
        )
    if job.msg_id in self.queue_map:
        # still waiting, but ran out of time
        self.log.info("task %r timed out", job.msg_id)
        self.fail_unreachable(job.msg_id, error.TaskTimeout)
482 483
483 484 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
484 485 """a task has become unreachable, send a reply with an ImpossibleDependency
485 486 error."""
486 487 if msg_id not in self.queue_map:
487 self.log.error("msg %r already failed!", msg_id)
488 self.log.error("task %r already failed!", msg_id)
488 489 return
489 490 job = self.queue_map.pop(msg_id)
490 491 # lazy-delete from the queue
@@ -497,6 +498,7 b' class TaskScheduler(SessionFactory):'
497 498 raise why()
498 499 except:
499 500 content = error.wrap_exception()
501 self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])
500 502
501 503 self.all_done.add(msg_id)
502 504 self.all_failed.add(msg_id)
@@ -791,7 +793,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non'
791 793 identity=b'task', in_thread=False):
792 794
793 795 ZMQStream = zmqstream.ZMQStream
794
796 loglevel = logging.DEBUG
795 797 if config:
796 798 # unwrap dict back into Config
797 799 config = Config(config)
General Comments 0
You need to be logged in to leave comments. Login now