##// END OF EJS Templates
resubmitted tasks are now wholly separate (new msg_ids)...
MinRK -
Show More
@@ -1286,12 +1286,6 b' class Client(HasTraits):'
1286 raise TypeError("indices must be str or int, not %r"%id)
1286 raise TypeError("indices must be str or int, not %r"%id)
1287 theids.append(id)
1287 theids.append(id)
1288
1288
1289 for msg_id in theids:
1290 self.outstanding.discard(msg_id)
1291 if msg_id in self.history:
1292 self.history.remove(msg_id)
1293 self.results.pop(msg_id, None)
1294 self.metadata.pop(msg_id, None)
1295 content = dict(msg_ids = theids)
1289 content = dict(msg_ids = theids)
1296
1290
1297 self.session.send(self._query_socket, 'resubmit_request', content)
1291 self.session.send(self._query_socket, 'resubmit_request', content)
@@ -1303,8 +1297,10 b' class Client(HasTraits):'
1303 content = msg['content']
1297 content = msg['content']
1304 if content['status'] != 'ok':
1298 if content['status'] != 'ok':
1305 raise self._unwrap_exception(content)
1299 raise self._unwrap_exception(content)
1300 mapping = content['resubmitted']
1301 new_ids = [ mapping[msg_id] for msg_id in theids ]
1306
1302
1307 ar = AsyncHubResult(self, msg_ids=theids)
1303 ar = AsyncHubResult(self, msg_ids=new_ids)
1308
1304
1309 if block:
1305 if block:
1310 ar.wait()
1306 ar.wait()
@@ -1134,7 +1134,7 b' class Hub(SessionFactory):'
1134
1134
1135 # validate msg_ids
1135 # validate msg_ids
1136 found_ids = [ rec['msg_id'] for rec in records ]
1136 found_ids = [ rec['msg_id'] for rec in records ]
1137 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 if len(records) > len(msg_ids):
1138 if len(records) > len(msg_ids):
1139 try:
1139 try:
1140 raise RuntimeError("DB appears to be in an inconsistent state."
1140 raise RuntimeError("DB appears to be in an inconsistent state."
@@ -1147,40 +1147,46 b' class Hub(SessionFactory):'
1147 raise KeyError("No such msg(s): %r" % missing)
1147 raise KeyError("No such msg(s): %r" % missing)
1148 except KeyError:
1148 except KeyError:
1149 return finish(error.wrap_exception())
1149 return finish(error.wrap_exception())
1150 elif invalid_ids:
1150 elif pending_ids:
1151 msg_id = invalid_ids[0]
1151 pass
1152 try:
1152 # no need to raise on resubmit of pending task, now that we
1153 raise ValueError("Task %r appears to be inflight" % msg_id)
1153 # resubmit under new ID, but do we want to raise anyway?
1154 except Exception:
1154 # msg_id = invalid_ids[0]
1155 return finish(error.wrap_exception())
1155 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1159
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1156
1162
1157 # clear the existing records
1158 now = datetime.now()
1159 rec = empty_record()
1160 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1161 rec['resubmitted'] = now
1162 rec['queue'] = 'task'
1163 rec['client_uuid'] = client_id[0]
1164 try:
1165 for msg_id in msg_ids:
1166 self.all_completed.discard(msg_id)
1167 self.db.update_record(msg_id, rec)
1168 except Exception:
1169 self.log.error('db::db error upating record', exc_info=True)
1170 reply = error.wrap_exception()
1171 else:
1172 # send the messages
1163 # send the messages
1173 for rec in records:
1164 for rec in records:
1174 header = rec['header']
1165 header = rec['header']
1175 # include resubmitted in header to prevent digest collision
1176 header['resubmitted'] = now
1177 msg = self.session.msg(header['msg_type'])
1166 msg = self.session.msg(header['msg_type'])
1167 msg_id = msg['msg_id']
1178 msg['content'] = rec['content']
1168 msg['content'] = rec['content']
1169 header.update(msg['header'])
1179 msg['header'] = header
1170 msg['header'] = header
1180 msg['header']['msg_id'] = rec['msg_id']
1171
1181 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1172 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1182
1173
1183 finish(dict(status='ok'))
1174 resubmitted[rec['msg_id']] = msg_id
1175 self.pending.add(msg_id)
1176 msg['buffers'] = []
1177 try:
1178 self.db.add_record(msg_id, init_record(msg))
1179 except Exception:
1180 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1181
1182 finish(dict(status='ok', resubmitted=resubmitted))
1183
1184 # store the new IDs in the Task DB
1185 for msg_id, resubmit_id in resubmitted.iteritems():
1186 try:
1187 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1188 except Exception:
1189 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1184
1190
1185
1191
1186 def _extract_record(self, rec):
1192 def _extract_record(self, rec):
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):'
138 'engine_uuid' : 'text',
138 'engine_uuid' : 'text',
139 'started' : 'timestamp',
139 'started' : 'timestamp',
140 'completed' : 'timestamp',
140 'completed' : 'timestamp',
141 'resubmitted' : 'timestamp',
141 'resubmitted' : 'text',
142 'received' : 'timestamp',
142 'received' : 'timestamp',
143 'result_header' : 'dict text',
143 'result_header' : 'dict text',
144 'result_content' : 'dict text',
144 'result_content' : 'dict text',
@@ -247,7 +247,7 b' class SQLiteDB(BaseDB):'
247 engine_uuid text,
247 engine_uuid text,
248 started timestamp,
248 started timestamp,
249 completed timestamp,
249 completed timestamp,
250 resubmitted timestamp,
250 resubmitted text,
251 received timestamp,
251 received timestamp,
252 result_header dict text,
252 result_header dict text,
253 result_content dict text,
253 result_content dict text,
@@ -331,13 +331,14 b' class TestClient(ClusterTestCase):'
331 r2 = ahr.get(1)
331 r2 = ahr.get(1)
332
332
333 def test_resubmit_inflight(self):
333 def test_resubmit_inflight(self):
334 """ensure ValueError on resubmit of inflight task"""
334 """resubmit of inflight task"""
335 v = self.client.load_balanced_view()
335 v = self.client.load_balanced_view()
336 ar = v.apply_async(time.sleep,1)
336 ar = v.apply_async(time.sleep,1)
337 # give the message a chance to arrive
337 # give the message a chance to arrive
338 time.sleep(0.2)
338 time.sleep(0.2)
339 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 ahr = self.client.resubmit(ar.msg_ids)
340 ar.get(2)
340 ar.get(2)
341 ahr.get(2)
341
342
342 def test_resubmit_badkey(self):
343 def test_resubmit_badkey(self):
343 """ensure KeyError on resubmit of nonexistant task"""
344 """ensure KeyError on resubmit of nonexistant task"""
@@ -34,7 +34,7 b' TaskRecord keys:'
34 =============== =============== =============
34 =============== =============== =============
35 Key Type Description
35 Key Type Description
36 =============== =============== =============
36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
37 msg_id uuid(ascii) The msg ID
38 header dict The request header
38 header dict The request header
39 content dict The request content (likely empty)
39 content dict The request content (likely empty)
40 buffers list(bytes) buffers containing serialized request objects
40 buffers list(bytes) buffers containing serialized request objects
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket"
43 engine_uuid uuid(bytes) IDENT of engine's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
44 started datetime time task began execution on engine
44 started datetime time task began execution on engine
45 completed datetime time task finished execution (success or failure) on engine
45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
46 resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
47 result_header dict header for result
47 result_header dict header for result
48 result_content dict content for result
48 result_content dict content for result
49 result_buffers list(bytes) buffers containing serialized request objects
49 result_buffers list(bytes) buffers containing serialized request objects
General Comments 0
You need to be logged in to leave comments. Login now