Show More
@@ -1286,12 +1286,6 b' class Client(HasTraits):' | |||
|
1286 | 1286 | raise TypeError("indices must be str or int, not %r"%id) |
|
1287 | 1287 | theids.append(id) |
|
1288 | 1288 | |
|
1289 | for msg_id in theids: | |
|
1290 | self.outstanding.discard(msg_id) | |
|
1291 | if msg_id in self.history: | |
|
1292 | self.history.remove(msg_id) | |
|
1293 | self.results.pop(msg_id, None) | |
|
1294 | self.metadata.pop(msg_id, None) | |
|
1295 | 1289 | content = dict(msg_ids = theids) |
|
1296 | 1290 | |
|
1297 | 1291 | self.session.send(self._query_socket, 'resubmit_request', content) |
@@ -1303,8 +1297,10 b' class Client(HasTraits):' | |||
|
1303 | 1297 | content = msg['content'] |
|
1304 | 1298 | if content['status'] != 'ok': |
|
1305 | 1299 | raise self._unwrap_exception(content) |
|
1300 | mapping = content['resubmitted'] | |
|
1301 | new_ids = [ mapping[msg_id] for msg_id in theids ] | |
|
1306 | 1302 | |
|
1307 |
ar = AsyncHubResult(self, msg_ids= |
|
|
1303 | ar = AsyncHubResult(self, msg_ids=new_ids) | |
|
1308 | 1304 | |
|
1309 | 1305 | if block: |
|
1310 | 1306 | ar.wait() |
@@ -1134,7 +1134,7 b' class Hub(SessionFactory):' | |||
|
1134 | 1134 | |
|
1135 | 1135 | # validate msg_ids |
|
1136 | 1136 | found_ids = [ rec['msg_id'] for rec in records ] |
|
1137 | invalid_ids = filter(lambda m: m in self.pending, found_ids) | |
|
1137 | pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] | |
|
1138 | 1138 | if len(records) > len(msg_ids): |
|
1139 | 1139 | try: |
|
1140 | 1140 | raise RuntimeError("DB appears to be in an inconsistent state." |
@@ -1147,40 +1147,46 b' class Hub(SessionFactory):' | |||
|
1147 | 1147 | raise KeyError("No such msg(s): %r" % missing) |
|
1148 | 1148 | except KeyError: |
|
1149 | 1149 | return finish(error.wrap_exception()) |
|
1150 |
elif |
|
|
1151 | msg_id = invalid_ids[0] | |
|
1152 | try: | |
|
1153 | raise ValueError("Task %r appears to be inflight" % msg_id) | |
|
1154 | except Exception: | |
|
1155 | return finish(error.wrap_exception()) | |
|
1150 | elif pending_ids: | |
|
1151 | pass | |
|
1152 | # no need to raise on resubmit of pending task, now that we | |
|
1153 | # resubmit under new ID, but do we want to raise anyway? | |
|
1154 | # msg_id = invalid_ids[0] | |
|
1155 | # try: | |
|
1156 | # raise ValueError("Task(s) %r appears to be inflight" % ) | |
|
1157 | # except Exception: | |
|
1158 | # return finish(error.wrap_exception()) | |
|
1159 | ||
|
1160 | # mapping of original IDs to resubmitted IDs | |
|
1161 | resubmitted = {} | |
|
1156 | 1162 | |
|
1157 | # clear the existing records | |
|
1158 | now = datetime.now() | |
|
1159 | rec = empty_record() | |
|
1160 | map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) | |
|
1161 | rec['resubmitted'] = now | |
|
1162 | rec['queue'] = 'task' | |
|
1163 | rec['client_uuid'] = client_id[0] | |
|
1164 | try: | |
|
1165 | for msg_id in msg_ids: | |
|
1166 | self.all_completed.discard(msg_id) | |
|
1167 | self.db.update_record(msg_id, rec) | |
|
1168 | except Exception: | |
|
1169 | self.log.error('db::db error upating record', exc_info=True) | |
|
1170 | reply = error.wrap_exception() | |
|
1171 | else: | |
|
1172 | 1163 |
|
|
1173 | 1164 |
|
|
1174 | 1165 |
|
|
1175 | # include resubmitted in header to prevent digest collision | |
|
1176 | header['resubmitted'] = now | |
|
1177 | 1166 |
|
|
1167 | msg_id = msg['msg_id'] | |
|
1178 | 1168 |
|
|
1169 | header.update(msg['header']) | |
|
1179 | 1170 |
|
|
1180 | msg['header']['msg_id'] = rec['msg_id'] | |
|
1171 | ||
|
1181 | 1172 |
|
|
1182 | 1173 | |
|
1183 | finish(dict(status='ok')) | |
|
1174 | resubmitted[rec['msg_id']] = msg_id | |
|
1175 | self.pending.add(msg_id) | |
|
1176 | msg['buffers'] = [] | |
|
1177 | try: | |
|
1178 | self.db.add_record(msg_id, init_record(msg)) | |
|
1179 | except Exception: | |
|
1180 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |
|
1181 | ||
|
1182 | finish(dict(status='ok', resubmitted=resubmitted)) | |
|
1183 | ||
|
1184 | # store the new IDs in the Task DB | |
|
1185 | for msg_id, resubmit_id in resubmitted.iteritems(): | |
|
1186 | try: | |
|
1187 | self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) | |
|
1188 | except Exception: | |
|
1189 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |
|
1184 | 1190 | |
|
1185 | 1191 | |
|
1186 | 1192 | def _extract_record(self, rec): |
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):' | |||
|
138 | 138 | 'engine_uuid' : 'text', |
|
139 | 139 | 'started' : 'timestamp', |
|
140 | 140 | 'completed' : 'timestamp', |
|
141 |
'resubmitted' : 't |
|
|
141 | 'resubmitted' : 'text', | |
|
142 | 142 | 'received' : 'timestamp', |
|
143 | 143 | 'result_header' : 'dict text', |
|
144 | 144 | 'result_content' : 'dict text', |
@@ -247,7 +247,7 b' class SQLiteDB(BaseDB):' | |||
|
247 | 247 | engine_uuid text, |
|
248 | 248 | started timestamp, |
|
249 | 249 | completed timestamp, |
|
250 |
resubmitted t |
|
|
250 | resubmitted text, | |
|
251 | 251 | received timestamp, |
|
252 | 252 | result_header dict text, |
|
253 | 253 | result_content dict text, |
@@ -331,13 +331,14 b' class TestClient(ClusterTestCase):' | |||
|
331 | 331 | r2 = ahr.get(1) |
|
332 | 332 | |
|
333 | 333 | def test_resubmit_inflight(self): |
|
334 |
""" |
|
|
334 | """resubmit of inflight task""" | |
|
335 | 335 | v = self.client.load_balanced_view() |
|
336 | 336 | ar = v.apply_async(time.sleep,1) |
|
337 | 337 | # give the message a chance to arrive |
|
338 | 338 | time.sleep(0.2) |
|
339 |
|
|
|
339 | ahr = self.client.resubmit(ar.msg_ids) | |
|
340 | 340 | ar.get(2) |
|
341 | ahr.get(2) | |
|
341 | 342 | |
|
342 | 343 | def test_resubmit_badkey(self): |
|
343 | 344 | """ensure KeyError on resubmit of nonexistant task""" |
@@ -34,7 +34,7 b' TaskRecord keys:' | |||
|
34 | 34 | =============== =============== ============= |
|
35 | 35 | Key Type Description |
|
36 | 36 | =============== =============== ============= |
|
37 |
msg_id uuid( |
|
|
37 | msg_id uuid(ascii) The msg ID | |
|
38 | 38 | header dict The request header |
|
39 | 39 | content dict The request content (likely empty) |
|
40 | 40 | buffers list(bytes) buffers containing serialized request objects |
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket" | |||
|
43 | 43 | engine_uuid uuid(bytes) IDENT of engine's socket |
|
44 | 44 | started datetime time task began execution on engine |
|
45 | 45 | completed datetime time task finished execution (success or failure) on engine |
|
46 |
resubmitted |
|
|
46 | resubmitted uuid(ascii) msg_id of resubmitted task (if applicable) | |
|
47 | 47 | result_header dict header for result |
|
48 | 48 | result_content dict content for result |
|
49 | 49 | result_buffers list(bytes) buffers containing serialized request objects |
General Comments 0
You need to be logged in to leave comments.
Login now