diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index f557214..4e2e5a8 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -457,7 +457,7 @@ class Hub(SessionFactory): def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" - self.log.debug("monitor traffic: %r", msg[:2]) + self.log.debug("monitor traffic: %r", msg[0]) switch = msg[0] try: idents, msg = self.session.feed_identities(msg[1:]) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index bf9a32e..6349de2 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -191,6 +191,8 @@ class TaskScheduler(SessionFactory): def start(self): self.engine_stream.on_recv(self.dispatch_result, copy=False) + self.client_stream.on_recv(self.dispatch_submission, copy=False) + self._notification_handlers = dict( registration_notification = self._register_engine, unregistration_notification = self._unregister_engine @@ -247,8 +249,7 @@ class TaskScheduler(SessionFactory): self.completed[uid] = set() self.failed[uid] = set() self.pending[uid] = {} - if len(self.targets) == 1: - self.resume_receiving() + # rescan the graph: self.update_graph(None) @@ -256,7 +257,7 @@ class TaskScheduler(SessionFactory): """Existing engine with ident `uid` became unavailable.""" if len(self.targets) == 1: # this was our only engine - self.stop_receiving() + pass # handle any potentially finished tasks: self.engine_stream.flush() @@ -439,6 +440,11 @@ class TaskScheduler(SessionFactory): def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): """check location dependencies, and run if they are met.""" + self.log.debug("Attempting to assign task %s", msg_id) + if not self.targets: + # no engines, definitely can't run + return False + blacklist = self.blacklist.setdefault(msg_id, set()) if follow or targets or blacklist or self.hwm: # we need a can_run filter