Show More
@@ -1286,12 +1286,6 b' class Client(HasTraits):' | |||||
1286 | raise TypeError("indices must be str or int, not %r"%id) |
|
1286 | raise TypeError("indices must be str or int, not %r"%id) | |
1287 | theids.append(id) |
|
1287 | theids.append(id) | |
1288 |
|
1288 | |||
1289 | for msg_id in theids: |
|
|||
1290 | self.outstanding.discard(msg_id) |
|
|||
1291 | if msg_id in self.history: |
|
|||
1292 | self.history.remove(msg_id) |
|
|||
1293 | self.results.pop(msg_id, None) |
|
|||
1294 | self.metadata.pop(msg_id, None) |
|
|||
1295 | content = dict(msg_ids = theids) |
|
1289 | content = dict(msg_ids = theids) | |
1296 |
|
1290 | |||
1297 | self.session.send(self._query_socket, 'resubmit_request', content) |
|
1291 | self.session.send(self._query_socket, 'resubmit_request', content) | |
@@ -1303,8 +1297,10 b' class Client(HasTraits):' | |||||
1303 | content = msg['content'] |
|
1297 | content = msg['content'] | |
1304 | if content['status'] != 'ok': |
|
1298 | if content['status'] != 'ok': | |
1305 | raise self._unwrap_exception(content) |
|
1299 | raise self._unwrap_exception(content) | |
|
1300 | mapping = content['resubmitted'] | |||
|
1301 | new_ids = [ mapping[msg_id] for msg_id in theids ] | |||
1306 |
|
1302 | |||
1307 |
ar = AsyncHubResult(self, msg_ids= |
|
1303 | ar = AsyncHubResult(self, msg_ids=new_ids) | |
1308 |
|
1304 | |||
1309 | if block: |
|
1305 | if block: | |
1310 | ar.wait() |
|
1306 | ar.wait() |
@@ -1134,7 +1134,7 b' class Hub(SessionFactory):' | |||||
1134 |
|
1134 | |||
1135 | # validate msg_ids |
|
1135 | # validate msg_ids | |
1136 | found_ids = [ rec['msg_id'] for rec in records ] |
|
1136 | found_ids = [ rec['msg_id'] for rec in records ] | |
1137 | invalid_ids = filter(lambda m: m in self.pending, found_ids) |
|
1137 | pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] | |
1138 | if len(records) > len(msg_ids): |
|
1138 | if len(records) > len(msg_ids): | |
1139 | try: |
|
1139 | try: | |
1140 | raise RuntimeError("DB appears to be in an inconsistent state." |
|
1140 | raise RuntimeError("DB appears to be in an inconsistent state." | |
@@ -1147,40 +1147,46 b' class Hub(SessionFactory):' | |||||
1147 | raise KeyError("No such msg(s): %r" % missing) |
|
1147 | raise KeyError("No such msg(s): %r" % missing) | |
1148 | except KeyError: |
|
1148 | except KeyError: | |
1149 | return finish(error.wrap_exception()) |
|
1149 | return finish(error.wrap_exception()) | |
1150 |
elif |
|
1150 | elif pending_ids: | |
1151 | msg_id = invalid_ids[0] |
|
1151 | pass | |
1152 | try: |
|
1152 | # no need to raise on resubmit of pending task, now that we | |
1153 | raise ValueError("Task %r appears to be inflight" % msg_id) |
|
1153 | # resubmit under new ID, but do we want to raise anyway? | |
1154 | except Exception: |
|
1154 | # msg_id = invalid_ids[0] | |
1155 | return finish(error.wrap_exception()) |
|
1155 | # try: | |
|
1156 | # raise ValueError("Task(s) %r appears to be inflight" % ) | |||
|
1157 | # except Exception: | |||
|
1158 | # return finish(error.wrap_exception()) | |||
|
1159 | ||||
|
1160 | # mapping of original IDs to resubmitted IDs | |||
|
1161 | resubmitted = {} | |||
1156 |
|
1162 | |||
1157 | # clear the existing records |
|
|||
1158 | now = datetime.now() |
|
|||
1159 | rec = empty_record() |
|
|||
1160 | map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) |
|
|||
1161 | rec['resubmitted'] = now |
|
|||
1162 | rec['queue'] = 'task' |
|
|||
1163 | rec['client_uuid'] = client_id[0] |
|
|||
1164 | try: |
|
|||
1165 | for msg_id in msg_ids: |
|
|||
1166 | self.all_completed.discard(msg_id) |
|
|||
1167 | self.db.update_record(msg_id, rec) |
|
|||
1168 | except Exception: |
|
|||
1169 | self.log.error('db::db error upating record', exc_info=True) |
|
|||
1170 | reply = error.wrap_exception() |
|
|||
1171 | else: |
|
|||
1172 |
|
|
1163 | # send the messages | |
1173 |
|
|
1164 | for rec in records: | |
1174 |
|
|
1165 | header = rec['header'] | |
1175 | # include resubmitted in header to prevent digest collision |
|
|||
1176 | header['resubmitted'] = now |
|
|||
1177 |
|
|
1166 | msg = self.session.msg(header['msg_type']) | |
|
1167 | msg_id = msg['msg_id'] | |||
1178 |
|
|
1168 | msg['content'] = rec['content'] | |
|
1169 | header.update(msg['header']) | |||
1179 |
|
|
1170 | msg['header'] = header | |
1180 | msg['header']['msg_id'] = rec['msg_id'] |
|
1171 | ||
1181 |
|
|
1172 | self.session.send(self.resubmit, msg, buffers=rec['buffers']) | |
1182 |
|
1173 | |||
1183 | finish(dict(status='ok')) |
|
1174 | resubmitted[rec['msg_id']] = msg_id | |
|
1175 | self.pending.add(msg_id) | |||
|
1176 | msg['buffers'] = [] | |||
|
1177 | try: | |||
|
1178 | self.db.add_record(msg_id, init_record(msg)) | |||
|
1179 | except Exception: | |||
|
1180 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |||
|
1181 | ||||
|
1182 | finish(dict(status='ok', resubmitted=resubmitted)) | |||
|
1183 | ||||
|
1184 | # store the new IDs in the Task DB | |||
|
1185 | for msg_id, resubmit_id in resubmitted.iteritems(): | |||
|
1186 | try: | |||
|
1187 | self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) | |||
|
1188 | except Exception: | |||
|
1189 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |||
1184 |
|
1190 | |||
1185 |
|
1191 | |||
1186 | def _extract_record(self, rec): |
|
1192 | def _extract_record(self, rec): |
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):' | |||||
138 | 'engine_uuid' : 'text', |
|
138 | 'engine_uuid' : 'text', | |
139 | 'started' : 'timestamp', |
|
139 | 'started' : 'timestamp', | |
140 | 'completed' : 'timestamp', |
|
140 | 'completed' : 'timestamp', | |
141 |
'resubmitted' : 't |
|
141 | 'resubmitted' : 'text', | |
142 | 'received' : 'timestamp', |
|
142 | 'received' : 'timestamp', | |
143 | 'result_header' : 'dict text', |
|
143 | 'result_header' : 'dict text', | |
144 | 'result_content' : 'dict text', |
|
144 | 'result_content' : 'dict text', | |
@@ -247,7 +247,7 b' class SQLiteDB(BaseDB):' | |||||
247 | engine_uuid text, |
|
247 | engine_uuid text, | |
248 | started timestamp, |
|
248 | started timestamp, | |
249 | completed timestamp, |
|
249 | completed timestamp, | |
250 |
resubmitted t |
|
250 | resubmitted text, | |
251 | received timestamp, |
|
251 | received timestamp, | |
252 | result_header dict text, |
|
252 | result_header dict text, | |
253 | result_content dict text, |
|
253 | result_content dict text, |
@@ -331,13 +331,14 b' class TestClient(ClusterTestCase):' | |||||
331 | r2 = ahr.get(1) |
|
331 | r2 = ahr.get(1) | |
332 |
|
332 | |||
333 | def test_resubmit_inflight(self): |
|
333 | def test_resubmit_inflight(self): | |
334 |
""" |
|
334 | """resubmit of inflight task""" | |
335 | v = self.client.load_balanced_view() |
|
335 | v = self.client.load_balanced_view() | |
336 | ar = v.apply_async(time.sleep,1) |
|
336 | ar = v.apply_async(time.sleep,1) | |
337 | # give the message a chance to arrive |
|
337 | # give the message a chance to arrive | |
338 | time.sleep(0.2) |
|
338 | time.sleep(0.2) | |
339 |
|
|
339 | ahr = self.client.resubmit(ar.msg_ids) | |
340 | ar.get(2) |
|
340 | ar.get(2) | |
|
341 | ahr.get(2) | |||
341 |
|
342 | |||
342 | def test_resubmit_badkey(self): |
|
343 | def test_resubmit_badkey(self): | |
343 | """ensure KeyError on resubmit of nonexistant task""" |
|
344 | """ensure KeyError on resubmit of nonexistant task""" |
@@ -34,7 +34,7 b' TaskRecord keys:' | |||||
34 | =============== =============== ============= |
|
34 | =============== =============== ============= | |
35 | Key Type Description |
|
35 | Key Type Description | |
36 | =============== =============== ============= |
|
36 | =============== =============== ============= | |
37 |
msg_id uuid( |
|
37 | msg_id uuid(ascii) The msg ID | |
38 | header dict The request header |
|
38 | header dict The request header | |
39 | content dict The request content (likely empty) |
|
39 | content dict The request content (likely empty) | |
40 | buffers list(bytes) buffers containing serialized request objects |
|
40 | buffers list(bytes) buffers containing serialized request objects | |
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket" | |||||
43 | engine_uuid uuid(bytes) IDENT of engine's socket |
|
43 | engine_uuid uuid(bytes) IDENT of engine's socket | |
44 | started datetime time task began execution on engine |
|
44 | started datetime time task began execution on engine | |
45 | completed datetime time task finished execution (success or failure) on engine |
|
45 | completed datetime time task finished execution (success or failure) on engine | |
46 |
resubmitted |
|
46 | resubmitted uuid(ascii) msg_id of resubmitted task (if applicable) | |
47 | result_header dict header for result |
|
47 | result_header dict header for result | |
48 | result_content dict content for result |
|
48 | result_content dict content for result | |
49 | result_buffers list(bytes) buffers containing serialized request objects |
|
49 | result_buffers list(bytes) buffers containing serialized request objects |
General Comments 0
You need to be logged in to leave comments.
Login now