From 821fac24f841ada02dca38f19cd593a326f081a9 2012-02-09 21:55:18
From: Min RK
Date: 2012-02-09 21:55:18
Subject: [PATCH] Merge pull request #1391 from minrk/no_engines

Improve Hub/Scheduler when no engines are registered

1. Tasks are pulled into the Scheduler, rather than left on the ZMQ queue, which means they enter the database.
2. queue_status will not raise NoEngines when there aren't any; instead, it will still fetch the available information.

Bug fixed in db_query, where the behavior did not match the docstring (buffers should be excluded if no keys are specified).

closes #826 (again)
---

diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index e7fd57d..cb8e55a 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -1308,7 +1308,11 @@ class Client(HasTraits):
             verbose : bool
                 Whether to return lengths only, or lists of ids for each element
         """
-        engine_ids = self._build_targets(targets)[1]
+        if targets == 'all':
+            # allow 'all' to be evaluated on the engine
+            engine_ids = None
+        else:
+            engine_ids = self._build_targets(targets)[1]
         content = dict(targets=engine_ids, verbose=verbose)
         self.session.send(self._query_socket, "queue_request", content=content)
         idents,msg = self.session.recv(self._query_socket, 0)
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index f557214..eea28ed 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -430,7 +430,7 @@ class Hub(SessionFactory):
         """turn any valid targets argument into a list of integer ids"""
         if targets is None:
             # default to all
-            targets = self.ids
+            return self.ids
 
         if isinstance(targets, (int,str,unicode)):
             # only one target specified
@@ -457,7 +457,7 @@ class Hub(SessionFactory):
 
     def dispatch_monitor_traffic(self, msg):
         """all ME and Task queue messages come through here, as well as IOPub traffic."""
-        self.log.debug("monitor traffic: %r", msg[:2])
+        self.log.debug("monitor traffic: %r", msg[0])
         switch = msg[0]
         try:
             idents, msg = self.session.feed_identities(msg[1:])
@@ -1271,17 +1271,17 @@ class Hub(SessionFactory):
             buffer_lens = [] if 'buffers' in keys else None
             result_buffer_lens = [] if 'result_buffers' in keys else None
         else:
-            buffer_lens = []
-            result_buffer_lens = []
+            buffer_lens = None
+            result_buffer_lens = None
 
         for rec in records:
             # buffers may be None, so double check
+            b = rec.pop('buffers', empty) or empty
             if buffer_lens is not None:
-                b = rec.pop('buffers', empty) or empty
                 buffer_lens.append(len(b))
                 buffers.extend(b)
+            rb = rec.pop('result_buffers', empty) or empty
             if result_buffer_lens is not None:
-                rb = rec.pop('result_buffers', empty) or empty
                 result_buffer_lens.append(len(rb))
                 buffers.extend(rb)
         content = dict(status='ok', records=records, buffer_lens=buffer_lens,
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index bdfa3dc..2f49a90 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -191,6 +191,8 @@ class TaskScheduler(SessionFactory):
 
     def start(self):
         self.engine_stream.on_recv(self.dispatch_result, copy=False)
+        self.client_stream.on_recv(self.dispatch_submission, copy=False)
+
         self._notification_handlers = dict(
             registration_notification = self._register_engine,
             unregistration_notification = self._unregister_engine
@@ -247,8 +249,7 @@ class TaskScheduler(SessionFactory):
         self.completed[uid] = set()
         self.failed[uid] = set()
         self.pending[uid] = {}
-        if len(self.targets) == 1:
-            self.resume_receiving()
+
         # rescan the graph:
         self.update_graph(None)
 
@@ -256,7 +257,7 @@
         """Existing engine with ident `uid` became unavailable."""
         if len(self.targets) == 1:
             # this was our only engine
-            self.stop_receiving()
+            pass
 
         # handle any potentially finished tasks:
         self.engine_stream.flush()
@@ -445,6 +446,11 @@
 
     def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
         """check location dependencies, and run if they are met."""
+        self.log.debug("Attempting to assign task %s", msg_id)
+        if not self.targets:
+            # no engines, definitely can't run
+            return False
+
         blacklist = self.blacklist.setdefault(msg_id, set())
         if follow or targets or blacklist or self.hwm:
             # we need a can_run filter
diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py
index 9ad18b8..db58524 100644
--- a/IPython/parallel/tests/test_client.py
+++ b/IPython/parallel/tests/test_client.py
@@ -223,6 +223,14 @@ class TestClient(ClusterTestCase):
         for rec in found:
             self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
 
+    def test_db_query_default_keys(self):
+        """default db_query excludes buffers"""
+        found = self.client.db_query({'msg_id': {'$ne' : ''}})
+        for rec in found:
+            keys = set(rec.keys())
+            self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
+            self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
+
     def test_db_query_msg_id(self):
         """ensure msg_id is always in db queries"""
         found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
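
A minimal usage sketch of the behavior this patch targets, assuming a controller running with zero engines registered; the client handle `rc` and the surrounding session are illustrative only and not part of the change:

    from IPython.parallel import Client

    rc = Client()   # connect to a controller that has no engines registered

    # queue_status no longer raises NoEngines when the cluster is empty;
    # targets='all' is now sent as None and resolved by the Hub.
    status = rc.queue_status(targets='all', verbose=False)

    # db_query with no explicit keys now matches its docstring:
    # 'buffers' and 'result_buffers' are excluded from the returned records.
    for rec in rc.db_query({'msg_id': {'$ne': ''}}):
        assert 'buffers' not in rec and 'result_buffers' not in rec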