Show More
@@ -1305,14 +1305,15 class Client(HasTraits): | |||||
1305 | Individual results can be purged by msg_id, or the entire |
|
1305 | Individual results can be purged by msg_id, or the entire | |
1306 | history of specific targets can be purged. |
|
1306 | history of specific targets can be purged. | |
1307 |
|
1307 | |||
|
1308 | Use `purge_results('all')` to scrub everything from the Hub's db. | |||
|
1309 | ||||
1308 | Parameters |
|
1310 | Parameters | |
1309 | ---------- |
|
1311 | ---------- | |
1310 |
|
1312 | |||
1311 | jobs : str or list of str or AsyncResult objects |
|
1313 | jobs : str or list of str or AsyncResult objects | |
1312 | the msg_ids whose results should be forgotten. |
|
1314 | the msg_ids whose results should be forgotten. | |
1313 | targets : int/str/list of ints/strs |
|
1315 | targets : int/str/list of ints/strs | |
1314 |
The targets, by |
|
1316 | The targets, by int_id, whose entire history is to be purged. | |
1315 | Use `targets='all'` to scrub everything from the Hub's memory. |
|
|||
1316 |
|
1317 | |||
1317 | default : None |
|
1318 | default : None | |
1318 | """ |
|
1319 | """ | |
@@ -1322,19 +1323,22 class Client(HasTraits): | |||||
1322 | targets = self._build_targets(targets)[1] |
|
1323 | targets = self._build_targets(targets)[1] | |
1323 |
|
1324 | |||
1324 | # construct msg_ids from jobs |
|
1325 | # construct msg_ids from jobs | |
1325 | msg_ids = [] |
|
1326 | if jobs == 'all': | |
1326 | if isinstance(jobs, (basestring,AsyncResult)): |
|
1327 | msg_ids = jobs | |
1327 | jobs = [jobs] |
|
1328 | else: | |
1328 | bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) |
|
1329 | msg_ids = [] | |
1329 | if bad_ids: |
|
1330 | if isinstance(jobs, (basestring,AsyncResult)): | |
1330 | raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) |
|
1331 | jobs = [jobs] | |
1331 | for j in jobs: |
|
1332 | bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) | |
1332 | if isinstance(j, AsyncResult): |
|
1333 | if bad_ids: | |
1333 | msg_ids.extend(j.msg_ids) |
|
1334 | raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) | |
1334 |
|
|
1335 | for j in jobs: | |
1335 |
|
|
1336 | if isinstance(j, AsyncResult): | |
1336 |
|
1337 | msg_ids.extend(j.msg_ids) | ||
1337 | content = dict(targets=targets, msg_ids=msg_ids) |
|
1338 | else: | |
|
1339 | msg_ids.append(j) | |||
|
1340 | ||||
|
1341 | content = dict(engine_ids=targets, msg_ids=msg_ids) | |||
1338 | self.session.send(self._query_socket, "purge_request", content=content) |
|
1342 | self.session.send(self._query_socket, "purge_request", content=content) | |
1339 | idents, msg = self.session.recv(self._query_socket, 0) |
|
1343 | idents, msg = self.session.recv(self._query_socket, 0) | |
1340 | if self.debug: |
|
1344 | if self.debug: |
@@ -1063,6 +1063,7 class Hub(SessionFactory): | |||||
1063 | """Purge results from memory. This method is more valuable before we move |
|
1063 | """Purge results from memory. This method is more valuable before we move | |
1064 | to a DB based message storage mechanism.""" |
|
1064 | to a DB based message storage mechanism.""" | |
1065 | content = msg['content'] |
|
1065 | content = msg['content'] | |
|
1066 | self.log.info("Dropping records with %s", content) | |||
1066 | msg_ids = content.get('msg_ids', []) |
|
1067 | msg_ids = content.get('msg_ids', []) | |
1067 | reply = dict(status='ok') |
|
1068 | reply = dict(status='ok') | |
1068 | if msg_ids == 'all': |
|
1069 | if msg_ids == 'all': | |
@@ -1092,7 +1093,6 class Hub(SessionFactory): | |||||
1092 | except: |
|
1093 | except: | |
1093 | reply = error.wrap_exception() |
|
1094 | reply = error.wrap_exception() | |
1094 | break |
|
1095 | break | |
1095 | msg_ids = self.completed.pop(eid) |
|
|||
1096 | uid = self.engines[eid].queue |
|
1096 | uid = self.engines[eid].queue | |
1097 | try: |
|
1097 | try: | |
1098 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1098 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
@@ -242,8 +242,16 class TestClient(ClusterTestCase): | |||||
242 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) |
|
242 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) | |
243 |
|
243 | |||
244 | def test_purge_results(self): |
|
244 | def test_purge_results(self): | |
|
245 | # ensure there are some tasks | |||
|
246 | for i in range(5): | |||
|
247 | self.client[:].apply_sync(lambda : 1) | |||
245 | hist = self.client.hub_history() |
|
248 | hist = self.client.hub_history() | |
246 | self.client.purge_results(hist) |
|
249 | self.client.purge_results(hist[-1]) | |
247 | newhist = self.client.hub_history() |
|
250 | newhist = self.client.hub_history() | |
248 |
self.assert |
|
251 | self.assertEquals(len(newhist)+1,len(hist)) | |
|
252 | ||||
|
253 | def test_purge_all_results(self): | |||
|
254 | self.client.purge_results('all') | |||
|
255 | hist = self.client.hub_history() | |||
|
256 | self.assertEquals(len(hist), 0) | |||
249 |
|
257 |
General Comments 0
You need to be logged in to leave comments.
Login now