Show More
@@ -217,8 +217,6 b' class TaskScheduler(SessionFactory):' | |||||
217 | all_done = Set() # set of all finished tasks=union(completed,failed) |
|
217 | all_done = Set() # set of all finished tasks=union(completed,failed) | |
218 | all_ids = Set() # set of all submitted task IDs |
|
218 | all_ids = Set() # set of all submitted task IDs | |
219 |
|
219 | |||
220 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') |
|
|||
221 |
|
||||
222 | ident = CBytes() # ZMQ identity. This should just be self.session.session |
|
220 | ident = CBytes() # ZMQ identity. This should just be self.session.session | |
223 | # but ensure Bytes |
|
221 | # but ensure Bytes | |
224 | def _ident_default(self): |
|
222 | def _ident_default(self): | |
@@ -236,9 +234,7 b' class TaskScheduler(SessionFactory):' | |||||
236 | unregistration_notification = self._unregister_engine |
|
234 | unregistration_notification = self._unregister_engine | |
237 | ) |
|
235 | ) | |
238 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
236 | self.notifier_stream.on_recv(self.dispatch_notification) | |
239 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz |
|
237 | self.log.info("Scheduler started [%s]" % self.scheme_name) | |
240 | self.auditor.start() |
|
|||
241 | self.log.info("Scheduler started [%s]"%self.scheme_name) |
|
|||
242 |
|
238 | |||
243 | def resume_receiving(self): |
|
239 | def resume_receiving(self): | |
244 | """Resume accepting jobs.""" |
|
240 | """Resume accepting jobs.""" | |
@@ -438,15 +434,16 b' class TaskScheduler(SessionFactory):' | |||||
438 | # turn timeouts into datetime objects: |
|
434 | # turn timeouts into datetime objects: | |
439 | timeout = md.get('timeout', None) |
|
435 | timeout = md.get('timeout', None) | |
440 | if timeout: |
|
436 | if timeout: | |
441 | # cast to float, because jsonlib returns floats as decimal.Decimal, |
|
437 | timeout = time.time() + float(timeout) | |
442 | # which timedelta does not accept |
|
|||
443 | timeout = datetime.now() + timedelta(0,float(timeout),0) |
|
|||
444 |
|
438 | |||
445 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, |
|
439 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, | |
446 | header=header, targets=targets, after=after, follow=follow, |
|
440 | header=header, targets=targets, after=after, follow=follow, | |
447 | timeout=timeout, metadata=md, |
|
441 | timeout=timeout, metadata=md, | |
448 | ) |
|
442 | ) | |
449 |
|
443 | if timeout: | ||
|
444 | # schedule timeout callback | |||
|
445 | self.loop.add_timeout(timeout, lambda : self.job_timeout(job)) | |||
|
446 | ||||
450 | # validate and reduce dependencies: |
|
447 | # validate and reduce dependencies: | |
451 | for dep in after,follow: |
|
448 | for dep in after,follow: | |
452 | if not dep: # empty dependency |
|
449 | if not dep: # empty dependency | |
@@ -470,21 +467,25 b' class TaskScheduler(SessionFactory):' | |||||
470 | else: |
|
467 | else: | |
471 | self.save_unmet(job) |
|
468 | self.save_unmet(job) | |
472 |
|
469 | |||
473 |
def |
|
470 | def job_timeout(self, job): | |
474 |
""" |
|
471 | """callback for a job's timeout. | |
475 | now = datetime.now() |
|
472 | ||
476 | for msg_id in self.queue_map.keys(): |
|
473 | The job may or may not have been run at this point. | |
477 | # must recheck, in case one failure cascaded to another: |
|
474 | """ | |
478 | if msg_id in self.queue_map: |
|
475 | if job.timeout >= (time.time() + 1): | |
479 | job = self.queue_map[msg_id] |
|
476 | self.log.warn("task %s timeout fired prematurely: %s > %s", | |
480 |
|
|
477 | job.msg_id, job.timeout, now | |
481 | self.fail_unreachable(msg_id, error.TaskTimeout) |
|
478 | ) | |
|
479 | if job.msg_id in self.queue_map: | |||
|
480 | # still waiting, but ran out of time | |||
|
481 | self.log.info("task %r timed out", job.msg_id) | |||
|
482 | self.fail_unreachable(job.msg_id, error.TaskTimeout) | |||
482 |
|
483 | |||
483 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
484 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): | |
484 | """a task has become unreachable, send a reply with an ImpossibleDependency |
|
485 | """a task has become unreachable, send a reply with an ImpossibleDependency | |
485 | error.""" |
|
486 | error.""" | |
486 | if msg_id not in self.queue_map: |
|
487 | if msg_id not in self.queue_map: | |
487 |
self.log.error(" |
|
488 | self.log.error("task %r already failed!", msg_id) | |
488 | return |
|
489 | return | |
489 | job = self.queue_map.pop(msg_id) |
|
490 | job = self.queue_map.pop(msg_id) | |
490 | # lazy-delete from the queue |
|
491 | # lazy-delete from the queue | |
@@ -497,6 +498,7 b' class TaskScheduler(SessionFactory):' | |||||
497 | raise why() |
|
498 | raise why() | |
498 | except: |
|
499 | except: | |
499 | content = error.wrap_exception() |
|
500 | content = error.wrap_exception() | |
|
501 | self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename']) | |||
500 |
|
502 | |||
501 | self.all_done.add(msg_id) |
|
503 | self.all_done.add(msg_id) | |
502 | self.all_failed.add(msg_id) |
|
504 | self.all_failed.add(msg_id) | |
@@ -791,7 +793,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non' | |||||
791 | identity=b'task', in_thread=False): |
|
793 | identity=b'task', in_thread=False): | |
792 |
|
794 | |||
793 | ZMQStream = zmqstream.ZMQStream |
|
795 | ZMQStream = zmqstream.ZMQStream | |
794 |
|
796 | loglevel = logging.DEBUG | ||
795 | if config: |
|
797 | if config: | |
796 | # unwrap dict back into Config |
|
798 | # unwrap dict back into Config | |
797 | config = Config(config) |
|
799 | config = Config(config) |
General Comments 0
You need to be logged in to leave comments.
Login now