Show More
@@ -457,7 +457,7 b' class Hub(SessionFactory):' | |||||
457 | def dispatch_monitor_traffic(self, msg): |
|
457 | def dispatch_monitor_traffic(self, msg): | |
458 | """all ME and Task queue messages come through here, as well as |
|
458 | """all ME and Task queue messages come through here, as well as | |
459 | IOPub traffic.""" |
|
459 | IOPub traffic.""" | |
460 |
self.log.debug("monitor traffic: %r", msg[ |
|
460 | self.log.debug("monitor traffic: %r", msg[0]) | |
461 | switch = msg[0] |
|
461 | switch = msg[0] | |
462 | try: |
|
462 | try: | |
463 | idents, msg = self.session.feed_identities(msg[1:]) |
|
463 | idents, msg = self.session.feed_identities(msg[1:]) |
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):' | |||||
191 |
|
191 | |||
192 | def start(self): |
|
192 | def start(self): | |
193 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
193 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
|
194 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |||
|
195 | ||||
194 | self._notification_handlers = dict( |
|
196 | self._notification_handlers = dict( | |
195 | registration_notification = self._register_engine, |
|
197 | registration_notification = self._register_engine, | |
196 | unregistration_notification = self._unregister_engine |
|
198 | unregistration_notification = self._unregister_engine | |
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):' | |||||
247 | self.completed[uid] = set() |
|
249 | self.completed[uid] = set() | |
248 | self.failed[uid] = set() |
|
250 | self.failed[uid] = set() | |
249 | self.pending[uid] = {} |
|
251 | self.pending[uid] = {} | |
250 | if len(self.targets) == 1: |
|
252 | ||
251 | self.resume_receiving() |
|
|||
252 | # rescan the graph: |
|
253 | # rescan the graph: | |
253 | self.update_graph(None) |
|
254 | self.update_graph(None) | |
254 |
|
255 | |||
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):' | |||||
256 | """Existing engine with ident `uid` became unavailable.""" |
|
257 | """Existing engine with ident `uid` became unavailable.""" | |
257 | if len(self.targets) == 1: |
|
258 | if len(self.targets) == 1: | |
258 | # this was our only engine |
|
259 | # this was our only engine | |
259 | self.stop_receiving() |
|
260 | pass | |
260 |
|
261 | |||
261 | # handle any potentially finished tasks: |
|
262 | # handle any potentially finished tasks: | |
262 | self.engine_stream.flush() |
|
263 | self.engine_stream.flush() | |
@@ -439,6 +440,11 b' class TaskScheduler(SessionFactory):' | |||||
439 |
|
440 | |||
440 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
441 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): | |
441 | """check location dependencies, and run if they are met.""" |
|
442 | """check location dependencies, and run if they are met.""" | |
|
443 | self.log.debug("Attempting to assign task %s", msg_id) | |||
|
444 | if not self.targets: | |||
|
445 | # no engines, definitely can't run | |||
|
446 | return False | |||
|
447 | ||||
442 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
448 | blacklist = self.blacklist.setdefault(msg_id, set()) | |
443 | if follow or targets or blacklist or self.hwm: |
|
449 | if follow or targets or blacklist or self.hwm: | |
444 | # we need a can_run filter |
|
450 | # we need a can_run filter |
General Comments 0
You need to be logged in to leave comments.
Login now