diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 66576e7..b87b6a5 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -1389,7 +1389,7 @@ class Client(HasTraits): raise TypeError("indices must be str or int, not %r"%id) theids.append(id) - local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids) + local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids) remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) if remote_ids: @@ -1558,7 +1558,7 @@ class Client(HasTraits): elif header['msg_type'] == 'execute_reply': res = ExecuteReply(msg_id, rcontent, md) else: - raise KeyError("unhandled msg type: %r" % header[msg_type]) + raise KeyError("unhandled msg type: %r" % header['msg_type']) else: res = self._unwrap_exception(rcontent) failures.append(res) @@ -1605,8 +1605,78 @@ class Client(HasTraits): else: return content + def _build_msgids_from_target(self, targets=None): + """Build a list of msg_ids from the list of engine targets""" + if targets == None: # needed as _build_targets otherwise uses all engines + return [] + target_ids = self._build_targets(targets)[0] + return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata) + + def _build_msgids_from_jobs(self, jobs=None): + """Build a list of msg_ids from "jobs" """ + msg_ids = [] + if jobs == None: + return msg_ids + + if isinstance(jobs, (basestring,AsyncResult)): + jobs = [jobs] + bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) + if bad_ids: + raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) + for j in jobs: + if isinstance(j, AsyncResult): + msg_ids.extend(j.msg_ids) + else: + msg_ids.append(j) + return msg_ids + + def purge_local_results(self, jobs=[], targets=[]): + """Clears the client caches of results and frees such memory. + + Individual results can be purged by msg_id, or the entire + history of specific targets can be purged. + + Use `purge_local_results('all')` to scrub everything from the Clients's db. + + The client must have no outstanding tasks before purging the caches. + Raises `AssertionError` if there are still outstanding tasks. + + After this call all `AsyncResults` are invalid and should be discarded. + + If you must "reget" the results, you can still do so by using + `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will + redownload the results from the hub if they are still available + (i.e `client.purge_hub_results(...)` has not been called. + + Parameters + ---------- + + jobs : str or list of str or AsyncResult objects + the msg_ids whose results should be purged. + targets : int/str/list of ints/strs + The targets, by int_id, whose entire results are to be purged. + + default : None + """ + assert not self.outstanding, "Can't purge a client with outstanding tasks!" + + if not targets and not jobs: + raise ValueError("Must specify at least one of `targets` and `jobs`") + + if jobs == 'all': + self.results.clear() + self.metadata.clear() + return + else: + msg_ids = [] + msg_ids.extend(self._build_msgids_from_target(targets)) + msg_ids.extend(self._build_msgids_from_jobs(jobs)) + map(self.results.pop, msg_ids) + map(self.metadata.pop, msg_ids) + + @spin_first - def purge_results(self, jobs=[], targets=[]): + def purge_hub_results(self, jobs=[], targets=[]): """Tell the Hub to forget results. Individual results can be purged by msg_id, or the entire @@ -1633,17 +1703,7 @@ class Client(HasTraits): if jobs == 'all': msg_ids = jobs else: - msg_ids = [] - if isinstance(jobs, (basestring,AsyncResult)): - jobs = [jobs] - bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) - if bad_ids: - raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) - for j in jobs: - if isinstance(j, AsyncResult): - msg_ids.extend(j.msg_ids) - else: - msg_ids.append(j) + msg_ids = self._build_msgids_from_jobs(jobs) content = dict(engine_ids=targets, msg_ids=msg_ids) self.session.send(self._query_socket, "purge_request", content=content) @@ -1654,6 +1714,41 @@ class Client(HasTraits): if content['status'] != 'ok': raise self._unwrap_exception(content) + def purge_results(self, jobs=[], targets=[]): + """Clears the cached results from both the hub and the local client + + Individual results can be purged by msg_id, or the entire + history of specific targets can be purged. + + Use `purge_results('all')` to scrub every cached result from both the Hub's and + the Client's db. + + Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with + the same arguments. + + Parameters + ---------- + + jobs : str or list of str or AsyncResult objects + the msg_ids whose results should be forgotten. + targets : int/str/list of ints/strs + The targets, by int_id, whose entire history is to be purged. + + default : None + """ + self.purge_local_results(jobs=jobs, targets=targets) + self.purge_hub_results(jobs=jobs, targets=targets) + + def purge_everything(self): + """Clears all content from previous Tasks from both the hub and the local client + + In addition to calling `purge_results("all")` it also deletes the history and + other bookkeeping lists. + """ + self.purge_results("all") + self.history = [] + self.session.digest_history.clear() + @spin_first def hub_history(self): """Get the Hub's history