##// END OF EJS Templates
Add purging of client result cache to parallel.Client...
Jan Schulz -
Show More
@@ -1389,7 +1389,7 b' class Client(HasTraits):'
1389 raise TypeError("indices must be str or int, not %r"%id)
1389 raise TypeError("indices must be str or int, not %r"%id)
1390 theids.append(id)
1390 theids.append(id)
1391
1391
1392 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1392 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1394
1394
1395 if remote_ids:
1395 if remote_ids:
@@ -1558,7 +1558,7 b' class Client(HasTraits):'
1558 elif header['msg_type'] == 'execute_reply':
1558 elif header['msg_type'] == 'execute_reply':
1559 res = ExecuteReply(msg_id, rcontent, md)
1559 res = ExecuteReply(msg_id, rcontent, md)
1560 else:
1560 else:
1561 raise KeyError("unhandled msg type: %r" % header[msg_type])
1561 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1562 else:
1562 else:
1563 res = self._unwrap_exception(rcontent)
1563 res = self._unwrap_exception(rcontent)
1564 failures.append(res)
1564 failures.append(res)
@@ -1605,8 +1605,78 b' class Client(HasTraits):'
1605 else:
1605 else:
1606 return content
1606 return content
1607
1607
1608 def _build_msgids_from_target(self, targets=None):
1609 """Build a list of msg_ids from the list of engine targets"""
1610 if targets == None: # needed as _build_targets otherwise uses all engines
1611 return []
1612 target_ids = self._build_targets(targets)[0]
1613 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1614
1615 def _build_msgids_from_jobs(self, jobs=None):
1616 """Build a list of msg_ids from "jobs" """
1617 msg_ids = []
1618 if jobs == None:
1619 return msg_ids
1620
1621 if isinstance(jobs, (basestring,AsyncResult)):
1622 jobs = [jobs]
1623 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1624 if bad_ids:
1625 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1626 for j in jobs:
1627 if isinstance(j, AsyncResult):
1628 msg_ids.extend(j.msg_ids)
1629 else:
1630 msg_ids.append(j)
1631 return msg_ids
1632
1633 def purge_local_results(self, jobs=[], targets=[]):
1634 """Clears the client caches of results and frees such memory.
1635
1636 Individual results can be purged by msg_id, or the entire
1637 history of specific targets can be purged.
1638
1639 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1640
1641 The client must have no outstanding tasks before purging the caches.
1642 Raises `AssertionError` if there are still outstanding tasks.
1643
1644 After this call all `AsyncResults` are invalid and should be discarded.
1645
1646 If you must "reget" the results, you can still do so by using
1647 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1648 redownload the results from the hub if they are still available
1649 (i.e `client.purge_hub_results(...)` has not been called.
1650
1651 Parameters
1652 ----------
1653
1654 jobs : str or list of str or AsyncResult objects
1655 the msg_ids whose results should be purged.
1656 targets : int/str/list of ints/strs
1657 The targets, by int_id, whose entire results are to be purged.
1658
1659 default : None
1660 """
1661 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1662
1663 if not targets and not jobs:
1664 raise ValueError("Must specify at least one of `targets` and `jobs`")
1665
1666 if jobs == 'all':
1667 self.results.clear()
1668 self.metadata.clear()
1669 return
1670 else:
1671 msg_ids = []
1672 msg_ids.extend(self._build_msgids_from_target(targets))
1673 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1674 map(self.results.pop, msg_ids)
1675 map(self.metadata.pop, msg_ids)
1676
1677
1608 @spin_first
1678 @spin_first
1609 def purge_results(self, jobs=[], targets=[]):
1679 def purge_hub_results(self, jobs=[], targets=[]):
1610 """Tell the Hub to forget results.
1680 """Tell the Hub to forget results.
1611
1681
1612 Individual results can be purged by msg_id, or the entire
1682 Individual results can be purged by msg_id, or the entire
@@ -1633,17 +1703,7 b' class Client(HasTraits):'
1633 if jobs == 'all':
1703 if jobs == 'all':
1634 msg_ids = jobs
1704 msg_ids = jobs
1635 else:
1705 else:
1636 msg_ids = []
1706 msg_ids = self._build_msgids_from_jobs(jobs)
1637 if isinstance(jobs, (basestring,AsyncResult)):
1638 jobs = [jobs]
1639 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1640 if bad_ids:
1641 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1642 for j in jobs:
1643 if isinstance(j, AsyncResult):
1644 msg_ids.extend(j.msg_ids)
1645 else:
1646 msg_ids.append(j)
1647
1707
1648 content = dict(engine_ids=targets, msg_ids=msg_ids)
1708 content = dict(engine_ids=targets, msg_ids=msg_ids)
1649 self.session.send(self._query_socket, "purge_request", content=content)
1709 self.session.send(self._query_socket, "purge_request", content=content)
@@ -1654,6 +1714,41 b' class Client(HasTraits):'
1654 if content['status'] != 'ok':
1714 if content['status'] != 'ok':
1655 raise self._unwrap_exception(content)
1715 raise self._unwrap_exception(content)
1656
1716
1717 def purge_results(self, jobs=[], targets=[]):
1718 """Clears the cached results from both the hub and the local client
1719
1720 Individual results can be purged by msg_id, or the entire
1721 history of specific targets can be purged.
1722
1723 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1724 the Client's db.
1725
1726 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1727 the same arguments.
1728
1729 Parameters
1730 ----------
1731
1732 jobs : str or list of str or AsyncResult objects
1733 the msg_ids whose results should be forgotten.
1734 targets : int/str/list of ints/strs
1735 The targets, by int_id, whose entire history is to be purged.
1736
1737 default : None
1738 """
1739 self.purge_local_results(jobs=jobs, targets=targets)
1740 self.purge_hub_results(jobs=jobs, targets=targets)
1741
1742 def purge_everything(self):
1743 """Clears all content from previous Tasks from both the hub and the local client
1744
1745 In addition to calling `purge_results("all")` it also deletes the history and
1746 other bookkeeping lists.
1747 """
1748 self.purge_results("all")
1749 self.history = []
1750 self.session.digest_history.clear()
1751
1657 @spin_first
1752 @spin_first
1658 def hub_history(self):
1753 def hub_history(self):
1659 """Get the Hub's history
1754 """Get the Hub's history
General Comments 0
You need to be logged in to leave comments. Login now