##// END OF EJS Templates
resubmitted tasks are now wholly separate (new msg_ids)...
MinRK -
Show More
@@ -1286,12 +1286,6 b' class Client(HasTraits):'
1286 1286 raise TypeError("indices must be str or int, not %r"%id)
1287 1287 theids.append(id)
1288 1288
1289 for msg_id in theids:
1290 self.outstanding.discard(msg_id)
1291 if msg_id in self.history:
1292 self.history.remove(msg_id)
1293 self.results.pop(msg_id, None)
1294 self.metadata.pop(msg_id, None)
1295 1289 content = dict(msg_ids = theids)
1296 1290
1297 1291 self.session.send(self._query_socket, 'resubmit_request', content)
@@ -1303,8 +1297,10 b' class Client(HasTraits):'
1303 1297 content = msg['content']
1304 1298 if content['status'] != 'ok':
1305 1299 raise self._unwrap_exception(content)
1300 mapping = content['resubmitted']
1301 new_ids = [ mapping[msg_id] for msg_id in theids ]
1306 1302
1307 ar = AsyncHubResult(self, msg_ids=theids)
1303 ar = AsyncHubResult(self, msg_ids=new_ids)
1308 1304
1309 1305 if block:
1310 1306 ar.wait()
@@ -1134,7 +1134,7 b' class Hub(SessionFactory):'
1134 1134
1135 1135 # validate msg_ids
1136 1136 found_ids = [ rec['msg_id'] for rec in records ]
1137 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 1138 if len(records) > len(msg_ids):
1139 1139 try:
1140 1140 raise RuntimeError("DB appears to be in an inconsistent state."
@@ -1147,40 +1147,46 b' class Hub(SessionFactory):'
1147 1147 raise KeyError("No such msg(s): %r" % missing)
1148 1148 except KeyError:
1149 1149 return finish(error.wrap_exception())
1150 elif invalid_ids:
1151 msg_id = invalid_ids[0]
1150 elif pending_ids:
1151 pass
1152 # no need to raise on resubmit of pending task, now that we
1153 # resubmit under new ID, but do we want to raise anyway?
1154 # msg_id = invalid_ids[0]
1155 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1159
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1162
1163 # send the messages
1164 for rec in records:
1165 header = rec['header']
1166 msg = self.session.msg(header['msg_type'])
1167 msg_id = msg['msg_id']
1168 msg['content'] = rec['content']
1169 header.update(msg['header'])
1170 msg['header'] = header
1171
1172 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1173
1174 resubmitted[rec['msg_id']] = msg_id
1175 self.pending.add(msg_id)
1176 msg['buffers'] = []
1152 1177 try:
1153 raise ValueError("Task %r appears to be inflight" % msg_id)
1178 self.db.add_record(msg_id, init_record(msg))
1154 1179 except Exception:
1155 return finish(error.wrap_exception())
1180 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1156 1181
1157 # clear the existing records
1158 now = datetime.now()
1159 rec = empty_record()
1160 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1161 rec['resubmitted'] = now
1162 rec['queue'] = 'task'
1163 rec['client_uuid'] = client_id[0]
1164 try:
1165 for msg_id in msg_ids:
1166 self.all_completed.discard(msg_id)
1167 self.db.update_record(msg_id, rec)
1168 except Exception:
1169 self.log.error('db::db error upating record', exc_info=True)
1170 reply = error.wrap_exception()
1171 else:
1172 # send the messages
1173 for rec in records:
1174 header = rec['header']
1175 # include resubmitted in header to prevent digest collision
1176 header['resubmitted'] = now
1177 msg = self.session.msg(header['msg_type'])
1178 msg['content'] = rec['content']
1179 msg['header'] = header
1180 msg['header']['msg_id'] = rec['msg_id']
1181 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1182
1183 finish(dict(status='ok'))
1182 finish(dict(status='ok', resubmitted=resubmitted))
1183
1184 # store the new IDs in the Task DB
1185 for msg_id, resubmit_id in resubmitted.iteritems():
1186 try:
1187 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1188 except Exception:
1189 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1184 1190
1185 1191
1186 1192 def _extract_record(self, rec):
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):'
138 138 'engine_uuid' : 'text',
139 139 'started' : 'timestamp',
140 140 'completed' : 'timestamp',
141 'resubmitted' : 'timestamp',
141 'resubmitted' : 'text',
142 142 'received' : 'timestamp',
143 143 'result_header' : 'dict text',
144 144 'result_content' : 'dict text',
@@ -247,7 +247,7 b' class SQLiteDB(BaseDB):'
247 247 engine_uuid text,
248 248 started timestamp,
249 249 completed timestamp,
250 resubmitted timestamp,
250 resubmitted text,
251 251 received timestamp,
252 252 result_header dict text,
253 253 result_content dict text,
@@ -331,13 +331,14 b' class TestClient(ClusterTestCase):'
331 331 r2 = ahr.get(1)
332 332
333 333 def test_resubmit_inflight(self):
334 """ensure ValueError on resubmit of inflight task"""
334 """resubmit of inflight task"""
335 335 v = self.client.load_balanced_view()
336 336 ar = v.apply_async(time.sleep,1)
337 337 # give the message a chance to arrive
338 338 time.sleep(0.2)
339 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 ahr = self.client.resubmit(ar.msg_ids)
340 340 ar.get(2)
341 ahr.get(2)
341 342
342 343 def test_resubmit_badkey(self):
343 344 """ensure KeyError on resubmit of nonexistant task"""
@@ -34,7 +34,7 b' TaskRecord keys:'
34 34 =============== =============== =============
35 35 Key Type Description
36 36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
37 msg_id uuid(ascii) The msg ID
38 38 header dict The request header
39 39 content dict The request content (likely empty)
40 40 buffers list(bytes) buffers containing serialized request objects
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket"
43 43 engine_uuid uuid(bytes) IDENT of engine's socket
44 44 started datetime time task began execution on engine
45 45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
46 resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
47 47 result_header dict header for result
48 48 result_content dict content for result
49 49 result_buffers list(bytes) buffers containing serialized request objects
General Comments 0
You need to be logged in to leave comments. Login now