Show More
@@ -51,7 +51,7 b' from .dependency import Dependency' | |||
|
51 | 51 | @decorator |
|
52 | 52 | def logged(f,self,*args,**kwargs): |
|
53 | 53 | # print ("#--------------------") |
|
54 |
self.log.debug("scheduler::%s(*%s,**%s)" |
|
|
54 | self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs) | |
|
55 | 55 | # print ("#--") |
|
56 | 56 | return f(self,*args, **kwargs) |
|
57 | 57 | |
@@ -266,7 +266,6 b' class TaskScheduler(SessionFactory):' | |||
|
266 | 266 | self.failed.pop(uid) |
|
267 | 267 | |
|
268 | 268 | |
|
269 | @logged | |
|
270 | 269 | def handle_stranded_tasks(self, engine): |
|
271 | 270 | """Deal with jobs resident in an engine that died.""" |
|
272 | 271 | lost = self.pending[engine] |
@@ -298,7 +297,6 b' class TaskScheduler(SessionFactory):' | |||
|
298 | 297 | #----------------------------------------------------------------------- |
|
299 | 298 | # Job Submission |
|
300 | 299 | #----------------------------------------------------------------------- |
|
301 | @logged | |
|
302 | 300 | def dispatch_submission(self, raw_msg): |
|
303 | 301 | """Dispatch job submission to appropriate handlers.""" |
|
304 | 302 | # ensure targets up to date: |
@@ -366,7 +364,6 b' class TaskScheduler(SessionFactory):' | |||
|
366 | 364 | else: |
|
367 | 365 | self.save_unmet(msg_id, *args) |
|
368 | 366 | |
|
369 | # @logged | |
|
370 | 367 | def audit_timeouts(self): |
|
371 | 368 | """Audit all waiting tasks for expired timeouts.""" |
|
372 | 369 | now = datetime.now() |
@@ -377,12 +374,11 b' class TaskScheduler(SessionFactory):' | |||
|
377 | 374 | if timeout and timeout < now: |
|
378 | 375 | self.fail_unreachable(msg_id, error.TaskTimeout) |
|
379 | 376 | |
|
380 | @logged | |
|
381 | 377 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
382 | 378 | """a task has become unreachable, send a reply with an ImpossibleDependency |
|
383 | 379 | error.""" |
|
384 | 380 | if msg_id not in self.depending: |
|
385 |
self.log.error("msg %r already failed!" |
|
|
381 | self.log.error("msg %r already failed!", msg_id) | |
|
386 | 382 | return |
|
387 | 383 | raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id) |
|
388 | 384 | for mid in follow.union(after): |
@@ -407,7 +403,6 b' class TaskScheduler(SessionFactory):' | |||
|
407 | 403 | |
|
408 | 404 | self.update_graph(msg_id, success=False) |
|
409 | 405 | |
|
410 | @logged | |
|
411 | 406 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
412 | 407 | """check location dependencies, and run if they are met.""" |
|
413 | 408 | blacklist = self.blacklist.setdefault(msg_id, set()) |
@@ -459,7 +454,6 b' class TaskScheduler(SessionFactory):' | |||
|
459 | 454 | self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices) |
|
460 | 455 | return True |
|
461 | 456 | |
|
462 | @logged | |
|
463 | 457 | def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
464 | 458 | """Save a message for later submission when its dependencies are met.""" |
|
465 | 459 | self.depending[msg_id] = [raw_msg,targets,after,follow,timeout] |
@@ -469,7 +463,6 b' class TaskScheduler(SessionFactory):' | |||
|
469 | 463 | self.graph[dep_id] = set() |
|
470 | 464 | self.graph[dep_id].add(msg_id) |
|
471 | 465 | |
|
472 | @logged | |
|
473 | 466 | def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None): |
|
474 | 467 | """Submit a task to any of a subset of our targets.""" |
|
475 | 468 | if indices: |
@@ -496,7 +489,6 b' class TaskScheduler(SessionFactory):' | |||
|
496 | 489 | #----------------------------------------------------------------------- |
|
497 | 490 | # Result Handling |
|
498 | 491 | #----------------------------------------------------------------------- |
|
499 | @logged | |
|
500 | 492 | def dispatch_result(self, raw_msg): |
|
501 | 493 | """dispatch method for result replies""" |
|
502 | 494 | try: |
@@ -510,7 +502,7 b' class TaskScheduler(SessionFactory):' | |||
|
510 | 502 | else: |
|
511 | 503 | self.finish_job(idx) |
|
512 | 504 | except Exception: |
|
513 |
self.log.error("task::Invaid result: %r" |
|
|
505 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) | |
|
514 | 506 | return |
|
515 | 507 | |
|
516 | 508 | header = msg['header'] |
@@ -532,7 +524,6 b' class TaskScheduler(SessionFactory):' | |||
|
532 | 524 | else: |
|
533 | 525 | self.handle_unmet_dependency(idents, parent) |
|
534 | 526 | |
|
535 | @logged | |
|
536 | 527 | def handle_result(self, idents, parent, raw_msg, success=True): |
|
537 | 528 | """handle a real task result, either success or failure""" |
|
538 | 529 | # first, relay result to client |
@@ -557,7 +548,6 b' class TaskScheduler(SessionFactory):' | |||
|
557 | 548 | |
|
558 | 549 | self.update_graph(msg_id, success) |
|
559 | 550 | |
|
560 | @logged | |
|
561 | 551 | def handle_unmet_dependency(self, idents, parent): |
|
562 | 552 | """handle an unmet dependency""" |
|
563 | 553 | engine = idents[0] |
@@ -590,7 +580,6 b' class TaskScheduler(SessionFactory):' | |||
|
590 | 580 | |
|
591 | 581 | |
|
592 | 582 | |
|
593 | @logged | |
|
594 | 583 | def update_graph(self, dep_id=None, success=True): |
|
595 | 584 | """dep_id just finished. Update our dependency |
|
596 | 585 | graph and submit any jobs that just became runable. |
General Comments 0
You need to be logged in to leave comments.
Login now