##// END OF EJS Templates
fix purge_results for args other than specified msg_id...
MinRK -
Show More
@@ -1305,14 +1305,15 b' class Client(HasTraits):'
1305 Individual results can be purged by msg_id, or the entire
1305 Individual results can be purged by msg_id, or the entire
1306 history of specific targets can be purged.
1306 history of specific targets can be purged.
1307
1307
1308 Use `purge_results('all')` to scrub everything from the Hub's db.
1309
1308 Parameters
1310 Parameters
1309 ----------
1311 ----------
1310
1312
1311 jobs : str or list of str or AsyncResult objects
1313 jobs : str or list of str or AsyncResult objects
1312 the msg_ids whose results should be forgotten.
1314 the msg_ids whose results should be forgotten.
1313 targets : int/str/list of ints/strs
1315 targets : int/str/list of ints/strs
1314 The targets, by uuid or int_id, whose entire history is to be purged.
1316 The targets, by int_id, whose entire history is to be purged.
1315 Use `targets='all'` to scrub everything from the Hub's memory.
1316
1317
1317 default : None
1318 default : None
1318 """
1319 """
@@ -1322,19 +1323,22 b' class Client(HasTraits):'
1322 targets = self._build_targets(targets)[1]
1323 targets = self._build_targets(targets)[1]
1323
1324
1324 # construct msg_ids from jobs
1325 # construct msg_ids from jobs
1325 msg_ids = []
1326 if jobs == 'all':
1326 if isinstance(jobs, (basestring,AsyncResult)):
1327 msg_ids = jobs
1327 jobs = [jobs]
1328 else:
1328 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1329 msg_ids = []
1329 if bad_ids:
1330 if isinstance(jobs, (basestring,AsyncResult)):
1330 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1331 jobs = [jobs]
1331 for j in jobs:
1332 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1332 if isinstance(j, AsyncResult):
1333 if bad_ids:
1333 msg_ids.extend(j.msg_ids)
1334 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1334 else:
1335 for j in jobs:
1335 msg_ids.append(j)
1336 if isinstance(j, AsyncResult):
1336
1337 msg_ids.extend(j.msg_ids)
1337 content = dict(targets=targets, msg_ids=msg_ids)
1338 else:
1339 msg_ids.append(j)
1340
1341 content = dict(engine_ids=targets, msg_ids=msg_ids)
1338 self.session.send(self._query_socket, "purge_request", content=content)
1342 self.session.send(self._query_socket, "purge_request", content=content)
1339 idents, msg = self.session.recv(self._query_socket, 0)
1343 idents, msg = self.session.recv(self._query_socket, 0)
1340 if self.debug:
1344 if self.debug:
@@ -1063,6 +1063,7 b' class Hub(SessionFactory):'
1063 """Purge results from memory. This method is more valuable before we move
1063 """Purge results from memory. This method is more valuable before we move
1064 to a DB based message storage mechanism."""
1064 to a DB based message storage mechanism."""
1065 content = msg['content']
1065 content = msg['content']
1066 self.log.info("Dropping records with %s", content)
1066 msg_ids = content.get('msg_ids', [])
1067 msg_ids = content.get('msg_ids', [])
1067 reply = dict(status='ok')
1068 reply = dict(status='ok')
1068 if msg_ids == 'all':
1069 if msg_ids == 'all':
@@ -1092,7 +1093,6 b' class Hub(SessionFactory):'
1092 except:
1093 except:
1093 reply = error.wrap_exception()
1094 reply = error.wrap_exception()
1094 break
1095 break
1095 msg_ids = self.completed.pop(eid)
1096 uid = self.engines[eid].queue
1096 uid = self.engines[eid].queue
1097 try:
1097 try:
1098 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1098 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
@@ -242,8 +242,16 b' class TestClient(ClusterTestCase):'
242 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
242 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
243
243
244 def test_purge_results(self):
244 def test_purge_results(self):
245 # ensure there are some tasks
246 for i in range(5):
247 self.client[:].apply_sync(lambda : 1)
245 hist = self.client.hub_history()
248 hist = self.client.hub_history()
246 self.client.purge_results(hist)
249 self.client.purge_results(hist[-1])
247 newhist = self.client.hub_history()
250 newhist = self.client.hub_history()
248 self.assertTrue(len(newhist) == 0)
251 self.assertEquals(len(newhist)+1,len(hist))
252
253 def test_purge_all_results(self):
254 self.client.purge_results('all')
255 hist = self.client.hub_history()
256 self.assertEquals(len(hist), 0)
249
257
General Comments 0
You need to be logged in to leave comments. Login now