@@ -15,10 +15,11 @@ TaskRecords are dicts of the form::
         'header' : dict(header),
         'content': dict(content),
         'buffers': list(buffers),
-        'submitted': datetime,
+        'submitted': datetime or None,
         'started': datetime or None,
         'completed': datetime or None,
-        'resubmitted': datetime or None,
+        'received': datetime or None,
+        'resubmitted': str(uuid) or None,
         'result_header' : dict(header) or None,
         'result_content' : dict(content) or None,
         'result_buffers' : list(buffers) or None,
@@ -31,16 +32,7 @@ e.g.:
 * client's outstanding: client_uuid = uuid && completed is None
 * MIA: arrived is None (and completed is None)
 
-EngineRecords are dicts of the form::
-
-    {
-        'eid' : int(id),
-        'uuid': str(uuid)
-    }
-
-This may be extended, but is currently.
-
-We support a subset of mongodb operators::
+DictDB supports a subset of mongodb operators::
 
     $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
 """
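
For context, the operator subset above is what ``DictDB.find_records`` evaluates. A minimal sketch of a query against a hypothetical in-process instance (the records and the cutoff are invented for illustration; ``find_records`` and the operator syntax are the ones documented above)::

    from datetime import datetime, timedelta

    from IPython.parallel.controller.dictdb import DictDB

    db = DictDB()  # hypothetical instance, populated elsewhere

    # tasks completed within the last hour: $gte on the 'completed' datetime
    cutoff = datetime.utcnow() - timedelta(hours=1)
    recent = db.find_records({'completed': {'$gte': cutoff}})

    # tasks that finished at all: the same $ne-None query the Hub uses
    finished = db.find_records({'completed': {'$ne': None}})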
@@ -210,12 +202,19 @@ class DictDB(BaseDB):
             before_count, before, self.size_limit, culled
         )
 
+    def _check_dates(self, rec):
+        for key in ('submitted', 'started', 'completed'):
+            value = rec.get(key, None)
+            if value is not None and not isinstance(value, datetime):
+                raise ValueError("%s must be None or datetime, not %r" % (key, value))
+
     # public API methods:
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if msg_id in self._records:
             raise KeyError("Already have msg_id %r"%(msg_id))
+        self._check_dates(rec)
         self._records[msg_id] = rec
         self._add_bytes(rec)
         self._maybe_cull()
@@ -232,6 +231,7 @@ class DictDB(BaseDB):
         """Update the data in an existing record."""
         if msg_id in self._culled_ids:
             raise KeyError("Record %r has been culled for size" % msg_id)
+        self._check_dates(rec)
         _rec = self._records[msg_id]
         self._drop_bytes(_rec)
         _rec.update(rec)
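
Both ``add_record`` and ``update_record`` now run ``_check_dates`` before touching the store, so a record can no longer be saved with an ISO string where a datetime belongs. A sketch of the guard in action (the record contents are invented for illustration)::

    from datetime import datetime

    from IPython.parallel.controller.dictdb import DictDB

    db = DictDB()
    db.add_record('ok', {'submitted': datetime.utcnow(), 'buffers': []})

    try:
        db.add_record('bad', {'submitted': '2013-07-03T16:34:52.249', 'buffers': []})
    except ValueError as e:
        print(e)  # submitted must be None or datetime, not '2013-07-03T16:34:52.249'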
@@ -669,7 +669,7 @@ class Hub(SessionFactory):
         rheader = msg['header']
         md = msg['metadata']
         completed = rheader['date']
-        started = md.get('started', None)
+        started = extract_dates(md.get('started', None))
         result = {
             'result_header' : rheader,
             'result_metadata': md,
@@ -775,7 +775,7 @@ class Hub(SessionFactory):
             if msg_id in self.tasks[eid]:
                 self.tasks[eid].remove(msg_id)
         completed = header['date']
-        started = md.get('started', None)
+        started = extract_dates(md.get('started', None))
         result = {
             'result_header' : header,
             'result_metadata': msg['metadata'],
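
Both call sites fix the same bug: after a JSON round-trip, ``md['started']`` arrives as an ISO8601 string, and storing that uncorrected is exactly what the new ``_check_dates`` guard rejects. ``extract_dates`` (from IPython.utils.jsonutil) converts such strings back to datetimes and passes None through untouched::

    from IPython.utils.jsonutil import extract_dates

    started = extract_dates('2013-07-03T16:34:52.249231')
    print(type(started).__name__)  # datetime

    print(extract_dates(None))  # None: absent metadata stays absent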
@@ -1194,6 +1194,7 @@ class Hub(SessionFactory):
                 self.db.drop_matching_records(dict(completed={'$ne':None}))
             except Exception:
                 reply = error.wrap_exception()
+                self.log.exception("Error dropping records")
         else:
             pending = [m for m in msg_ids if (m in self.pending)]
             if pending:
@@ -1201,11 +1202,13 @@ class Hub(SessionFactory):
                     raise IndexError("msg pending: %r" % pending[0])
                 except:
                     reply = error.wrap_exception()
+                    self.log.exception("Error dropping records")
             else:
                 try:
                     self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
                 except Exception:
                     reply = error.wrap_exception()
+                    self.log.exception("Error dropping records")
 
         if reply['status'] == 'ok':
             eids = content.get('engine_ids', [])
@@ -1215,12 +1218,14 @@ class Hub(SessionFactory):
                         raise IndexError("No such engine: %i" % eid)
                     except:
                         reply = error.wrap_exception()
+                        self.log.exception("Error dropping records")
                         break
                 uid = self.engines[eid].uuid
                 try:
                     self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                 except Exception:
                     reply = error.wrap_exception()
+                    self.log.exception("Error dropping records")
                     break
 
         self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
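
Every ``self.log.exception(...)`` added in this and the following hunks follows one idiom: the wrapped exception still goes back to the client, but the full traceback now also lands in the controller log instead of being silently swallowed. A standalone sketch of the pattern (the logger and the failure here are stand-ins)::

    import logging

    logging.basicConfig()
    log = logging.getLogger('hub')

    def wrap_exception():
        return {'status': 'error'}  # stand-in for IPython.parallel.error.wrap_exception

    try:
        raise RuntimeError('db failure')
    except Exception:
        reply = wrap_exception()                 # reply still reaches the client
        log.exception("Error dropping records")  # traceback now also hits the log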
@@ -1248,12 +1253,14 @@ class Hub(SessionFactory):
                 raise RuntimeError("DB appears to be in an inconsistent state."
                     "More matching records were found than should exist")
             except Exception:
+                self.log.exception("Failed to resubmit task")
                 return finish(error.wrap_exception())
         elif len(records) < len(msg_ids):
             missing = [ m for m in msg_ids if m not in found_ids ]
             try:
                 raise KeyError("No such msg(s): %r" % missing)
             except KeyError:
+                self.log.exception("Failed to resubmit task")
                 return finish(error.wrap_exception())
         elif pending_ids:
             pass
@@ -1343,6 +1350,7 @@ class Hub(SessionFactory):
                 records[rec['msg_id']] = rec
         except Exception:
             content = error.wrap_exception()
+            self.log.exception("Failed to get results")
             self.session.send(self.query, "result_reply", content=content,
                               parent=msg, ident=client_id)
             return
@@ -1381,6 +1389,7 @@ class Hub(SessionFactory):
             msg_ids = self.db.get_history()
         except Exception as e:
             content = error.wrap_exception()
+            self.log.exception("Failed to get history")
         else:
             content = dict(status='ok', history=msg_ids)
 
@@ -1398,6 +1407,7 @@ class Hub(SessionFactory):
             records = self.db.find_records(query, keys)
         except Exception as e:
             content = error.wrap_exception()
+            self.log.exception("DB query failed")
         else:
             # extract buffers from reply content:
             if keys is not None:
@@ -20,7 +20,7 @@ from subprocess import Popen, PIPE, STDOUT
 import nose
 
 from IPython.utils.path import get_ipython_dir
-from IPython.parallel import Client
+from IPython.parallel import Client, error
 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
                                             ipengine_cmd_argv,
                                             ipcontroller_cmd_argv,
@@ -53,6 +53,15 @@ class TestProcessLauncher(LocalProcessLauncher):
 # nose setup/teardown
 
 def setup():
+
+    # show tracebacks for RemoteErrors
+    class RemoteErrorWithTB(error.RemoteError):
+        def __str__(self):
+            s = super(RemoteErrorWithTB, self).__str__()
+            return '\n'.join([s, self.traceback or ''])
+
+    error.RemoteError = RemoteErrorWithTB
+
     cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
     engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
     client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
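
The monkeypatch makes any RemoteError that escapes a test print its remote traceback, not just the one-line summary. The same trick in isolation (the base class here is a stand-in for ``error.RemoteError``, which stores the remote traceback on ``self.traceback``)::

    class FakeRemoteError(Exception):
        # stand-in for IPython.parallel.error.RemoteError
        def __init__(self, msg, traceback=None):
            super(FakeRemoteError, self).__init__(msg)
            self.traceback = traceback

    class RemoteErrorWithTB(FakeRemoteError):
        def __str__(self):
            s = super(RemoteErrorWithTB, self).__str__()
            return '\n'.join([s, self.traceback or ''])

    print(RemoteErrorWithTB('boom', 'Traceback (most recent call last):\n  ...'))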
@@ -109,7 +118,10 @@ def add_engines(n=1, profile='iptest', total=False):
     return eps
 
 def teardown():
+    try:
         time.sleep(1)
+    except KeyboardInterrupt:
+        return
     while launchers:
         p = launchers.pop()
         if p.poll() is None:
@@ -119,7 +131,10 @@ def teardown():
             print(e)
             pass
         if p.poll() is None:
+            try:
                 time.sleep(.25)
+            except KeyboardInterrupt:
+                return
             if p.poll() is None:
                 try:
                     print('cleaning up test process...')
@@ -247,7 +247,7 @@ class TestDictBackend(TaskDBTest, TestCase):
         self.load_records(1)
         self.assertEqual(len(self.db.get_history()), 17)
 
-        for i in range(
+        for i in range(25):
             self.load_records(1)
         self.assertTrue(len(self.db.get_history()) >= 17)
         self.assertTrue(len(self.db.get_history()) <= 20)
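
The 17-20 bounds come from the culling arithmetic. Assuming the test configures ``record_limit=20`` and ``cull_fraction=0.2`` (set elsewhere in the test, not visible in this hunk), each cull drops 20 * 0.2 = 4 records, so the history length cycles through 18, 19, 20, 17 no matter how many single records are loaded::

    record_limit, cull_fraction = 20, 0.2  # assumed test configuration
    size = 17  # length right after the first cull, per the assertion above
    for i in range(25):
        size += 1
        if size > record_limit:
            size -= int(record_limit * cull_fraction)  # cull the 4 oldest
    assert 17 <= size <= 20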
@@ -34,7 +34,7 @@ next_attr_name = '__next__' if py3compat.PY3 else 'next'
 
 # timestamp formats
 ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
-ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6})Z?([\+\-]\d{2}:?\d{2})?$")
+ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?Z?([\+\-]\d{2}:?\d{2})?$")
 
 #-----------------------------------------------------------------------------
 # Classes and functions
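
The only change to the pattern is making the fractional-second group optional, so second-precision timestamps (which some JSON producers emit) now match as well. A quick check of the new pattern::

    import re

    ISO8601_PAT = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?Z?([\+\-]\d{2}:?\d{2})?$")

    for ts in ('2013-07-03T16:34:52',         # no fractional seconds: now matches
               '2013-07-03T16:34:52.249231',  # microseconds: matched before and after
               '2013-07-03T16:34:52Z'):       # trailing Z still allowed
        assert ISO8601_PAT.match(ts)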
@@ -75,7 +75,10 @@ def parse_date(s):
     if m:
         # FIXME: add actual timezone support
         # this just drops the timezone info
-        notz = m.groups()[0]
+        notz, ms, tz = m.groups()
+        if not ms:
+            ms = '.0'
+        notz = notz + ms
         return datetime.strptime(notz, ISO8601)
     return s
 
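
Because the ``ISO8601`` strptime format string still requires a ``%f`` field, ``parse_date`` pads a missing fractional group with '.0' before parsing. The normalization in isolation::

    from datetime import datetime

    ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"

    notz, ms = '2013-07-03T16:34:52', None  # what the new group unpacking yields
    if not ms:
        ms = '.0'
    print(datetime.strptime(notz + ms, ISO8601))  # 2013-07-03 16:34:52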
@@ -96,8 +96,8 @@ def test_encode_images():
 
 def test_lambda():
     jc = json_clean(lambda : 1)
-    assert isinstance(jc, str)
-    assert '<lambda>' in jc
+    nt.assert_is_instance(jc, str)
+    nt.assert_in('<lambda>', jc)
     json.dumps(jc)
 
 def test_extract_dates():
@@ -120,16 +120,18 @@ def test_extract_dates():
     nt.assert_equal(dt, ref)
 
 def test_parse_ms_precision():
-    base = '2013-07-03T16:34:52.'
+    base = '2013-07-03T16:34:52'
     digits = '1234567890'
 
+    parsed = jsonutil.parse_date(base)
+    nt.assert_is_instance(parsed, datetime.datetime)
     for i in range(len(digits)):
-        ts = base + digits[:i]
+        ts = base + '.' + digits[:i]
         parsed = jsonutil.parse_date(ts)
         if i >= 1 and i <= 6:
-            assert isinstance(parsed, datetime.datetime)
+            nt.assert_is_instance(parsed, datetime.datetime)
         else:
-            assert isinstance(parsed, str)
+            nt.assert_is_instance(parsed, str)
 
 def test_date_default():
     data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow())
@@ -138,7 +140,7 @@ def test_date_default():
     nt.assert_equal(jsondata.count("+00"), 1)
     extracted = jsonutil.extract_dates(json.loads(jsondata))
     for dt in extracted.values():
-        nt.assert_true(isinstance(dt, datetime.datetime))
+        nt.assert_is_instance(dt, datetime.datetime)
 
 def test_exception():
     bad_dicts = [{1:'number', '1':'string'},