Show More
@@ -1659,15 +1659,13 b' class Client(HasTraits):' | |||||
1659 | return msg_ids |
|
1659 | return msg_ids | |
1660 |
|
1660 | |||
1661 | def purge_local_results(self, jobs=[], targets=[]): |
|
1661 | def purge_local_results(self, jobs=[], targets=[]): | |
1662 |
"""Clears the client caches of results and |
|
1662 | """Clears the client caches of results and their metadata. | |
1663 |
|
1663 | |||
1664 | Individual results can be purged by msg_id, or the entire |
|
1664 | Individual results can be purged by msg_id, or the entire | |
1665 | history of specific targets can be purged. |
|
1665 | history of specific targets can be purged. | |
1666 |
|
1666 | |||
1667 |
Use `purge_local_results('all')` to scrub everything from the Clients's |
|
1667 | Use `purge_local_results('all')` to scrub everything from the Clients's | |
1668 |
|
1668 | results and metadata caches. | ||
1669 | The client must have no outstanding tasks before purging the caches. |
|
|||
1670 | Raises `AssertionError` if there are still outstanding tasks. |
|
|||
1671 |
|
1669 | |||
1672 | After this call all `AsyncResults` are invalid and should be discarded. |
|
1670 | After this call all `AsyncResults` are invalid and should be discarded. | |
1673 |
|
1671 | |||
@@ -1681,24 +1679,30 b' class Client(HasTraits):' | |||||
1681 |
|
1679 | |||
1682 | jobs : str or list of str or AsyncResult objects |
|
1680 | jobs : str or list of str or AsyncResult objects | |
1683 | the msg_ids whose results should be purged. |
|
1681 | the msg_ids whose results should be purged. | |
1684 |
targets : int/ |
|
1682 | targets : int/list of ints | |
1685 |
The |
|
1683 | The engines, by integer ID, whose entire result histories are to be purged. | |
|
1684 | ||||
|
1685 | Raises | |||
|
1686 | ------ | |||
|
1687 | ||||
|
1688 | RuntimeError : if any of the tasks to be purged are still outstanding. | |||
1686 |
|
1689 | |||
1687 | default : None |
|
|||
1688 | """ |
|
1690 | """ | |
1689 | assert not self.outstanding, "Can't purge a client with outstanding tasks!" |
|
|||
1690 |
|
||||
1691 | if not targets and not jobs: |
|
1691 | if not targets and not jobs: | |
1692 | raise ValueError("Must specify at least one of `targets` and `jobs`") |
|
1692 | raise ValueError("Must specify at least one of `targets` and `jobs`") | |
1693 |
|
1693 | |||
1694 | if jobs == 'all': |
|
1694 | if jobs == 'all': | |
|
1695 | if self.outstanding: | |||
|
1696 | raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding) | |||
1695 | self.results.clear() |
|
1697 | self.results.clear() | |
1696 | self.metadata.clear() |
|
1698 | self.metadata.clear() | |
1697 | return |
|
|||
1698 | else: |
|
1699 | else: | |
1699 |
msg_ids = |
|
1700 | msg_ids = set() | |
1700 |
msg_ids. |
|
1701 | msg_ids.update(self._build_msgids_from_target(targets)) | |
1701 |
msg_ids. |
|
1702 | msg_ids.update(self._build_msgids_from_jobs(jobs)) | |
|
1703 | still_outstanding = self.outstanding.intersection(msg_ids) | |||
|
1704 | if still_outstanding: | |||
|
1705 | raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding) | |||
1702 | map(self.results.pop, msg_ids) |
|
1706 | map(self.results.pop, msg_ids) | |
1703 | map(self.metadata.pop, msg_ids) |
|
1707 | map(self.metadata.pop, msg_ids) | |
1704 |
|
1708 |
@@ -444,7 +444,32 b' class TestClient(ClusterTestCase):' | |||||
444 | self.client.purge_local_results(res[-1]) |
|
444 | self.client.purge_local_results(res[-1]) | |
445 | self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results") |
|
445 | self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results") | |
446 | self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata") |
|
446 | self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata") | |
447 |
|
447 | |||
|
448 | def test_purge_local_results_outstanding(self): | |||
|
449 | v = self.client[-1] | |||
|
450 | ar = v.apply_async(lambda : 1) | |||
|
451 | msg_id = ar.msg_ids[0] | |||
|
452 | ar.get() | |||
|
453 | self._wait_for_idle() | |||
|
454 | ar2 = v.apply_async(time.sleep, 1) | |||
|
455 | self.assertIn(msg_id, self.client.results) | |||
|
456 | self.assertIn(msg_id, self.client.metadata) | |||
|
457 | self.client.purge_local_results(ar) | |||
|
458 | self.assertNotIn(msg_id, self.client.results) | |||
|
459 | self.assertNotIn(msg_id, self.client.metadata) | |||
|
460 | with self.assertRaises(RuntimeError): | |||
|
461 | self.client.purge_local_results(ar2) | |||
|
462 | ar2.get() | |||
|
463 | self.client.purge_local_results(ar2) | |||
|
464 | ||||
|
465 | def test_purge_all_local_results_outstanding(self): | |||
|
466 | v = self.client[-1] | |||
|
467 | ar = v.apply_async(time.sleep, 1) | |||
|
468 | with self.assertRaises(RuntimeError): | |||
|
469 | self.client.purge_local_results('all') | |||
|
470 | ar.get() | |||
|
471 | self.client.purge_local_results('all') | |||
|
472 | ||||
448 | def test_purge_all_hub_results(self): |
|
473 | def test_purge_all_hub_results(self): | |
449 | self.client.purge_hub_results('all') |
|
474 | self.client.purge_hub_results('all') | |
450 | hist = self.client.hub_history() |
|
475 | hist = self.client.hub_history() |
General Comments 0
You need to be logged in to leave comments.
Login now