##// END OF EJS Templates
Merge pull request #4722 from minrk/purge-outstanding...
Min RK -
r13920:c24b633d merge
parent child Browse files
Show More
@@ -1661,15 +1661,13 b' class Client(HasTraits):'
1661 return msg_ids
1661 return msg_ids
1662
1662
1663 def purge_local_results(self, jobs=[], targets=[]):
1663 def purge_local_results(self, jobs=[], targets=[]):
1664 """Clears the client caches of results and frees such memory.
1664 """Clears the client caches of results and their metadata.
1665
1665
1666 Individual results can be purged by msg_id, or the entire
1666 Individual results can be purged by msg_id, or the entire
1667 history of specific targets can be purged.
1667 history of specific targets can be purged.
1668
1668
1669 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1669 Use `purge_local_results('all')` to scrub everything from the Clients's
1670
1670 results and metadata caches.
1671 The client must have no outstanding tasks before purging the caches.
1672 Raises `AssertionError` if there are still outstanding tasks.
1673
1671
1674 After this call all `AsyncResults` are invalid and should be discarded.
1672 After this call all `AsyncResults` are invalid and should be discarded.
1675
1673
@@ -1683,24 +1681,30 b' class Client(HasTraits):'
1683
1681
1684 jobs : str or list of str or AsyncResult objects
1682 jobs : str or list of str or AsyncResult objects
1685 the msg_ids whose results should be purged.
1683 the msg_ids whose results should be purged.
1686 targets : int/str/list of ints/strs
1684 targets : int/list of ints
1687 The targets, by int_id, whose entire results are to be purged.
1685 The engines, by integer ID, whose entire result histories are to be purged.
1688
1686
1689 default : None
1687 Raises
1690 """
1688 ------
1691 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1689
1690 RuntimeError : if any of the tasks to be purged are still outstanding.
1692
1691
1692 """
1693 if not targets and not jobs:
1693 if not targets and not jobs:
1694 raise ValueError("Must specify at least one of `targets` and `jobs`")
1694 raise ValueError("Must specify at least one of `targets` and `jobs`")
1695
1695
1696 if jobs == 'all':
1696 if jobs == 'all':
1697 if self.outstanding:
1698 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1697 self.results.clear()
1699 self.results.clear()
1698 self.metadata.clear()
1700 self.metadata.clear()
1699 return
1700 else:
1701 else:
1701 msg_ids = []
1702 msg_ids = set()
1702 msg_ids.extend(self._build_msgids_from_target(targets))
1703 msg_ids.update(self._build_msgids_from_target(targets))
1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1704 msg_ids.update(self._build_msgids_from_jobs(jobs))
1705 still_outstanding = self.outstanding.intersection(msg_ids)
1706 if still_outstanding:
1707 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1704 for mid in msg_ids:
1708 for mid in msg_ids:
1705 self.results.pop(mid)
1709 self.results.pop(mid)
1706 self.metadata.pop(mid)
1710 self.metadata.pop(mid)
@@ -450,6 +450,31 b' class TestClient(ClusterTestCase):'
450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
450 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
451 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
452
452
453 def test_purge_local_results_outstanding(self):
454 v = self.client[-1]
455 ar = v.apply_async(lambda : 1)
456 msg_id = ar.msg_ids[0]
457 ar.get()
458 self._wait_for_idle()
459 ar2 = v.apply_async(time.sleep, 1)
460 self.assertIn(msg_id, self.client.results)
461 self.assertIn(msg_id, self.client.metadata)
462 self.client.purge_local_results(ar)
463 self.assertNotIn(msg_id, self.client.results)
464 self.assertNotIn(msg_id, self.client.metadata)
465 with self.assertRaises(RuntimeError):
466 self.client.purge_local_results(ar2)
467 ar2.get()
468 self.client.purge_local_results(ar2)
469
470 def test_purge_all_local_results_outstanding(self):
471 v = self.client[-1]
472 ar = v.apply_async(time.sleep, 1)
473 with self.assertRaises(RuntimeError):
474 self.client.purge_local_results('all')
475 ar.get()
476 self.client.purge_local_results('all')
477
453 def test_purge_all_hub_results(self):
478 def test_purge_all_hub_results(self):
454 self.client.purge_hub_results('all')
479 self.client.purge_hub_results('all')
455 hist = self.client.hub_history()
480 hist = self.client.hub_history()
General Comments 0
You need to be logged in to leave comments. Login now