Show More
@@ -51,7 +51,7 b' from .dependency import Dependency' | |||||
51 | @decorator |
|
51 | @decorator | |
52 | def logged(f,self,*args,**kwargs): |
|
52 | def logged(f,self,*args,**kwargs): | |
53 | # print ("#--------------------") |
|
53 | # print ("#--------------------") | |
54 |
self.log.debug("scheduler::%s(*%s,**%s)" |
|
54 | self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs) | |
55 | # print ("#--") |
|
55 | # print ("#--") | |
56 | return f(self,*args, **kwargs) |
|
56 | return f(self,*args, **kwargs) | |
57 |
|
57 | |||
@@ -266,7 +266,6 b' class TaskScheduler(SessionFactory):' | |||||
266 | self.failed.pop(uid) |
|
266 | self.failed.pop(uid) | |
267 |
|
267 | |||
268 |
|
268 | |||
269 | @logged |
|
|||
270 | def handle_stranded_tasks(self, engine): |
|
269 | def handle_stranded_tasks(self, engine): | |
271 | """Deal with jobs resident in an engine that died.""" |
|
270 | """Deal with jobs resident in an engine that died.""" | |
272 | lost = self.pending[engine] |
|
271 | lost = self.pending[engine] | |
@@ -298,7 +297,6 b' class TaskScheduler(SessionFactory):' | |||||
298 | #----------------------------------------------------------------------- |
|
297 | #----------------------------------------------------------------------- | |
299 | # Job Submission |
|
298 | # Job Submission | |
300 | #----------------------------------------------------------------------- |
|
299 | #----------------------------------------------------------------------- | |
301 | @logged |
|
|||
302 | def dispatch_submission(self, raw_msg): |
|
300 | def dispatch_submission(self, raw_msg): | |
303 | """Dispatch job submission to appropriate handlers.""" |
|
301 | """Dispatch job submission to appropriate handlers.""" | |
304 | # ensure targets up to date: |
|
302 | # ensure targets up to date: | |
@@ -366,7 +364,6 b' class TaskScheduler(SessionFactory):' | |||||
366 | else: |
|
364 | else: | |
367 | self.save_unmet(msg_id, *args) |
|
365 | self.save_unmet(msg_id, *args) | |
368 |
|
366 | |||
369 | # @logged |
|
|||
370 | def audit_timeouts(self): |
|
367 | def audit_timeouts(self): | |
371 | """Audit all waiting tasks for expired timeouts.""" |
|
368 | """Audit all waiting tasks for expired timeouts.""" | |
372 | now = datetime.now() |
|
369 | now = datetime.now() | |
@@ -377,12 +374,11 b' class TaskScheduler(SessionFactory):' | |||||
377 | if timeout and timeout < now: |
|
374 | if timeout and timeout < now: | |
378 | self.fail_unreachable(msg_id, error.TaskTimeout) |
|
375 | self.fail_unreachable(msg_id, error.TaskTimeout) | |
379 |
|
376 | |||
380 | @logged |
|
|||
381 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
377 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): | |
382 | """a task has become unreachable, send a reply with an ImpossibleDependency |
|
378 | """a task has become unreachable, send a reply with an ImpossibleDependency | |
383 | error.""" |
|
379 | error.""" | |
384 | if msg_id not in self.depending: |
|
380 | if msg_id not in self.depending: | |
385 |
self.log.error("msg %r already failed!" |
|
381 | self.log.error("msg %r already failed!", msg_id) | |
386 | return |
|
382 | return | |
387 | raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id) |
|
383 | raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id) | |
388 | for mid in follow.union(after): |
|
384 | for mid in follow.union(after): | |
@@ -407,7 +403,6 b' class TaskScheduler(SessionFactory):' | |||||
407 |
|
403 | |||
408 | self.update_graph(msg_id, success=False) |
|
404 | self.update_graph(msg_id, success=False) | |
409 |
|
405 | |||
410 | @logged |
|
|||
411 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
406 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): | |
412 | """check location dependencies, and run if they are met.""" |
|
407 | """check location dependencies, and run if they are met.""" | |
413 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
408 | blacklist = self.blacklist.setdefault(msg_id, set()) | |
@@ -459,7 +454,6 b' class TaskScheduler(SessionFactory):' | |||||
459 | self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices) |
|
454 | self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices) | |
460 | return True |
|
455 | return True | |
461 |
|
456 | |||
462 | @logged |
|
|||
463 | def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
457 | def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout): | |
464 | """Save a message for later submission when its dependencies are met.""" |
|
458 | """Save a message for later submission when its dependencies are met.""" | |
465 | self.depending[msg_id] = [raw_msg,targets,after,follow,timeout] |
|
459 | self.depending[msg_id] = [raw_msg,targets,after,follow,timeout] | |
@@ -469,7 +463,6 b' class TaskScheduler(SessionFactory):' | |||||
469 | self.graph[dep_id] = set() |
|
463 | self.graph[dep_id] = set() | |
470 | self.graph[dep_id].add(msg_id) |
|
464 | self.graph[dep_id].add(msg_id) | |
471 |
|
465 | |||
472 | @logged |
|
|||
473 | def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None): |
|
466 | def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None): | |
474 | """Submit a task to any of a subset of our targets.""" |
|
467 | """Submit a task to any of a subset of our targets.""" | |
475 | if indices: |
|
468 | if indices: | |
@@ -496,7 +489,6 b' class TaskScheduler(SessionFactory):' | |||||
496 | #----------------------------------------------------------------------- |
|
489 | #----------------------------------------------------------------------- | |
497 | # Result Handling |
|
490 | # Result Handling | |
498 | #----------------------------------------------------------------------- |
|
491 | #----------------------------------------------------------------------- | |
499 | @logged |
|
|||
500 | def dispatch_result(self, raw_msg): |
|
492 | def dispatch_result(self, raw_msg): | |
501 | """dispatch method for result replies""" |
|
493 | """dispatch method for result replies""" | |
502 | try: |
|
494 | try: | |
@@ -510,7 +502,7 b' class TaskScheduler(SessionFactory):' | |||||
510 | else: |
|
502 | else: | |
511 | self.finish_job(idx) |
|
503 | self.finish_job(idx) | |
512 | except Exception: |
|
504 | except Exception: | |
513 |
self.log.error("task::Invaid result: %r" |
|
505 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) | |
514 | return |
|
506 | return | |
515 |
|
507 | |||
516 | header = msg['header'] |
|
508 | header = msg['header'] | |
@@ -532,7 +524,6 b' class TaskScheduler(SessionFactory):' | |||||
532 | else: |
|
524 | else: | |
533 | self.handle_unmet_dependency(idents, parent) |
|
525 | self.handle_unmet_dependency(idents, parent) | |
534 |
|
526 | |||
535 | @logged |
|
|||
536 | def handle_result(self, idents, parent, raw_msg, success=True): |
|
527 | def handle_result(self, idents, parent, raw_msg, success=True): | |
537 | """handle a real task result, either success or failure""" |
|
528 | """handle a real task result, either success or failure""" | |
538 | # first, relay result to client |
|
529 | # first, relay result to client | |
@@ -557,7 +548,6 b' class TaskScheduler(SessionFactory):' | |||||
557 |
|
548 | |||
558 | self.update_graph(msg_id, success) |
|
549 | self.update_graph(msg_id, success) | |
559 |
|
550 | |||
560 | @logged |
|
|||
561 | def handle_unmet_dependency(self, idents, parent): |
|
551 | def handle_unmet_dependency(self, idents, parent): | |
562 | """handle an unmet dependency""" |
|
552 | """handle an unmet dependency""" | |
563 | engine = idents[0] |
|
553 | engine = idents[0] | |
@@ -590,7 +580,6 b' class TaskScheduler(SessionFactory):' | |||||
590 |
|
580 | |||
591 |
|
581 | |||
592 |
|
582 | |||
593 | @logged |
|
|||
594 | def update_graph(self, dep_id=None, success=True): |
|
583 | def update_graph(self, dep_id=None, success=True): | |
595 | """dep_id just finished. Update our dependency |
|
584 | """dep_id just finished. Update our dependency | |
596 | graph and submit any jobs that just became runable. |
|
585 | graph and submit any jobs that just became runable. |
General Comments 0
You need to be logged in to leave comments.
Login now