##// END OF EJS Templates
Backport PR #4722: allow purging local results as long as they are not outstanding...
MinRK -
Show More
@@ -1659,15 +1659,13 b' class Client(HasTraits):'
1659 return msg_ids
1659 return msg_ids
1660
1660
1661 def purge_local_results(self, jobs=[], targets=[]):
1661 def purge_local_results(self, jobs=[], targets=[]):
1662 """Clears the client caches of results and frees such memory.
1662 """Clears the client caches of results and their metadata.
1663
1663
1664 Individual results can be purged by msg_id, or the entire
1664 Individual results can be purged by msg_id, or the entire
1665 history of specific targets can be purged.
1665 history of specific targets can be purged.
1666
1666
1667 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1667 Use `purge_local_results('all')` to scrub everything from the Clients's
1668
1668 results and metadata caches.
1669 The client must have no outstanding tasks before purging the caches.
1670 Raises `AssertionError` if there are still outstanding tasks.
1671
1669
1672 After this call all `AsyncResults` are invalid and should be discarded.
1670 After this call all `AsyncResults` are invalid and should be discarded.
1673
1671
@@ -1681,24 +1679,30 b' class Client(HasTraits):'
1681
1679
1682 jobs : str or list of str or AsyncResult objects
1680 jobs : str or list of str or AsyncResult objects
1683 the msg_ids whose results should be purged.
1681 the msg_ids whose results should be purged.
1684 targets : int/str/list of ints/strs
1682 targets : int/list of ints
1685 The targets, by int_id, whose entire results are to be purged.
1683 The engines, by integer ID, whose entire result histories are to be purged.
1684
1685 Raises
1686 ------
1687
1688 RuntimeError : if any of the tasks to be purged are still outstanding.
1686
1689
1687 default : None
1688 """
1690 """
1689 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1690
1691 if not targets and not jobs:
1691 if not targets and not jobs:
1692 raise ValueError("Must specify at least one of `targets` and `jobs`")
1692 raise ValueError("Must specify at least one of `targets` and `jobs`")
1693
1693
1694 if jobs == 'all':
1694 if jobs == 'all':
1695 if self.outstanding:
1696 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1695 self.results.clear()
1697 self.results.clear()
1696 self.metadata.clear()
1698 self.metadata.clear()
1697 return
1698 else:
1699 else:
1699 msg_ids = []
1700 msg_ids = set()
1700 msg_ids.extend(self._build_msgids_from_target(targets))
1701 msg_ids.update(self._build_msgids_from_target(targets))
1701 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1702 msg_ids.update(self._build_msgids_from_jobs(jobs))
1703 still_outstanding = self.outstanding.intersection(msg_ids)
1704 if still_outstanding:
1705 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1702 map(self.results.pop, msg_ids)
1706 map(self.results.pop, msg_ids)
1703 map(self.metadata.pop, msg_ids)
1707 map(self.metadata.pop, msg_ids)
1704
1708
@@ -444,7 +444,32 b' class TestClient(ClusterTestCase):'
444 self.client.purge_local_results(res[-1])
444 self.client.purge_local_results(res[-1])
445 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
445 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
446 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
446 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
447
447
448 def test_purge_local_results_outstanding(self):
449 v = self.client[-1]
450 ar = v.apply_async(lambda : 1)
451 msg_id = ar.msg_ids[0]
452 ar.get()
453 self._wait_for_idle()
454 ar2 = v.apply_async(time.sleep, 1)
455 self.assertIn(msg_id, self.client.results)
456 self.assertIn(msg_id, self.client.metadata)
457 self.client.purge_local_results(ar)
458 self.assertNotIn(msg_id, self.client.results)
459 self.assertNotIn(msg_id, self.client.metadata)
460 with self.assertRaises(RuntimeError):
461 self.client.purge_local_results(ar2)
462 ar2.get()
463 self.client.purge_local_results(ar2)
464
465 def test_purge_all_local_results_outstanding(self):
466 v = self.client[-1]
467 ar = v.apply_async(time.sleep, 1)
468 with self.assertRaises(RuntimeError):
469 self.client.purge_local_results('all')
470 ar.get()
471 self.client.purge_local_results('all')
472
448 def test_purge_all_hub_results(self):
473 def test_purge_all_hub_results(self):
449 self.client.purge_hub_results('all')
474 self.client.purge_hub_results('all')
450 hist = self.client.hub_history()
475 hist = self.client.hub_history()
General Comments 0
You need to be logged in to leave comments. Login now