##// END OF EJS Templates
Merge pull request #2340 from JanSchulz/client-clear-cache...
Min RK -
r9153:9d021a36 merge
parent child Browse files
Show More
@@ -1389,7 +1389,7 class Client(HasTraits):
1389 raise TypeError("indices must be str or int, not %r"%id)
1389 raise TypeError("indices must be str or int, not %r"%id)
1390 theids.append(id)
1390 theids.append(id)
1391
1391
1392 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1392 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1394
1394
1395 if remote_ids:
1395 if remote_ids:
@@ -1605,8 +1605,77 class Client(HasTraits):
1605 else:
1605 else:
1606 return content
1606 return content
1607
1607
1608 def _build_msgids_from_target(self, targets=None):
1609 """Build a list of msg_ids from the list of engine targets"""
1610 if not targets: # needed as _build_targets otherwise uses all engines
1611 return []
1612 target_ids = self._build_targets(targets)[0]
1613 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1614
1615 def _build_msgids_from_jobs(self, jobs=None):
1616 """Build a list of msg_ids from "jobs" """
1617 if not jobs:
1618 return []
1619 msg_ids = []
1620 if isinstance(jobs, (basestring,AsyncResult)):
1621 jobs = [jobs]
1622 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1623 if bad_ids:
1624 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1625 for j in jobs:
1626 if isinstance(j, AsyncResult):
1627 msg_ids.extend(j.msg_ids)
1628 else:
1629 msg_ids.append(j)
1630 return msg_ids
1631
1632 def purge_local_results(self, jobs=[], targets=[]):
1633 """Clears the client caches of results and frees such memory.
1634
1635 Individual results can be purged by msg_id, or the entire
1636 history of specific targets can be purged.
1637
1638 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1639
1640 The client must have no outstanding tasks before purging the caches.
1641 Raises `AssertionError` if there are still outstanding tasks.
1642
1643 After this call all `AsyncResults` are invalid and should be discarded.
1644
1645 If you must "reget" the results, you can still do so by using
1646 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1647 redownload the results from the hub if they are still available
1648 (i.e `client.purge_hub_results(...)` has not been called.
1649
1650 Parameters
1651 ----------
1652
1653 jobs : str or list of str or AsyncResult objects
1654 the msg_ids whose results should be purged.
1655 targets : int/str/list of ints/strs
1656 The targets, by int_id, whose entire results are to be purged.
1657
1658 default : None
1659 """
1660 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1661
1662 if not targets and not jobs:
1663 raise ValueError("Must specify at least one of `targets` and `jobs`")
1664
1665 if jobs == 'all':
1666 self.results.clear()
1667 self.metadata.clear()
1668 return
1669 else:
1670 msg_ids = []
1671 msg_ids.extend(self._build_msgids_from_target(targets))
1672 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1673 map(self.results.pop, msg_ids)
1674 map(self.metadata.pop, msg_ids)
1675
1676
1608 @spin_first
1677 @spin_first
1609 def purge_results(self, jobs=[], targets=[]):
1678 def purge_hub_results(self, jobs=[], targets=[]):
1610 """Tell the Hub to forget results.
1679 """Tell the Hub to forget results.
1611
1680
1612 Individual results can be purged by msg_id, or the entire
1681 Individual results can be purged by msg_id, or the entire
@@ -1633,17 +1702,7 class Client(HasTraits):
1633 if jobs == 'all':
1702 if jobs == 'all':
1634 msg_ids = jobs
1703 msg_ids = jobs
1635 else:
1704 else:
1636 msg_ids = []
1705 msg_ids = self._build_msgids_from_jobs(jobs)
1637 if isinstance(jobs, (basestring,AsyncResult)):
1638 jobs = [jobs]
1639 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1640 if bad_ids:
1641 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1642 for j in jobs:
1643 if isinstance(j, AsyncResult):
1644 msg_ids.extend(j.msg_ids)
1645 else:
1646 msg_ids.append(j)
1647
1706
1648 content = dict(engine_ids=targets, msg_ids=msg_ids)
1707 content = dict(engine_ids=targets, msg_ids=msg_ids)
1649 self.session.send(self._query_socket, "purge_request", content=content)
1708 self.session.send(self._query_socket, "purge_request", content=content)
@@ -1654,6 +1713,41 class Client(HasTraits):
1654 if content['status'] != 'ok':
1713 if content['status'] != 'ok':
1655 raise self._unwrap_exception(content)
1714 raise self._unwrap_exception(content)
1656
1715
1716 def purge_results(self, jobs=[], targets=[]):
1717 """Clears the cached results from both the hub and the local client
1718
1719 Individual results can be purged by msg_id, or the entire
1720 history of specific targets can be purged.
1721
1722 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1723 the Client's db.
1724
1725 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1726 the same arguments.
1727
1728 Parameters
1729 ----------
1730
1731 jobs : str or list of str or AsyncResult objects
1732 the msg_ids whose results should be forgotten.
1733 targets : int/str/list of ints/strs
1734 The targets, by int_id, whose entire history is to be purged.
1735
1736 default : None
1737 """
1738 self.purge_local_results(jobs=jobs, targets=targets)
1739 self.purge_hub_results(jobs=jobs, targets=targets)
1740
1741 def purge_everything(self):
1742 """Clears all content from previous Tasks from both the hub and the local client
1743
1744 In addition to calling `purge_results("all")` it also deletes the history and
1745 other bookkeeping lists.
1746 """
1747 self.purge_results("all")
1748 self.history = []
1749 self.session.digest_history.clear()
1750
1657 @spin_first
1751 @spin_first
1658 def hub_history(self):
1752 def hub_history(self):
1659 """Get the Hub's history
1753 """Get the Hub's history
@@ -400,7 +400,7 class TestClient(ClusterTestCase):
400 """ensure KeyError on resubmit of nonexistant task"""
400 """ensure KeyError on resubmit of nonexistant task"""
401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
402
402
403 def test_purge_results(self):
403 def test_purge_hub_results(self):
404 # ensure there are some tasks
404 # ensure there are some tasks
405 for i in range(5):
405 for i in range(5):
406 self.client[:].apply_sync(lambda : 1)
406 self.client[:].apply_sync(lambda : 1)
@@ -412,16 +412,63 class TestClient(ClusterTestCase):
412 hist = self.client.hub_history()
412 hist = self.client.hub_history()
413 ahr = rc2.get_result([hist[-1]])
413 ahr = rc2.get_result([hist[-1]])
414 ahr.wait(10)
414 ahr.wait(10)
415 self.client.purge_results(hist[-1])
415 self.client.purge_hub_results(hist[-1])
416 newhist = self.client.hub_history()
416 newhist = self.client.hub_history()
417 self.assertEqual(len(newhist)+1,len(hist))
417 self.assertEqual(len(newhist)+1,len(hist))
418 rc2.spin()
418 rc2.spin()
419 rc2.close()
419 rc2.close()
420
421 def test_purge_local_results(self):
422 # ensure there are some tasks
423 res = []
424 for i in range(5):
425 res.append(self.client[:].apply_async(lambda : 1))
426 time.sleep(0.1)
427 self.client.wait(10) # wait for the results to come back
428 before = len(self.client.results)
429 self.assertEqual(len(self.client.metadata),before)
430 self.client.purge_local_results(res[-1])
431 self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
432 self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
420
433
434 def test_purge_all_hub_results(self):
435 self.client.purge_hub_results('all')
436 hist = self.client.hub_history()
437 self.assertEqual(len(hist), 0)
438
439 def test_purge_all_local_results(self):
440 self.client.purge_local_results('all')
441 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
442 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
443
421 def test_purge_all_results(self):
444 def test_purge_all_results(self):
445 # ensure there are some tasks
446 for i in range(5):
447 self.client[:].apply_sync(lambda : 1)
448 self.client.wait(10)
422 self.client.purge_results('all')
449 self.client.purge_results('all')
450 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
451 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
452 time.sleep(0.1)
453 hist = self.client.hub_history()#
454 self.assertEqual(len(hist), 0, msg="hub history not empty")
455
456 def test_purge_everything(self):
457 # ensure there are some tasks
458 for i in range(5):
459 self.client[:].apply_sync(lambda : 1)
460 self.client.wait(10)
461 self.client.purge_everything()
462 # The client results
463 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
464 self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
465 # the hub results
423 hist = self.client.hub_history()
466 hist = self.client.hub_history()
424 self.assertEqual(len(hist), 0)
467 self.assertEqual(len(hist), 0, msg="hub history not empty")
468 # The client "bookkeeping"
469 self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
470 self.assertEqual(len(self.client.history), 0, msg="client history not empty")
471
425
472
426 def test_spin_thread(self):
473 def test_spin_thread(self):
427 self.client.spin_thread(0.01)
474 self.client.spin_thread(0.01)
General Comments 0
You need to be logged in to leave comments. Login now