Show More
@@ -1389,7 +1389,7 class Client(HasTraits): | |||||
1389 | raise TypeError("indices must be str or int, not %r"%id) |
|
1389 | raise TypeError("indices must be str or int, not %r"%id) | |
1390 | theids.append(id) |
|
1390 | theids.append(id) | |
1391 |
|
1391 | |||
1392 |
local_ids = filter(lambda msg_id: msg_id in self. |
|
1392 | local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids) | |
1393 | remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) |
|
1393 | remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) | |
1394 |
|
1394 | |||
1395 | if remote_ids: |
|
1395 | if remote_ids: | |
@@ -1605,8 +1605,77 class Client(HasTraits): | |||||
1605 | else: |
|
1605 | else: | |
1606 | return content |
|
1606 | return content | |
1607 |
|
1607 | |||
|
1608 | def _build_msgids_from_target(self, targets=None): | |||
|
1609 | """Build a list of msg_ids from the list of engine targets""" | |||
|
1610 | if not targets: # needed as _build_targets otherwise uses all engines | |||
|
1611 | return [] | |||
|
1612 | target_ids = self._build_targets(targets)[0] | |||
|
1613 | return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata) | |||
|
1614 | ||||
|
1615 | def _build_msgids_from_jobs(self, jobs=None): | |||
|
1616 | """Build a list of msg_ids from "jobs" """ | |||
|
1617 | if not jobs: | |||
|
1618 | return [] | |||
|
1619 | msg_ids = [] | |||
|
1620 | if isinstance(jobs, (basestring,AsyncResult)): | |||
|
1621 | jobs = [jobs] | |||
|
1622 | bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) | |||
|
1623 | if bad_ids: | |||
|
1624 | raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) | |||
|
1625 | for j in jobs: | |||
|
1626 | if isinstance(j, AsyncResult): | |||
|
1627 | msg_ids.extend(j.msg_ids) | |||
|
1628 | else: | |||
|
1629 | msg_ids.append(j) | |||
|
1630 | return msg_ids | |||
|
1631 | ||||
|
1632 | def purge_local_results(self, jobs=[], targets=[]): | |||
|
1633 | """Clears the client caches of results and frees such memory. | |||
|
1634 | ||||
|
1635 | Individual results can be purged by msg_id, or the entire | |||
|
1636 | history of specific targets can be purged. | |||
|
1637 | ||||
|
1638 | Use `purge_local_results('all')` to scrub everything from the Clients's db. | |||
|
1639 | ||||
|
1640 | The client must have no outstanding tasks before purging the caches. | |||
|
1641 | Raises `AssertionError` if there are still outstanding tasks. | |||
|
1642 | ||||
|
1643 | After this call all `AsyncResults` are invalid and should be discarded. | |||
|
1644 | ||||
|
1645 | If you must "reget" the results, you can still do so by using | |||
|
1646 | `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will | |||
|
1647 | redownload the results from the hub if they are still available | |||
|
1648 | (i.e `client.purge_hub_results(...)` has not been called. | |||
|
1649 | ||||
|
1650 | Parameters | |||
|
1651 | ---------- | |||
|
1652 | ||||
|
1653 | jobs : str or list of str or AsyncResult objects | |||
|
1654 | the msg_ids whose results should be purged. | |||
|
1655 | targets : int/str/list of ints/strs | |||
|
1656 | The targets, by int_id, whose entire results are to be purged. | |||
|
1657 | ||||
|
1658 | default : None | |||
|
1659 | """ | |||
|
1660 | assert not self.outstanding, "Can't purge a client with outstanding tasks!" | |||
|
1661 | ||||
|
1662 | if not targets and not jobs: | |||
|
1663 | raise ValueError("Must specify at least one of `targets` and `jobs`") | |||
|
1664 | ||||
|
1665 | if jobs == 'all': | |||
|
1666 | self.results.clear() | |||
|
1667 | self.metadata.clear() | |||
|
1668 | return | |||
|
1669 | else: | |||
|
1670 | msg_ids = [] | |||
|
1671 | msg_ids.extend(self._build_msgids_from_target(targets)) | |||
|
1672 | msg_ids.extend(self._build_msgids_from_jobs(jobs)) | |||
|
1673 | map(self.results.pop, msg_ids) | |||
|
1674 | map(self.metadata.pop, msg_ids) | |||
|
1675 | ||||
|
1676 | ||||
1608 | @spin_first |
|
1677 | @spin_first | |
1609 | def purge_results(self, jobs=[], targets=[]): |
|
1678 | def purge_hub_results(self, jobs=[], targets=[]): | |
1610 | """Tell the Hub to forget results. |
|
1679 | """Tell the Hub to forget results. | |
1611 |
|
1680 | |||
1612 | Individual results can be purged by msg_id, or the entire |
|
1681 | Individual results can be purged by msg_id, or the entire | |
@@ -1633,17 +1702,7 class Client(HasTraits): | |||||
1633 | if jobs == 'all': |
|
1702 | if jobs == 'all': | |
1634 | msg_ids = jobs |
|
1703 | msg_ids = jobs | |
1635 | else: |
|
1704 | else: | |
1636 | msg_ids = [] |
|
1705 | msg_ids = self._build_msgids_from_jobs(jobs) | |
1637 | if isinstance(jobs, (basestring,AsyncResult)): |
|
|||
1638 | jobs = [jobs] |
|
|||
1639 | bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) |
|
|||
1640 | if bad_ids: |
|
|||
1641 | raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) |
|
|||
1642 | for j in jobs: |
|
|||
1643 | if isinstance(j, AsyncResult): |
|
|||
1644 | msg_ids.extend(j.msg_ids) |
|
|||
1645 | else: |
|
|||
1646 | msg_ids.append(j) |
|
|||
1647 |
|
1706 | |||
1648 | content = dict(engine_ids=targets, msg_ids=msg_ids) |
|
1707 | content = dict(engine_ids=targets, msg_ids=msg_ids) | |
1649 | self.session.send(self._query_socket, "purge_request", content=content) |
|
1708 | self.session.send(self._query_socket, "purge_request", content=content) | |
@@ -1654,6 +1713,41 class Client(HasTraits): | |||||
1654 | if content['status'] != 'ok': |
|
1713 | if content['status'] != 'ok': | |
1655 | raise self._unwrap_exception(content) |
|
1714 | raise self._unwrap_exception(content) | |
1656 |
|
1715 | |||
|
1716 | def purge_results(self, jobs=[], targets=[]): | |||
|
1717 | """Clears the cached results from both the hub and the local client | |||
|
1718 | ||||
|
1719 | Individual results can be purged by msg_id, or the entire | |||
|
1720 | history of specific targets can be purged. | |||
|
1721 | ||||
|
1722 | Use `purge_results('all')` to scrub every cached result from both the Hub's and | |||
|
1723 | the Client's db. | |||
|
1724 | ||||
|
1725 | Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with | |||
|
1726 | the same arguments. | |||
|
1727 | ||||
|
1728 | Parameters | |||
|
1729 | ---------- | |||
|
1730 | ||||
|
1731 | jobs : str or list of str or AsyncResult objects | |||
|
1732 | the msg_ids whose results should be forgotten. | |||
|
1733 | targets : int/str/list of ints/strs | |||
|
1734 | The targets, by int_id, whose entire history is to be purged. | |||
|
1735 | ||||
|
1736 | default : None | |||
|
1737 | """ | |||
|
1738 | self.purge_local_results(jobs=jobs, targets=targets) | |||
|
1739 | self.purge_hub_results(jobs=jobs, targets=targets) | |||
|
1740 | ||||
|
1741 | def purge_everything(self): | |||
|
1742 | """Clears all content from previous Tasks from both the hub and the local client | |||
|
1743 | ||||
|
1744 | In addition to calling `purge_results("all")` it also deletes the history and | |||
|
1745 | other bookkeeping lists. | |||
|
1746 | """ | |||
|
1747 | self.purge_results("all") | |||
|
1748 | self.history = [] | |||
|
1749 | self.session.digest_history.clear() | |||
|
1750 | ||||
1657 | @spin_first |
|
1751 | @spin_first | |
1658 | def hub_history(self): |
|
1752 | def hub_history(self): | |
1659 | """Get the Hub's history |
|
1753 | """Get the Hub's history |
@@ -400,7 +400,7 class TestClient(ClusterTestCase): | |||||
400 | """ensure KeyError on resubmit of nonexistant task""" |
|
400 | """ensure KeyError on resubmit of nonexistant task""" | |
401 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) |
|
401 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) | |
402 |
|
402 | |||
403 | def test_purge_results(self): |
|
403 | def test_purge_hub_results(self): | |
404 | # ensure there are some tasks |
|
404 | # ensure there are some tasks | |
405 | for i in range(5): |
|
405 | for i in range(5): | |
406 | self.client[:].apply_sync(lambda : 1) |
|
406 | self.client[:].apply_sync(lambda : 1) | |
@@ -412,16 +412,63 class TestClient(ClusterTestCase): | |||||
412 | hist = self.client.hub_history() |
|
412 | hist = self.client.hub_history() | |
413 | ahr = rc2.get_result([hist[-1]]) |
|
413 | ahr = rc2.get_result([hist[-1]]) | |
414 | ahr.wait(10) |
|
414 | ahr.wait(10) | |
415 | self.client.purge_results(hist[-1]) |
|
415 | self.client.purge_hub_results(hist[-1]) | |
416 | newhist = self.client.hub_history() |
|
416 | newhist = self.client.hub_history() | |
417 | self.assertEqual(len(newhist)+1,len(hist)) |
|
417 | self.assertEqual(len(newhist)+1,len(hist)) | |
418 | rc2.spin() |
|
418 | rc2.spin() | |
419 | rc2.close() |
|
419 | rc2.close() | |
|
420 | ||||
|
421 | def test_purge_local_results(self): | |||
|
422 | # ensure there are some tasks | |||
|
423 | res = [] | |||
|
424 | for i in range(5): | |||
|
425 | res.append(self.client[:].apply_async(lambda : 1)) | |||
|
426 | time.sleep(0.1) | |||
|
427 | self.client.wait(10) # wait for the results to come back | |||
|
428 | before = len(self.client.results) | |||
|
429 | self.assertEqual(len(self.client.metadata),before) | |||
|
430 | self.client.purge_local_results(res[-1]) | |||
|
431 | self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results") | |||
|
432 | self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata") | |||
420 |
|
433 | |||
|
434 | def test_purge_all_hub_results(self): | |||
|
435 | self.client.purge_hub_results('all') | |||
|
436 | hist = self.client.hub_history() | |||
|
437 | self.assertEqual(len(hist), 0) | |||
|
438 | ||||
|
439 | def test_purge_all_local_results(self): | |||
|
440 | self.client.purge_local_results('all') | |||
|
441 | self.assertEqual(len(self.client.results), 0, msg="Results not empty") | |||
|
442 | self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty") | |||
|
443 | ||||
421 | def test_purge_all_results(self): |
|
444 | def test_purge_all_results(self): | |
|
445 | # ensure there are some tasks | |||
|
446 | for i in range(5): | |||
|
447 | self.client[:].apply_sync(lambda : 1) | |||
|
448 | self.client.wait(10) | |||
422 | self.client.purge_results('all') |
|
449 | self.client.purge_results('all') | |
|
450 | self.assertEqual(len(self.client.results), 0, msg="Results not empty") | |||
|
451 | self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty") | |||
|
452 | time.sleep(0.1) | |||
|
453 | hist = self.client.hub_history()# | |||
|
454 | self.assertEqual(len(hist), 0, msg="hub history not empty") | |||
|
455 | ||||
|
456 | def test_purge_everything(self): | |||
|
457 | # ensure there are some tasks | |||
|
458 | for i in range(5): | |||
|
459 | self.client[:].apply_sync(lambda : 1) | |||
|
460 | self.client.wait(10) | |||
|
461 | self.client.purge_everything() | |||
|
462 | # The client results | |||
|
463 | self.assertEqual(len(self.client.results), 0, msg="Results not empty") | |||
|
464 | self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty") | |||
|
465 | # the hub results | |||
423 | hist = self.client.hub_history() |
|
466 | hist = self.client.hub_history() | |
424 | self.assertEqual(len(hist), 0) |
|
467 | self.assertEqual(len(hist), 0, msg="hub history not empty") | |
|
468 | # The client "bookkeeping" | |||
|
469 | self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty") | |||
|
470 | self.assertEqual(len(self.client.history), 0, msg="client history not empty") | |||
|
471 | ||||
425 |
|
472 | |||
426 | def test_spin_thread(self): |
|
473 | def test_spin_thread(self): | |
427 | self.client.spin_thread(0.01) |
|
474 | self.client.spin_thread(0.01) |
General Comments 0
You need to be logged in to leave comments.
Login now