diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index bbc7a87..8a8c568 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -1305,14 +1305,15 @@ class Client(HasTraits):
         Individual results can be purged by msg_id, or the entire
         history of specific targets can be purged.
 
+        Use `purge_results('all')` to scrub everything from the Hub's db.
+
         Parameters
         ----------
 
         jobs : str or list of str or AsyncResult objects
                 the msg_ids whose results should be forgotten.
         targets : int/str/list of ints/strs
-                The targets, by uuid or int_id, whose entire history is to be purged.
-                Use `targets='all'` to scrub everything from the Hub's memory.
+                The targets, by int_id, whose entire history is to be purged.
 
                 default : None
         """
@@ -1322,19 +1323,22 @@ class Client(HasTraits):
             targets = self._build_targets(targets)[1]
 
         # construct msg_ids from jobs
-        msg_ids = []
-        if isinstance(jobs, (basestring,AsyncResult)):
-            jobs = [jobs]
-        bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
-        if bad_ids:
-            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
-        for j in jobs:
-            if isinstance(j, AsyncResult):
-                msg_ids.extend(j.msg_ids)
-            else:
-                msg_ids.append(j)
-
-        content = dict(targets=targets, msg_ids=msg_ids)
+        if jobs == 'all':
+            msg_ids = jobs
+        else:
+            msg_ids = []
+            if isinstance(jobs, (basestring,AsyncResult)):
+                jobs = [jobs]
+            bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
+            if bad_ids:
+                raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
+            for j in jobs:
+                if isinstance(j, AsyncResult):
+                    msg_ids.extend(j.msg_ids)
+                else:
+                    msg_ids.append(j)
+
+        content = dict(engine_ids=targets, msg_ids=msg_ids)
         self.session.send(self._query_socket, "purge_request", content=content)
         idents, msg = self.session.recv(self._query_socket, 0)
         if self.debug:
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index ab41713..7ee5977 100755
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -1063,6 +1063,7 @@ class Hub(SessionFactory):
         """Purge results from memory. This method is more valuable before we move
         to a DB based message storage mechanism."""
         content = msg['content']
+        self.log.info("Dropping records with %s", content)
         msg_ids = content.get('msg_ids', [])
         reply = dict(status='ok')
         if msg_ids == 'all':
@@ -1092,7 +1093,6 @@ class Hub(SessionFactory):
                         except:
                             reply = error.wrap_exception()
                         break
-                    msg_ids = self.completed.pop(eid)
                     uid = self.engines[eid].queue
                     try:
                         self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py
index 99c8105..2153ba8 100644
--- a/IPython/parallel/tests/test_client.py
+++ b/IPython/parallel/tests/test_client.py
@@ -242,8 +242,16 @@ class TestClient(ClusterTestCase):
         self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
 
     def test_purge_results(self):
+        # ensure there are some tasks
+        for i in range(5):
+            self.client[:].apply_sync(lambda : 1)
         hist = self.client.hub_history()
-        self.client.purge_results(hist)
+        self.client.purge_results(hist[-1])
         newhist = self.client.hub_history()
-        self.assertTrue(len(newhist) == 0)
+        self.assertEquals(len(newhist)+1,len(hist))
+
+    def test_purge_all_results(self):
+        self.client.purge_results('all')
+        hist = self.client.hub_history()
+        self.assertEquals(len(hist), 0)
 