From 11fd67e319833c2501e6974c28e5d170b5f527fc 2012-02-08 21:36:12 From: MinRK Date: 2012-02-08 21:36:12 Subject: [PATCH] receive tasks, even when no engines are registered Previously, tasks submitted when no engines were registered were left in the upstream ZMQ queue. This prevented the tasks being entered into the Hub's database. --- diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index f557214..4e2e5a8 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -457,7 +457,7 @@ class Hub(SessionFactory): def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" - self.log.debug("monitor traffic: %r", msg[:2]) + self.log.debug("monitor traffic: %r", msg[0]) switch = msg[0] try: idents, msg = self.session.feed_identities(msg[1:]) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index bf9a32e..6349de2 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -191,6 +191,8 @@ class TaskScheduler(SessionFactory): def start(self): self.engine_stream.on_recv(self.dispatch_result, copy=False) + self.client_stream.on_recv(self.dispatch_submission, copy=False) + self._notification_handlers = dict( registration_notification = self._register_engine, unregistration_notification = self._unregister_engine @@ -247,8 +249,7 @@ class TaskScheduler(SessionFactory): self.completed[uid] = set() self.failed[uid] = set() self.pending[uid] = {} - if len(self.targets) == 1: - self.resume_receiving() + # rescan the graph: self.update_graph(None) @@ -256,7 +257,7 @@ class TaskScheduler(SessionFactory): """Existing engine with ident `uid` became unavailable.""" if len(self.targets) == 1: # this was our only engine - self.stop_receiving() + pass # handle any potentially finished tasks: self.engine_stream.flush() @@ -439,6 +440,11 @@ class TaskScheduler(SessionFactory): def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): """check location dependencies, and run if they are met.""" + self.log.debug("Attempting to assign task %s", msg_id) + if not self.targets: + # no engines, definitely can't run + return False + blacklist = self.blacklist.setdefault(msg_id, set()) if follow or targets or blacklist or self.hwm: # we need a can_run filter