##// END OF EJS Templates
receive tasks, even when no engines are registered...
MinRK -
Show More
@@ -457,7 +457,7 b' class Hub(SessionFactory):'
457 def dispatch_monitor_traffic(self, msg):
457 def dispatch_monitor_traffic(self, msg):
458 """all ME and Task queue messages come through here, as well as
458 """all ME and Task queue messages come through here, as well as
459 IOPub traffic."""
459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r", msg[:2])
460 self.log.debug("monitor traffic: %r", msg[0])
461 switch = msg[0]
461 switch = msg[0]
462 try:
462 try:
463 idents, msg = self.session.feed_identities(msg[1:])
463 idents, msg = self.session.feed_identities(msg[1:])
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):'
191
191
192 def start(self):
192 def start(self):
193 self.engine_stream.on_recv(self.dispatch_result, copy=False)
193 self.engine_stream.on_recv(self.dispatch_result, copy=False)
194 self.client_stream.on_recv(self.dispatch_submission, copy=False)
195
194 self._notification_handlers = dict(
196 self._notification_handlers = dict(
195 registration_notification = self._register_engine,
197 registration_notification = self._register_engine,
196 unregistration_notification = self._unregister_engine
198 unregistration_notification = self._unregister_engine
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):'
247 self.completed[uid] = set()
249 self.completed[uid] = set()
248 self.failed[uid] = set()
250 self.failed[uid] = set()
249 self.pending[uid] = {}
251 self.pending[uid] = {}
250 if len(self.targets) == 1:
252
251 self.resume_receiving()
252 # rescan the graph:
253 # rescan the graph:
253 self.update_graph(None)
254 self.update_graph(None)
254
255
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):'
256 """Existing engine with ident `uid` became unavailable."""
257 """Existing engine with ident `uid` became unavailable."""
257 if len(self.targets) == 1:
258 if len(self.targets) == 1:
258 # this was our only engine
259 # this was our only engine
259 self.stop_receiving()
260 pass
260
261
261 # handle any potentially finished tasks:
262 # handle any potentially finished tasks:
262 self.engine_stream.flush()
263 self.engine_stream.flush()
@@ -439,6 +440,11 b' class TaskScheduler(SessionFactory):'
439
440
440 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
441 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
441 """check location dependencies, and run if they are met."""
442 """check location dependencies, and run if they are met."""
443 self.log.debug("Attempting to assign task %s", msg_id)
444 if not self.targets:
445 # no engines, definitely can't run
446 return False
447
442 blacklist = self.blacklist.setdefault(msg_id, set())
448 blacklist = self.blacklist.setdefault(msg_id, set())
443 if follow or targets or blacklist or self.hwm:
449 if follow or targets or blacklist or self.hwm:
444 # we need a can_run filter
450 # we need a can_run filter
General Comments 0
You need to be logged in to leave comments. Login now