Show More
@@ -457,7 +457,7 b' class Hub(SessionFactory):' | |||
|
457 | 457 | def dispatch_monitor_traffic(self, msg): |
|
458 | 458 | """all ME and Task queue messages come through here, as well as |
|
459 | 459 | IOPub traffic.""" |
|
460 |
self.log.debug("monitor traffic: %r", msg[ |
|
|
460 | self.log.debug("monitor traffic: %r", msg[0]) | |
|
461 | 461 | switch = msg[0] |
|
462 | 462 | try: |
|
463 | 463 | idents, msg = self.session.feed_identities(msg[1:]) |
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):' | |||
|
191 | 191 | |
|
192 | 192 | def start(self): |
|
193 | 193 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
194 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |
|
195 | ||
|
194 | 196 | self._notification_handlers = dict( |
|
195 | 197 | registration_notification = self._register_engine, |
|
196 | 198 | unregistration_notification = self._unregister_engine |
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):' | |||
|
247 | 249 | self.completed[uid] = set() |
|
248 | 250 | self.failed[uid] = set() |
|
249 | 251 | self.pending[uid] = {} |
|
250 | if len(self.targets) == 1: | |
|
251 | self.resume_receiving() | |
|
252 | ||
|
252 | 253 | # rescan the graph: |
|
253 | 254 | self.update_graph(None) |
|
254 | 255 | |
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):' | |||
|
256 | 257 | """Existing engine with ident `uid` became unavailable.""" |
|
257 | 258 | if len(self.targets) == 1: |
|
258 | 259 | # this was our only engine |
|
259 | self.stop_receiving() | |
|
260 | pass | |
|
260 | 261 | |
|
261 | 262 | # handle any potentially finished tasks: |
|
262 | 263 | self.engine_stream.flush() |
@@ -439,6 +440,11 b' class TaskScheduler(SessionFactory):' | |||
|
439 | 440 | |
|
440 | 441 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
441 | 442 | """check location dependencies, and run if they are met.""" |
|
443 | self.log.debug("Attempting to assign task %s", msg_id) | |
|
444 | if not self.targets: | |
|
445 | # no engines, definitely can't run | |
|
446 | return False | |
|
447 | ||
|
442 | 448 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
443 | 449 | if follow or targets or blacklist or self.hwm: |
|
444 | 450 | # we need a can_run filter |
General Comments 0
You need to be logged in to leave comments.
Login now