Show More
@@ -15,10 +15,11 b' TaskRecords are dicts of the form::' | |||||
15 | 'header' : dict(header), |
|
15 | 'header' : dict(header), | |
16 | 'content': dict(content), |
|
16 | 'content': dict(content), | |
17 | 'buffers': list(buffers), |
|
17 | 'buffers': list(buffers), | |
18 | 'submitted': datetime, |
|
18 | 'submitted': datetime or None, | |
19 | 'started': datetime or None, |
|
19 | 'started': datetime or None, | |
20 | 'completed': datetime or None, |
|
20 | 'completed': datetime or None, | |
21 |
're |
|
21 | 'received': datetime or None, | |
|
22 | 'resubmitted': str(uuid) or None, | |||
22 | 'result_header' : dict(header) or None, |
|
23 | 'result_header' : dict(header) or None, | |
23 | 'result_content' : dict(content) or None, |
|
24 | 'result_content' : dict(content) or None, | |
24 | 'result_buffers' : list(buffers) or None, |
|
25 | 'result_buffers' : list(buffers) or None, | |
@@ -31,16 +32,7 b' e.g.:' | |||||
31 | * client's outstanding: client_uuid = uuid && completed is None |
|
32 | * client's outstanding: client_uuid = uuid && completed is None | |
32 | * MIA: arrived is None (and completed is None) |
|
33 | * MIA: arrived is None (and completed is None) | |
33 |
|
34 | |||
34 | EngineRecords are dicts of the form:: |
|
35 | DictDB supports a subset of mongodb operators:: | |
35 |
|
||||
36 | { |
|
|||
37 | 'eid' : int(id), |
|
|||
38 | 'uuid': str(uuid) |
|
|||
39 | } |
|
|||
40 |
|
||||
41 | This may be extended, but is currently. |
|
|||
42 |
|
||||
43 | We support a subset of mongodb operators:: |
|
|||
44 |
|
36 | |||
45 | $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists |
|
37 | $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists | |
46 | """ |
|
38 | """ | |
@@ -210,12 +202,19 b' class DictDB(BaseDB):' | |||||
210 | before_count, before, self.size_limit, culled |
|
202 | before_count, before, self.size_limit, culled | |
211 | ) |
|
203 | ) | |
212 |
|
204 | |||
|
205 | def _check_dates(self, rec): | |||
|
206 | for key in ('submitted', 'started', 'completed'): | |||
|
207 | value = rec.get(key, None) | |||
|
208 | if value is not None and not isinstance(value, datetime): | |||
|
209 | raise ValueError("%s must be None or datetime, not %r" % (key, value)) | |||
|
210 | ||||
213 | # public API methods: |
|
211 | # public API methods: | |
214 |
|
212 | |||
215 | def add_record(self, msg_id, rec): |
|
213 | def add_record(self, msg_id, rec): | |
216 | """Add a new Task Record, by msg_id.""" |
|
214 | """Add a new Task Record, by msg_id.""" | |
217 | if msg_id in self._records: |
|
215 | if msg_id in self._records: | |
218 | raise KeyError("Already have msg_id %r"%(msg_id)) |
|
216 | raise KeyError("Already have msg_id %r"%(msg_id)) | |
|
217 | self._check_dates(rec) | |||
219 | self._records[msg_id] = rec |
|
218 | self._records[msg_id] = rec | |
220 | self._add_bytes(rec) |
|
219 | self._add_bytes(rec) | |
221 | self._maybe_cull() |
|
220 | self._maybe_cull() | |
@@ -232,6 +231,7 b' class DictDB(BaseDB):' | |||||
232 | """Update the data in an existing record.""" |
|
231 | """Update the data in an existing record.""" | |
233 | if msg_id in self._culled_ids: |
|
232 | if msg_id in self._culled_ids: | |
234 | raise KeyError("Record %r has been culled for size" % msg_id) |
|
233 | raise KeyError("Record %r has been culled for size" % msg_id) | |
|
234 | self._check_dates(rec) | |||
235 | _rec = self._records[msg_id] |
|
235 | _rec = self._records[msg_id] | |
236 | self._drop_bytes(_rec) |
|
236 | self._drop_bytes(_rec) | |
237 | _rec.update(rec) |
|
237 | _rec.update(rec) |
@@ -669,7 +669,7 b' class Hub(SessionFactory):' | |||||
669 | rheader = msg['header'] |
|
669 | rheader = msg['header'] | |
670 | md = msg['metadata'] |
|
670 | md = msg['metadata'] | |
671 | completed = rheader['date'] |
|
671 | completed = rheader['date'] | |
672 | started = md.get('started', None) |
|
672 | started = extract_dates(md.get('started', None)) | |
673 | result = { |
|
673 | result = { | |
674 | 'result_header' : rheader, |
|
674 | 'result_header' : rheader, | |
675 | 'result_metadata': md, |
|
675 | 'result_metadata': md, | |
@@ -775,7 +775,7 b' class Hub(SessionFactory):' | |||||
775 | if msg_id in self.tasks[eid]: |
|
775 | if msg_id in self.tasks[eid]: | |
776 | self.tasks[eid].remove(msg_id) |
|
776 | self.tasks[eid].remove(msg_id) | |
777 | completed = header['date'] |
|
777 | completed = header['date'] | |
778 | started = md.get('started', None) |
|
778 | started = extract_dates(md.get('started', None)) | |
779 | result = { |
|
779 | result = { | |
780 | 'result_header' : header, |
|
780 | 'result_header' : header, | |
781 | 'result_metadata': msg['metadata'], |
|
781 | 'result_metadata': msg['metadata'], | |
@@ -1194,6 +1194,7 b' class Hub(SessionFactory):' | |||||
1194 | self.db.drop_matching_records(dict(completed={'$ne':None})) |
|
1194 | self.db.drop_matching_records(dict(completed={'$ne':None})) | |
1195 | except Exception: |
|
1195 | except Exception: | |
1196 | reply = error.wrap_exception() |
|
1196 | reply = error.wrap_exception() | |
|
1197 | self.log.exception("Error dropping records") | |||
1197 | else: |
|
1198 | else: | |
1198 | pending = [m for m in msg_ids if (m in self.pending)] |
|
1199 | pending = [m for m in msg_ids if (m in self.pending)] | |
1199 | if pending: |
|
1200 | if pending: | |
@@ -1201,11 +1202,13 b' class Hub(SessionFactory):' | |||||
1201 | raise IndexError("msg pending: %r" % pending[0]) |
|
1202 | raise IndexError("msg pending: %r" % pending[0]) | |
1202 | except: |
|
1203 | except: | |
1203 | reply = error.wrap_exception() |
|
1204 | reply = error.wrap_exception() | |
|
1205 | self.log.exception("Error dropping records") | |||
1204 | else: |
|
1206 | else: | |
1205 | try: |
|
1207 | try: | |
1206 | self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) |
|
1208 | self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) | |
1207 | except Exception: |
|
1209 | except Exception: | |
1208 | reply = error.wrap_exception() |
|
1210 | reply = error.wrap_exception() | |
|
1211 | self.log.exception("Error dropping records") | |||
1209 |
|
1212 | |||
1210 | if reply['status'] == 'ok': |
|
1213 | if reply['status'] == 'ok': | |
1211 | eids = content.get('engine_ids', []) |
|
1214 | eids = content.get('engine_ids', []) | |
@@ -1215,12 +1218,14 b' class Hub(SessionFactory):' | |||||
1215 | raise IndexError("No such engine: %i" % eid) |
|
1218 | raise IndexError("No such engine: %i" % eid) | |
1216 | except: |
|
1219 | except: | |
1217 | reply = error.wrap_exception() |
|
1220 | reply = error.wrap_exception() | |
|
1221 | self.log.exception("Error dropping records") | |||
1218 | break |
|
1222 | break | |
1219 | uid = self.engines[eid].uuid |
|
1223 | uid = self.engines[eid].uuid | |
1220 | try: |
|
1224 | try: | |
1221 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1225 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
1222 | except Exception: |
|
1226 | except Exception: | |
1223 | reply = error.wrap_exception() |
|
1227 | reply = error.wrap_exception() | |
|
1228 | self.log.exception("Error dropping records") | |||
1224 | break |
|
1229 | break | |
1225 |
|
1230 | |||
1226 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) |
|
1231 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) | |
@@ -1248,12 +1253,14 b' class Hub(SessionFactory):' | |||||
1248 | raise RuntimeError("DB appears to be in an inconsistent state." |
|
1253 | raise RuntimeError("DB appears to be in an inconsistent state." | |
1249 | "More matching records were found than should exist") |
|
1254 | "More matching records were found than should exist") | |
1250 | except Exception: |
|
1255 | except Exception: | |
|
1256 | self.log.exception("Failed to resubmit task") | |||
1251 | return finish(error.wrap_exception()) |
|
1257 | return finish(error.wrap_exception()) | |
1252 | elif len(records) < len(msg_ids): |
|
1258 | elif len(records) < len(msg_ids): | |
1253 | missing = [ m for m in msg_ids if m not in found_ids ] |
|
1259 | missing = [ m for m in msg_ids if m not in found_ids ] | |
1254 | try: |
|
1260 | try: | |
1255 | raise KeyError("No such msg(s): %r" % missing) |
|
1261 | raise KeyError("No such msg(s): %r" % missing) | |
1256 | except KeyError: |
|
1262 | except KeyError: | |
|
1263 | self.log.exception("Failed to resubmit task") | |||
1257 | return finish(error.wrap_exception()) |
|
1264 | return finish(error.wrap_exception()) | |
1258 | elif pending_ids: |
|
1265 | elif pending_ids: | |
1259 | pass |
|
1266 | pass | |
@@ -1343,6 +1350,7 b' class Hub(SessionFactory):' | |||||
1343 | records[rec['msg_id']] = rec |
|
1350 | records[rec['msg_id']] = rec | |
1344 | except Exception: |
|
1351 | except Exception: | |
1345 | content = error.wrap_exception() |
|
1352 | content = error.wrap_exception() | |
|
1353 | self.log.exception("Failed to get results") | |||
1346 | self.session.send(self.query, "result_reply", content=content, |
|
1354 | self.session.send(self.query, "result_reply", content=content, | |
1347 | parent=msg, ident=client_id) |
|
1355 | parent=msg, ident=client_id) | |
1348 | return |
|
1356 | return | |
@@ -1381,6 +1389,7 b' class Hub(SessionFactory):' | |||||
1381 | msg_ids = self.db.get_history() |
|
1389 | msg_ids = self.db.get_history() | |
1382 | except Exception as e: |
|
1390 | except Exception as e: | |
1383 | content = error.wrap_exception() |
|
1391 | content = error.wrap_exception() | |
|
1392 | self.log.exception("Failed to get history") | |||
1384 | else: |
|
1393 | else: | |
1385 | content = dict(status='ok', history=msg_ids) |
|
1394 | content = dict(status='ok', history=msg_ids) | |
1386 |
|
1395 | |||
@@ -1398,6 +1407,7 b' class Hub(SessionFactory):' | |||||
1398 | records = self.db.find_records(query, keys) |
|
1407 | records = self.db.find_records(query, keys) | |
1399 | except Exception as e: |
|
1408 | except Exception as e: | |
1400 | content = error.wrap_exception() |
|
1409 | content = error.wrap_exception() | |
|
1410 | self.log.exception("DB query failed") | |||
1401 | else: |
|
1411 | else: | |
1402 | # extract buffers from reply content: |
|
1412 | # extract buffers from reply content: | |
1403 | if keys is not None: |
|
1413 | if keys is not None: |
@@ -20,7 +20,7 b' from subprocess import Popen, PIPE, STDOUT' | |||||
20 | import nose |
|
20 | import nose | |
21 |
|
21 | |||
22 | from IPython.utils.path import get_ipython_dir |
|
22 | from IPython.utils.path import get_ipython_dir | |
23 | from IPython.parallel import Client |
|
23 | from IPython.parallel import Client, error | |
24 | from IPython.parallel.apps.launcher import (LocalProcessLauncher, |
|
24 | from IPython.parallel.apps.launcher import (LocalProcessLauncher, | |
25 | ipengine_cmd_argv, |
|
25 | ipengine_cmd_argv, | |
26 | ipcontroller_cmd_argv, |
|
26 | ipcontroller_cmd_argv, | |
@@ -53,6 +53,15 b' class TestProcessLauncher(LocalProcessLauncher):' | |||||
53 | # nose setup/teardown |
|
53 | # nose setup/teardown | |
54 |
|
54 | |||
55 | def setup(): |
|
55 | def setup(): | |
|
56 | ||||
|
57 | # show tracebacks for RemoteErrors | |||
|
58 | class RemoteErrorWithTB(error.RemoteError): | |||
|
59 | def __str__(self): | |||
|
60 | s = super(RemoteErrorWithTB, self).__str__() | |||
|
61 | return '\n'.join([s, self.traceback or '']) | |||
|
62 | ||||
|
63 | error.RemoteError = RemoteErrorWithTB | |||
|
64 | ||||
56 | cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest') |
|
65 | cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest') | |
57 | engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json') |
|
66 | engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json') | |
58 | client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json') |
|
67 | client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json') | |
@@ -109,7 +118,10 b" def add_engines(n=1, profile='iptest', total=False):" | |||||
109 | return eps |
|
118 | return eps | |
110 |
|
119 | |||
111 | def teardown(): |
|
120 | def teardown(): | |
|
121 | try: | |||
112 | time.sleep(1) |
|
122 | time.sleep(1) | |
|
123 | except KeyboardInterrupt: | |||
|
124 | return | |||
113 | while launchers: |
|
125 | while launchers: | |
114 | p = launchers.pop() |
|
126 | p = launchers.pop() | |
115 | if p.poll() is None: |
|
127 | if p.poll() is None: | |
@@ -119,7 +131,10 b' def teardown():' | |||||
119 | print(e) |
|
131 | print(e) | |
120 | pass |
|
132 | pass | |
121 | if p.poll() is None: |
|
133 | if p.poll() is None: | |
|
134 | try: | |||
122 | time.sleep(.25) |
|
135 | time.sleep(.25) | |
|
136 | except KeyboardInterrupt: | |||
|
137 | return | |||
123 | if p.poll() is None: |
|
138 | if p.poll() is None: | |
124 | try: |
|
139 | try: | |
125 | print('cleaning up test process...') |
|
140 | print('cleaning up test process...') |
@@ -247,7 +247,7 b' class TestDictBackend(TaskDBTest, TestCase):' | |||||
247 | self.load_records(1) |
|
247 | self.load_records(1) | |
248 | self.assertEqual(len(self.db.get_history()), 17) |
|
248 | self.assertEqual(len(self.db.get_history()), 17) | |
249 |
|
249 | |||
250 |
for i in range( |
|
250 | for i in range(25): | |
251 | self.load_records(1) |
|
251 | self.load_records(1) | |
252 | self.assertTrue(len(self.db.get_history()) >= 17) |
|
252 | self.assertTrue(len(self.db.get_history()) >= 17) | |
253 | self.assertTrue(len(self.db.get_history()) <= 20) |
|
253 | self.assertTrue(len(self.db.get_history()) <= 20) |
@@ -34,7 +34,7 b" next_attr_name = '__next__' if py3compat.PY3 else 'next'" | |||||
34 |
|
34 | |||
35 | # timestamp formats |
|
35 | # timestamp formats | |
36 | ISO8601 = "%Y-%m-%dT%H:%M:%S.%f" |
|
36 | ISO8601 = "%Y-%m-%dT%H:%M:%S.%f" | |
37 | ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6})Z?([\+\-]\d{2}:?\d{2})?$") |
|
37 | ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?Z?([\+\-]\d{2}:?\d{2})?$") | |
38 |
|
38 | |||
39 | #----------------------------------------------------------------------------- |
|
39 | #----------------------------------------------------------------------------- | |
40 | # Classes and functions |
|
40 | # Classes and functions | |
@@ -75,7 +75,10 b' def parse_date(s):' | |||||
75 | if m: |
|
75 | if m: | |
76 | # FIXME: add actual timezone support |
|
76 | # FIXME: add actual timezone support | |
77 | # this just drops the timezone info |
|
77 | # this just drops the timezone info | |
78 |
notz = m.groups() |
|
78 | notz, ms, tz = m.groups() | |
|
79 | if not ms: | |||
|
80 | ms = '.0' | |||
|
81 | notz = notz + ms | |||
79 | return datetime.strptime(notz, ISO8601) |
|
82 | return datetime.strptime(notz, ISO8601) | |
80 | return s |
|
83 | return s | |
81 |
|
84 |
@@ -96,8 +96,8 b' def test_encode_images():' | |||||
96 |
|
96 | |||
97 | def test_lambda(): |
|
97 | def test_lambda(): | |
98 | jc = json_clean(lambda : 1) |
|
98 | jc = json_clean(lambda : 1) | |
99 |
assert |
|
99 | nt.assert_is_instance(jc, str) | |
100 |
assert |
|
100 | nt.assert_in('<lambda>', jc) | |
101 | json.dumps(jc) |
|
101 | json.dumps(jc) | |
102 |
|
102 | |||
103 | def test_extract_dates(): |
|
103 | def test_extract_dates(): | |
@@ -120,16 +120,18 b' def test_extract_dates():' | |||||
120 | nt.assert_equal(dt, ref) |
|
120 | nt.assert_equal(dt, ref) | |
121 |
|
121 | |||
122 | def test_parse_ms_precision(): |
|
122 | def test_parse_ms_precision(): | |
123 |
base = '2013-07-03T16:34:52 |
|
123 | base = '2013-07-03T16:34:52' | |
124 | digits = '1234567890' |
|
124 | digits = '1234567890' | |
125 |
|
125 | |||
|
126 | parsed = jsonutil.parse_date(base) | |||
|
127 | nt.assert_is_instance(parsed, datetime.datetime) | |||
126 | for i in range(len(digits)): |
|
128 | for i in range(len(digits)): | |
127 | ts = base + digits[:i] |
|
129 | ts = base + '.' + digits[:i] | |
128 | parsed = jsonutil.parse_date(ts) |
|
130 | parsed = jsonutil.parse_date(ts) | |
129 | if i >= 1 and i <= 6: |
|
131 | if i >= 1 and i <= 6: | |
130 |
assert |
|
132 | nt.assert_is_instance(parsed, datetime.datetime) | |
131 | else: |
|
133 | else: | |
132 |
assert |
|
134 | nt.assert_is_instance(parsed, str) | |
133 |
|
135 | |||
134 | def test_date_default(): |
|
136 | def test_date_default(): | |
135 | data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow()) |
|
137 | data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow()) | |
@@ -138,7 +140,7 b' def test_date_default():' | |||||
138 | nt.assert_equal(jsondata.count("+00"), 1) |
|
140 | nt.assert_equal(jsondata.count("+00"), 1) | |
139 | extracted = jsonutil.extract_dates(json.loads(jsondata)) |
|
141 | extracted = jsonutil.extract_dates(json.loads(jsondata)) | |
140 | for dt in extracted.values(): |
|
142 | for dt in extracted.values(): | |
141 |
nt.assert_ |
|
143 | nt.assert_is_instance(dt, datetime.datetime) | |
142 |
|
144 | |||
143 | def test_exception(): |
|
145 | def test_exception(): | |
144 | bad_dicts = [{1:'number', '1':'string'}, |
|
146 | bad_dicts = [{1:'number', '1':'string'}, |
General Comments 0
You need to be logged in to leave comments.
Login now