##// END OF EJS Templates
fix purge_results for args other than specified msg_id...
MinRK -
Show More
@@ -1305,14 +1305,15 class Client(HasTraits):
1305 1305 Individual results can be purged by msg_id, or the entire
1306 1306 history of specific targets can be purged.
1307 1307
1308 Use `purge_results('all')` to scrub everything from the Hub's db.
1309
1308 1310 Parameters
1309 1311 ----------
1310 1312
1311 1313 jobs : str or list of str or AsyncResult objects
1312 1314 the msg_ids whose results should be forgotten.
1313 1315 targets : int/str/list of ints/strs
1314 The targets, by uuid or int_id, whose entire history is to be purged.
1315 Use `targets='all'` to scrub everything from the Hub's memory.
1316 The targets, by int_id, whose entire history is to be purged.
1316 1317
1317 1318 default : None
1318 1319 """
@@ -1322,19 +1323,22 class Client(HasTraits):
1322 1323 targets = self._build_targets(targets)[1]
1323 1324
1324 1325 # construct msg_ids from jobs
1325 msg_ids = []
1326 if isinstance(jobs, (basestring,AsyncResult)):
1327 jobs = [jobs]
1328 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1329 if bad_ids:
1330 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1331 for j in jobs:
1332 if isinstance(j, AsyncResult):
1333 msg_ids.extend(j.msg_ids)
1334 else:
1335 msg_ids.append(j)
1336
1337 content = dict(targets=targets, msg_ids=msg_ids)
1326 if jobs == 'all':
1327 msg_ids = jobs
1328 else:
1329 msg_ids = []
1330 if isinstance(jobs, (basestring,AsyncResult)):
1331 jobs = [jobs]
1332 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1333 if bad_ids:
1334 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1335 for j in jobs:
1336 if isinstance(j, AsyncResult):
1337 msg_ids.extend(j.msg_ids)
1338 else:
1339 msg_ids.append(j)
1340
1341 content = dict(engine_ids=targets, msg_ids=msg_ids)
1338 1342 self.session.send(self._query_socket, "purge_request", content=content)
1339 1343 idents, msg = self.session.recv(self._query_socket, 0)
1340 1344 if self.debug:
@@ -1063,6 +1063,7 class Hub(SessionFactory):
1063 1063 """Purge results from memory. This method is more valuable before we move
1064 1064 to a DB based message storage mechanism."""
1065 1065 content = msg['content']
1066 self.log.info("Dropping records with %s", content)
1066 1067 msg_ids = content.get('msg_ids', [])
1067 1068 reply = dict(status='ok')
1068 1069 if msg_ids == 'all':
@@ -1092,7 +1093,6 class Hub(SessionFactory):
1092 1093 except:
1093 1094 reply = error.wrap_exception()
1094 1095 break
1095 msg_ids = self.completed.pop(eid)
1096 1096 uid = self.engines[eid].queue
1097 1097 try:
1098 1098 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
@@ -242,8 +242,16 class TestClient(ClusterTestCase):
242 242 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
243 243
244 244 def test_purge_results(self):
245 # ensure there are some tasks
246 for i in range(5):
247 self.client[:].apply_sync(lambda : 1)
245 248 hist = self.client.hub_history()
246 self.client.purge_results(hist)
249 self.client.purge_results(hist[-1])
247 250 newhist = self.client.hub_history()
248 self.assertTrue(len(newhist) == 0)
251 self.assertEquals(len(newhist)+1,len(hist))
252
253 def test_purge_all_results(self):
254 self.client.purge_results('all')
255 hist = self.client.hub_history()
256 self.assertEquals(len(hist), 0)
249 257
General Comments 0
You need to be logged in to leave comments. Login now