##// END OF EJS Templates
remove debug statements from Scheduler...
MinRK -
Show More
@@ -51,7 +51,7 b' from .dependency import Dependency'
51 @decorator
51 @decorator
52 def logged(f,self,*args,**kwargs):
52 def logged(f,self,*args,**kwargs):
53 # print ("#--------------------")
53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 # print ("#--")
55 # print ("#--")
56 return f(self,*args, **kwargs)
56 return f(self,*args, **kwargs)
57
57
@@ -266,7 +266,6 b' class TaskScheduler(SessionFactory):'
266 self.failed.pop(uid)
266 self.failed.pop(uid)
267
267
268
268
269 @logged
270 def handle_stranded_tasks(self, engine):
269 def handle_stranded_tasks(self, engine):
271 """Deal with jobs resident in an engine that died."""
270 """Deal with jobs resident in an engine that died."""
272 lost = self.pending[engine]
271 lost = self.pending[engine]
@@ -298,7 +297,6 b' class TaskScheduler(SessionFactory):'
298 #-----------------------------------------------------------------------
297 #-----------------------------------------------------------------------
299 # Job Submission
298 # Job Submission
300 #-----------------------------------------------------------------------
299 #-----------------------------------------------------------------------
301 @logged
302 def dispatch_submission(self, raw_msg):
300 def dispatch_submission(self, raw_msg):
303 """Dispatch job submission to appropriate handlers."""
301 """Dispatch job submission to appropriate handlers."""
304 # ensure targets up to date:
302 # ensure targets up to date:
@@ -366,7 +364,6 b' class TaskScheduler(SessionFactory):'
366 else:
364 else:
367 self.save_unmet(msg_id, *args)
365 self.save_unmet(msg_id, *args)
368
366
369 # @logged
370 def audit_timeouts(self):
367 def audit_timeouts(self):
371 """Audit all waiting tasks for expired timeouts."""
368 """Audit all waiting tasks for expired timeouts."""
372 now = datetime.now()
369 now = datetime.now()
@@ -377,12 +374,11 b' class TaskScheduler(SessionFactory):'
377 if timeout and timeout < now:
374 if timeout and timeout < now:
378 self.fail_unreachable(msg_id, error.TaskTimeout)
375 self.fail_unreachable(msg_id, error.TaskTimeout)
379
376
380 @logged
381 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
377 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
382 """a task has become unreachable, send a reply with an ImpossibleDependency
378 """a task has become unreachable, send a reply with an ImpossibleDependency
383 error."""
379 error."""
384 if msg_id not in self.depending:
380 if msg_id not in self.depending:
385 self.log.error("msg %r already failed!"%msg_id)
381 self.log.error("msg %r already failed!", msg_id)
386 return
382 return
387 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
383 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
388 for mid in follow.union(after):
384 for mid in follow.union(after):
@@ -407,7 +403,6 b' class TaskScheduler(SessionFactory):'
407
403
408 self.update_graph(msg_id, success=False)
404 self.update_graph(msg_id, success=False)
409
405
410 @logged
411 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
406 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
412 """check location dependencies, and run if they are met."""
407 """check location dependencies, and run if they are met."""
413 blacklist = self.blacklist.setdefault(msg_id, set())
408 blacklist = self.blacklist.setdefault(msg_id, set())
@@ -459,7 +454,6 b' class TaskScheduler(SessionFactory):'
459 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
454 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
460 return True
455 return True
461
456
462 @logged
463 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
457 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
464 """Save a message for later submission when its dependencies are met."""
458 """Save a message for later submission when its dependencies are met."""
465 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
459 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
@@ -469,7 +463,6 b' class TaskScheduler(SessionFactory):'
469 self.graph[dep_id] = set()
463 self.graph[dep_id] = set()
470 self.graph[dep_id].add(msg_id)
464 self.graph[dep_id].add(msg_id)
471
465
472 @logged
473 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
466 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
474 """Submit a task to any of a subset of our targets."""
467 """Submit a task to any of a subset of our targets."""
475 if indices:
468 if indices:
@@ -496,7 +489,6 b' class TaskScheduler(SessionFactory):'
496 #-----------------------------------------------------------------------
489 #-----------------------------------------------------------------------
497 # Result Handling
490 # Result Handling
498 #-----------------------------------------------------------------------
491 #-----------------------------------------------------------------------
499 @logged
500 def dispatch_result(self, raw_msg):
492 def dispatch_result(self, raw_msg):
501 """dispatch method for result replies"""
493 """dispatch method for result replies"""
502 try:
494 try:
@@ -510,7 +502,7 b' class TaskScheduler(SessionFactory):'
510 else:
502 else:
511 self.finish_job(idx)
503 self.finish_job(idx)
512 except Exception:
504 except Exception:
513 self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True)
505 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
514 return
506 return
515
507
516 header = msg['header']
508 header = msg['header']
@@ -532,7 +524,6 b' class TaskScheduler(SessionFactory):'
532 else:
524 else:
533 self.handle_unmet_dependency(idents, parent)
525 self.handle_unmet_dependency(idents, parent)
534
526
535 @logged
536 def handle_result(self, idents, parent, raw_msg, success=True):
527 def handle_result(self, idents, parent, raw_msg, success=True):
537 """handle a real task result, either success or failure"""
528 """handle a real task result, either success or failure"""
538 # first, relay result to client
529 # first, relay result to client
@@ -557,7 +548,6 b' class TaskScheduler(SessionFactory):'
557
548
558 self.update_graph(msg_id, success)
549 self.update_graph(msg_id, success)
559
550
560 @logged
561 def handle_unmet_dependency(self, idents, parent):
551 def handle_unmet_dependency(self, idents, parent):
562 """handle an unmet dependency"""
552 """handle an unmet dependency"""
563 engine = idents[0]
553 engine = idents[0]
@@ -590,7 +580,6 b' class TaskScheduler(SessionFactory):'
590
580
591
581
592
582
593 @logged
594 def update_graph(self, dep_id=None, success=True):
583 def update_graph(self, dep_id=None, success=True):
595 """dep_id just finished. Update our dependency
584 """dep_id just finished. Update our dependency
596 graph and submit any jobs that just became runable.
585 graph and submit any jobs that just became runable.
General Comments 0
You need to be logged in to leave comments. Login now