Show More
@@ -1389,7 +1389,7 b' class Client(HasTraits):' | |||
|
1389 | 1389 | raise TypeError("indices must be str or int, not %r"%id) |
|
1390 | 1390 | theids.append(id) |
|
1391 | 1391 | |
|
1392 |
local_ids = filter(lambda msg_id: msg_id in self. |
|
|
1392 | local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids) | |
|
1393 | 1393 | remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) |
|
1394 | 1394 | |
|
1395 | 1395 | if remote_ids: |
@@ -1558,7 +1558,7 b' class Client(HasTraits):' | |||
|
1558 | 1558 | elif header['msg_type'] == 'execute_reply': |
|
1559 | 1559 | res = ExecuteReply(msg_id, rcontent, md) |
|
1560 | 1560 | else: |
|
1561 | raise KeyError("unhandled msg type: %r" % header[msg_type]) | |
|
1561 | raise KeyError("unhandled msg type: %r" % header['msg_type']) | |
|
1562 | 1562 | else: |
|
1563 | 1563 | res = self._unwrap_exception(rcontent) |
|
1564 | 1564 | failures.append(res) |
@@ -1605,8 +1605,78 b' class Client(HasTraits):' | |||
|
1605 | 1605 | else: |
|
1606 | 1606 | return content |
|
1607 | 1607 | |
|
1608 | def _build_msgids_from_target(self, targets=None): | |
|
1609 | """Build a list of msg_ids from the list of engine targets""" | |
|
1610 | if targets == None: # needed as _build_targets otherwise uses all engines | |
|
1611 | return [] | |
|
1612 | target_ids = self._build_targets(targets)[0] | |
|
1613 | return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata) | |
|
1614 | ||
|
1615 | def _build_msgids_from_jobs(self, jobs=None): | |
|
1616 | """Build a list of msg_ids from "jobs" """ | |
|
1617 | msg_ids = [] | |
|
1618 | if jobs == None: | |
|
1619 | return msg_ids | |
|
1620 | ||
|
1621 | if isinstance(jobs, (basestring,AsyncResult)): | |
|
1622 | jobs = [jobs] | |
|
1623 | bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) | |
|
1624 | if bad_ids: | |
|
1625 | raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) | |
|
1626 | for j in jobs: | |
|
1627 | if isinstance(j, AsyncResult): | |
|
1628 | msg_ids.extend(j.msg_ids) | |
|
1629 | else: | |
|
1630 | msg_ids.append(j) | |
|
1631 | return msg_ids | |
|
1632 | ||
|
1633 | def purge_local_results(self, jobs=[], targets=[]): | |
|
1634 | """Clears the client caches of results and frees such memory. | |
|
1635 | ||
|
1636 | Individual results can be purged by msg_id, or the entire | |
|
1637 | history of specific targets can be purged. | |
|
1638 | ||
|
1639 | Use `purge_local_results('all')` to scrub everything from the Clients's db. | |
|
1640 | ||
|
1641 | The client must have no outstanding tasks before purging the caches. | |
|
1642 | Raises `AssertionError` if there are still outstanding tasks. | |
|
1643 | ||
|
1644 | After this call all `AsyncResults` are invalid and should be discarded. | |
|
1645 | ||
|
1646 | If you must "reget" the results, you can still do so by using | |
|
1647 | `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will | |
|
1648 | redownload the results from the hub if they are still available | |
|
1649 | (i.e `client.purge_hub_results(...)` has not been called. | |
|
1650 | ||
|
1651 | Parameters | |
|
1652 | ---------- | |
|
1653 | ||
|
1654 | jobs : str or list of str or AsyncResult objects | |
|
1655 | the msg_ids whose results should be purged. | |
|
1656 | targets : int/str/list of ints/strs | |
|
1657 | The targets, by int_id, whose entire results are to be purged. | |
|
1658 | ||
|
1659 | default : None | |
|
1660 | """ | |
|
1661 | assert not self.outstanding, "Can't purge a client with outstanding tasks!" | |
|
1662 | ||
|
1663 | if not targets and not jobs: | |
|
1664 | raise ValueError("Must specify at least one of `targets` and `jobs`") | |
|
1665 | ||
|
1666 | if jobs == 'all': | |
|
1667 | self.results.clear() | |
|
1668 | self.metadata.clear() | |
|
1669 | return | |
|
1670 | else: | |
|
1671 | msg_ids = [] | |
|
1672 | msg_ids.extend(self._build_msgids_from_target(targets)) | |
|
1673 | msg_ids.extend(self._build_msgids_from_jobs(jobs)) | |
|
1674 | map(self.results.pop, msg_ids) | |
|
1675 | map(self.metadata.pop, msg_ids) | |
|
1676 | ||
|
1677 | ||
|
1608 | 1678 | @spin_first |
|
1609 | def purge_results(self, jobs=[], targets=[]): | |
|
1679 | def purge_hub_results(self, jobs=[], targets=[]): | |
|
1610 | 1680 | """Tell the Hub to forget results. |
|
1611 | 1681 | |
|
1612 | 1682 | Individual results can be purged by msg_id, or the entire |
@@ -1633,17 +1703,7 b' class Client(HasTraits):' | |||
|
1633 | 1703 | if jobs == 'all': |
|
1634 | 1704 | msg_ids = jobs |
|
1635 | 1705 | else: |
|
1636 | msg_ids = [] | |
|
1637 | if isinstance(jobs, (basestring,AsyncResult)): | |
|
1638 | jobs = [jobs] | |
|
1639 | bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) | |
|
1640 | if bad_ids: | |
|
1641 | raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) | |
|
1642 | for j in jobs: | |
|
1643 | if isinstance(j, AsyncResult): | |
|
1644 | msg_ids.extend(j.msg_ids) | |
|
1645 | else: | |
|
1646 | msg_ids.append(j) | |
|
1706 | msg_ids = self._build_msgids_from_jobs(jobs) | |
|
1647 | 1707 | |
|
1648 | 1708 | content = dict(engine_ids=targets, msg_ids=msg_ids) |
|
1649 | 1709 | self.session.send(self._query_socket, "purge_request", content=content) |
@@ -1654,6 +1714,41 b' class Client(HasTraits):' | |||
|
1654 | 1714 | if content['status'] != 'ok': |
|
1655 | 1715 | raise self._unwrap_exception(content) |
|
1656 | 1716 | |
|
1717 | def purge_results(self, jobs=[], targets=[]): | |
|
1718 | """Clears the cached results from both the hub and the local client | |
|
1719 | ||
|
1720 | Individual results can be purged by msg_id, or the entire | |
|
1721 | history of specific targets can be purged. | |
|
1722 | ||
|
1723 | Use `purge_results('all')` to scrub every cached result from both the Hub's and | |
|
1724 | the Client's db. | |
|
1725 | ||
|
1726 | Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with | |
|
1727 | the same arguments. | |
|
1728 | ||
|
1729 | Parameters | |
|
1730 | ---------- | |
|
1731 | ||
|
1732 | jobs : str or list of str or AsyncResult objects | |
|
1733 | the msg_ids whose results should be forgotten. | |
|
1734 | targets : int/str/list of ints/strs | |
|
1735 | The targets, by int_id, whose entire history is to be purged. | |
|
1736 | ||
|
1737 | default : None | |
|
1738 | """ | |
|
1739 | self.purge_local_results(jobs=jobs, targets=targets) | |
|
1740 | self.purge_hub_results(jobs=jobs, targets=targets) | |
|
1741 | ||
|
1742 | def purge_everything(self): | |
|
1743 | """Clears all content from previous Tasks from both the hub and the local client | |
|
1744 | ||
|
1745 | In addition to calling `purge_results("all")` it also deletes the history and | |
|
1746 | other bookkeeping lists. | |
|
1747 | """ | |
|
1748 | self.purge_results("all") | |
|
1749 | self.history = [] | |
|
1750 | self.session.digest_history.clear() | |
|
1751 | ||
|
1657 | 1752 | @spin_first |
|
1658 | 1753 | def hub_history(self): |
|
1659 | 1754 | """Get the Hub's history |
General Comments 0
You need to be logged in to leave comments.
Login now