##// END OF EJS Templates
Merge pull request #4722 from minrk/purge-outstanding...
Min RK -
r13920:c24b633d merge
parent child Browse files
Show More
@@ -1661,15 +1661,13 b' class Client(HasTraits):'
1661 1661 return msg_ids
1662 1662
1663 1663 def purge_local_results(self, jobs=[], targets=[]):
1664 """Clears the client caches of results and frees such memory.
1664 """Clears the client caches of results and their metadata.
1665 1665
1666 1666 Individual results can be purged by msg_id, or the entire
1667 1667 history of specific targets can be purged.
1668 1668
1669 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1670
1671 The client must have no outstanding tasks before purging the caches.
1672 Raises `AssertionError` if there are still outstanding tasks.
1669 Use `purge_local_results('all')` to scrub everything from the Clients's
1670 results and metadata caches.
1673 1671
1674 1672 After this call all `AsyncResults` are invalid and should be discarded.
1675 1673
@@ -1683,24 +1681,30 b' class Client(HasTraits):'
1683 1681
1684 1682 jobs : str or list of str or AsyncResult objects
1685 1683 the msg_ids whose results should be purged.
1686 targets : int/str/list of ints/strs
1687 The targets, by int_id, whose entire results are to be purged.
1684 targets : int/list of ints
1685 The engines, by integer ID, whose entire result histories are to be purged.
1688 1686
1689 default : None
1690 """
1691 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1687 Raises
1688 ------
1689
1690 RuntimeError : if any of the tasks to be purged are still outstanding.
1692 1691
1692 """
1693 1693 if not targets and not jobs:
1694 1694 raise ValueError("Must specify at least one of `targets` and `jobs`")
1695 1695
1696 1696 if jobs == 'all':
1697 if self.outstanding:
1698 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1697 1699 self.results.clear()
1698 1700 self.metadata.clear()
1699 return
1700 1701 else:
1701 msg_ids = []
1702 msg_ids.extend(self._build_msgids_from_target(targets))
1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1702 msg_ids = set()
1703 msg_ids.update(self._build_msgids_from_target(targets))
1704 msg_ids.update(self._build_msgids_from_jobs(jobs))
1705 still_outstanding = self.outstanding.intersection(msg_ids)
1706 if still_outstanding:
1707 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1704 1708 for mid in msg_ids:
1705 1709 self.results.pop(mid)
1706 1710 self.metadata.pop(mid)
@@ -450,6 +450,31 b' class TestClient(ClusterTestCase):'
450 450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
451 451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
452 452
453 def test_purge_local_results_outstanding(self):
454 v = self.client[-1]
455 ar = v.apply_async(lambda : 1)
456 msg_id = ar.msg_ids[0]
457 ar.get()
458 self._wait_for_idle()
459 ar2 = v.apply_async(time.sleep, 1)
460 self.assertIn(msg_id, self.client.results)
461 self.assertIn(msg_id, self.client.metadata)
462 self.client.purge_local_results(ar)
463 self.assertNotIn(msg_id, self.client.results)
464 self.assertNotIn(msg_id, self.client.metadata)
465 with self.assertRaises(RuntimeError):
466 self.client.purge_local_results(ar2)
467 ar2.get()
468 self.client.purge_local_results(ar2)
469
470 def test_purge_all_local_results_outstanding(self):
471 v = self.client[-1]
472 ar = v.apply_async(time.sleep, 1)
473 with self.assertRaises(RuntimeError):
474 self.client.purge_local_results('all')
475 ar.get()
476 self.client.purge_local_results('all')
477
453 478 def test_purge_all_hub_results(self):
454 479 self.client.purge_hub_results('all')
455 480 hist = self.client.hub_history()
General Comments 0
You need to be logged in to leave comments. Login now