##// END OF EJS Templates
Use a per-job timeout callback rather than periodically auditing all waiting jobs for expired timeouts
MinRK -
Show More
@@ -217,8 +217,6 b' class TaskScheduler(SessionFactory):'
217 all_done = Set() # set of all finished tasks=union(completed,failed)
217 all_done = Set() # set of all finished tasks=union(completed,failed)
218 all_ids = Set() # set of all submitted task IDs
218 all_ids = Set() # set of all submitted task IDs
219
219
220 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
221
222 ident = CBytes() # ZMQ identity. This should just be self.session.session
220 ident = CBytes() # ZMQ identity. This should just be self.session.session
223 # but ensure Bytes
221 # but ensure Bytes
224 def _ident_default(self):
222 def _ident_default(self):
@@ -236,9 +234,7 b' class TaskScheduler(SessionFactory):'
236 unregistration_notification = self._unregister_engine
234 unregistration_notification = self._unregister_engine
237 )
235 )
238 self.notifier_stream.on_recv(self.dispatch_notification)
236 self.notifier_stream.on_recv(self.dispatch_notification)
239 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 s (0.5 Hz)
237 self.log.info("Scheduler started [%s]" % self.scheme_name)
240 self.auditor.start()
241 self.log.info("Scheduler started [%s]"%self.scheme_name)
242
238
243 def resume_receiving(self):
239 def resume_receiving(self):
244 """Resume accepting jobs."""
240 """Resume accepting jobs."""
@@ -438,15 +434,16 b' class TaskScheduler(SessionFactory):'
438 # turn timeouts into datetime objects:
434 # turn timeouts into datetime objects:
439 timeout = md.get('timeout', None)
435 timeout = md.get('timeout', None)
440 if timeout:
436 if timeout:
441 # cast to float, because jsonlib returns floats as decimal.Decimal,
437 timeout = time.time() + float(timeout)
442 # which timedelta does not accept
443 timeout = datetime.now() + timedelta(0,float(timeout),0)
444
438
445 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
439 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
446 header=header, targets=targets, after=after, follow=follow,
440 header=header, targets=targets, after=after, follow=follow,
447 timeout=timeout, metadata=md,
441 timeout=timeout, metadata=md,
448 )
442 )
449
443 if timeout:
444 # schedule timeout callback
445 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
446
450 # validate and reduce dependencies:
447 # validate and reduce dependencies:
451 for dep in after,follow:
448 for dep in after,follow:
452 if not dep: # empty dependency
449 if not dep: # empty dependency
@@ -470,21 +467,25 b' class TaskScheduler(SessionFactory):'
470 else:
467 else:
471 self.save_unmet(job)
468 self.save_unmet(job)
472
469
473 def audit_timeouts(self):
470 def job_timeout(self, job):
474 """Audit all waiting tasks for expired timeouts."""
471 """callback for a job's timeout.
475 now = datetime.now()
472
476 for msg_id in self.queue_map.keys():
473 The job may or may not have been run at this point.
477 # must recheck, in case one failure cascaded to another:
474 """
478 if msg_id in self.queue_map:
475 if job.timeout >= (time.time() + 1):
479 job = self.queue_map[msg_id]
476 self.log.warn("task %s timeout fired prematurely: %s > %s",
480 if job.timeout and job.timeout < now:
477 job.msg_id, job.timeout, now
481 self.fail_unreachable(msg_id, error.TaskTimeout)
478 )
479 if job.msg_id in self.queue_map:
480 # still waiting, but ran out of time
481 self.log.info("task %r timed out", job.msg_id)
482 self.fail_unreachable(job.msg_id, error.TaskTimeout)
482
483
483 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
484 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
484 """a task has become unreachable, send a reply with the given error
485 """a task has become unreachable, send a reply with the given error
485 (ImpossibleDependency by default)."""
486 (ImpossibleDependency by default)."""
486 if msg_id not in self.queue_map:
487 if msg_id not in self.queue_map:
487 self.log.error("msg %r already failed!", msg_id)
488 self.log.error("task %r already failed!", msg_id)
488 return
489 return
489 job = self.queue_map.pop(msg_id)
490 job = self.queue_map.pop(msg_id)
490 # lazy-delete from the queue
491 # lazy-delete from the queue
@@ -497,6 +498,7 b' class TaskScheduler(SessionFactory):'
497 raise why()
498 raise why()
498 except:
499 except:
499 content = error.wrap_exception()
500 content = error.wrap_exception()
501 self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])
500
502
501 self.all_done.add(msg_id)
503 self.all_done.add(msg_id)
502 self.all_failed.add(msg_id)
504 self.all_failed.add(msg_id)
@@ -791,7 +793,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non'
791 identity=b'task', in_thread=False):
793 identity=b'task', in_thread=False):
792
794
793 ZMQStream = zmqstream.ZMQStream
795 ZMQStream = zmqstream.ZMQStream
794
796 loglevel = logging.DEBUG
795 if config:
797 if config:
796 # unwrap dict back into Config
798 # unwrap dict back into Config
797 config = Config(config)
799 config = Config(config)
General Comments 0
You need to be logged in to leave comments. Login now