Show More
@@ -1661,15 +1661,13 b' class Client(HasTraits):' | |||||
1661 | return msg_ids |
|
1661 | return msg_ids | |
1662 |
|
1662 | |||
1663 | def purge_local_results(self, jobs=[], targets=[]): |
|
1663 | def purge_local_results(self, jobs=[], targets=[]): | |
1664 |
"""Clears the client caches of results and |
|
1664 | """Clears the client caches of results and their metadata. | |
1665 |
|
1665 | |||
1666 | Individual results can be purged by msg_id, or the entire |
|
1666 | Individual results can be purged by msg_id, or the entire | |
1667 | history of specific targets can be purged. |
|
1667 | history of specific targets can be purged. | |
1668 |
|
1668 | |||
1669 |
Use `purge_local_results('all')` to scrub everything from the Clients's |
|
1669 | Use `purge_local_results('all')` to scrub everything from the Clients's | |
1670 |
|
1670 | results and metadata caches. | ||
1671 | The client must have no outstanding tasks before purging the caches. |
|
|||
1672 | Raises `AssertionError` if there are still outstanding tasks. |
|
|||
1673 |
|
1671 | |||
1674 | After this call all `AsyncResults` are invalid and should be discarded. |
|
1672 | After this call all `AsyncResults` are invalid and should be discarded. | |
1675 |
|
1673 | |||
@@ -1683,24 +1681,30 b' class Client(HasTraits):' | |||||
1683 |
|
1681 | |||
1684 | jobs : str or list of str or AsyncResult objects |
|
1682 | jobs : str or list of str or AsyncResult objects | |
1685 | the msg_ids whose results should be purged. |
|
1683 | the msg_ids whose results should be purged. | |
1686 |
targets : int/ |
|
1684 | targets : int/list of ints | |
1687 |
The |
|
1685 | The engines, by integer ID, whose entire result histories are to be purged. | |
1688 |
|
1686 | |||
1689 | default : None |
|
1687 | Raises | |
1690 |
|
|
1688 | ------ | |
1691 | assert not self.outstanding, "Can't purge a client with outstanding tasks!" |
|
1689 | ||
|
1690 | RuntimeError : if any of the tasks to be purged are still outstanding. | |||
1692 |
|
1691 | |||
|
1692 | """ | |||
1693 | if not targets and not jobs: |
|
1693 | if not targets and not jobs: | |
1694 | raise ValueError("Must specify at least one of `targets` and `jobs`") |
|
1694 | raise ValueError("Must specify at least one of `targets` and `jobs`") | |
1695 |
|
|
1695 | ||
1696 | if jobs == 'all': |
|
1696 | if jobs == 'all': | |
|
1697 | if self.outstanding: | |||
|
1698 | raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding) | |||
1697 | self.results.clear() |
|
1699 | self.results.clear() | |
1698 | self.metadata.clear() |
|
1700 | self.metadata.clear() | |
1699 | return |
|
|||
1700 | else: |
|
1701 | else: | |
1701 |
msg_ids = |
|
1702 | msg_ids = set() | |
1702 |
msg_ids. |
|
1703 | msg_ids.update(self._build_msgids_from_target(targets)) | |
1703 |
msg_ids. |
|
1704 | msg_ids.update(self._build_msgids_from_jobs(jobs)) | |
|
1705 | still_outstanding = self.outstanding.intersection(msg_ids) | |||
|
1706 | if still_outstanding: | |||
|
1707 | raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding) | |||
1704 | for mid in msg_ids: |
|
1708 | for mid in msg_ids: | |
1705 | self.results.pop(mid) |
|
1709 | self.results.pop(mid) | |
1706 | self.metadata.pop(mid) |
|
1710 | self.metadata.pop(mid) |
@@ -450,6 +450,31 b' class TestClient(ClusterTestCase):' | |||||
450 | self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results") |
|
450 | self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results") | |
451 | self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata") |
|
451 | self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata") | |
452 |
|
|
452 | ||
|
453 | def test_purge_local_results_outstanding(self): | |||
|
454 | v = self.client[-1] | |||
|
455 | ar = v.apply_async(lambda : 1) | |||
|
456 | msg_id = ar.msg_ids[0] | |||
|
457 | ar.get() | |||
|
458 | self._wait_for_idle() | |||
|
459 | ar2 = v.apply_async(time.sleep, 1) | |||
|
460 | self.assertIn(msg_id, self.client.results) | |||
|
461 | self.assertIn(msg_id, self.client.metadata) | |||
|
462 | self.client.purge_local_results(ar) | |||
|
463 | self.assertNotIn(msg_id, self.client.results) | |||
|
464 | self.assertNotIn(msg_id, self.client.metadata) | |||
|
465 | with self.assertRaises(RuntimeError): | |||
|
466 | self.client.purge_local_results(ar2) | |||
|
467 | ar2.get() | |||
|
468 | self.client.purge_local_results(ar2) | |||
|
469 | ||||
|
470 | def test_purge_all_local_results_outstanding(self): | |||
|
471 | v = self.client[-1] | |||
|
472 | ar = v.apply_async(time.sleep, 1) | |||
|
473 | with self.assertRaises(RuntimeError): | |||
|
474 | self.client.purge_local_results('all') | |||
|
475 | ar.get() | |||
|
476 | self.client.purge_local_results('all') | |||
|
477 | ||||
453 | def test_purge_all_hub_results(self): |
|
478 | def test_purge_all_hub_results(self): | |
454 | self.client.purge_hub_results('all') |
|
479 | self.client.purge_hub_results('all') | |
455 | hist = self.client.hub_history() |
|
480 | hist = self.client.hub_history() |
General Comments 0
You need to be logged in to leave comments.
Login now