##// END OF EJS Templates
Merge pull request #5098 from minrk/parallel-debug...
Thomas Kluyver -
r15313:9d27bffe merge
parent child Browse files
Show More
@@ -15,10 +15,11 b' TaskRecords are dicts of the form::'
15 15 'header' : dict(header),
16 16 'content': dict(content),
17 17 'buffers': list(buffers),
18 'submitted': datetime,
18 'submitted': datetime or None,
19 19 'started': datetime or None,
20 20 'completed': datetime or None,
21 'resubmitted': datetime or None,
21 'received': datetime or None,
22 'resubmitted': str(uuid) or None,
22 23 'result_header' : dict(header) or None,
23 24 'result_content' : dict(content) or None,
24 25 'result_buffers' : list(buffers) or None,
@@ -31,16 +32,7 b' e.g.:'
31 32 * client's outstanding: client_uuid = uuid && completed is None
32 33 * MIA: arrived is None (and completed is None)
33 34
34 EngineRecords are dicts of the form::
35
36 {
37 'eid' : int(id),
38 'uuid': str(uuid)
39 }
40
41 This may be extended, but is currently.
42
43 We support a subset of mongodb operators::
35 DictDB supports a subset of mongodb operators::
44 36
45 37 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
46 38 """
@@ -210,12 +202,19 b' class DictDB(BaseDB):'
210 202 before_count, before, self.size_limit, culled
211 203 )
212 204
205 def _check_dates(self, rec):
206 for key in ('submitted', 'started', 'completed'):
207 value = rec.get(key, None)
208 if value is not None and not isinstance(value, datetime):
209 raise ValueError("%s must be None or datetime, not %r" % (key, value))
210
213 211 # public API methods:
214 212
215 213 def add_record(self, msg_id, rec):
216 214 """Add a new Task Record, by msg_id."""
217 215 if msg_id in self._records:
218 216 raise KeyError("Already have msg_id %r"%(msg_id))
217 self._check_dates(rec)
219 218 self._records[msg_id] = rec
220 219 self._add_bytes(rec)
221 220 self._maybe_cull()
@@ -232,6 +231,7 b' class DictDB(BaseDB):'
232 231 """Update the data in an existing record."""
233 232 if msg_id in self._culled_ids:
234 233 raise KeyError("Record %r has been culled for size" % msg_id)
234 self._check_dates(rec)
235 235 _rec = self._records[msg_id]
236 236 self._drop_bytes(_rec)
237 237 _rec.update(rec)
@@ -669,7 +669,7 b' class Hub(SessionFactory):'
669 669 rheader = msg['header']
670 670 md = msg['metadata']
671 671 completed = rheader['date']
672 started = md.get('started', None)
672 started = extract_dates(md.get('started', None))
673 673 result = {
674 674 'result_header' : rheader,
675 675 'result_metadata': md,
@@ -775,7 +775,7 b' class Hub(SessionFactory):'
775 775 if msg_id in self.tasks[eid]:
776 776 self.tasks[eid].remove(msg_id)
777 777 completed = header['date']
778 started = md.get('started', None)
778 started = extract_dates(md.get('started', None))
779 779 result = {
780 780 'result_header' : header,
781 781 'result_metadata': msg['metadata'],
@@ -1194,6 +1194,7 b' class Hub(SessionFactory):'
1194 1194 self.db.drop_matching_records(dict(completed={'$ne':None}))
1195 1195 except Exception:
1196 1196 reply = error.wrap_exception()
1197 self.log.exception("Error dropping records")
1197 1198 else:
1198 1199 pending = [m for m in msg_ids if (m in self.pending)]
1199 1200 if pending:
@@ -1201,11 +1202,13 b' class Hub(SessionFactory):'
1201 1202 raise IndexError("msg pending: %r" % pending[0])
1202 1203 except:
1203 1204 reply = error.wrap_exception()
1205 self.log.exception("Error dropping records")
1204 1206 else:
1205 1207 try:
1206 1208 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1207 1209 except Exception:
1208 1210 reply = error.wrap_exception()
1211 self.log.exception("Error dropping records")
1209 1212
1210 1213 if reply['status'] == 'ok':
1211 1214 eids = content.get('engine_ids', [])
@@ -1215,12 +1218,14 b' class Hub(SessionFactory):'
1215 1218 raise IndexError("No such engine: %i" % eid)
1216 1219 except:
1217 1220 reply = error.wrap_exception()
1221 self.log.exception("Error dropping records")
1218 1222 break
1219 1223 uid = self.engines[eid].uuid
1220 1224 try:
1221 1225 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1222 1226 except Exception:
1223 1227 reply = error.wrap_exception()
1228 self.log.exception("Error dropping records")
1224 1229 break
1225 1230
1226 1231 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
@@ -1248,12 +1253,14 b' class Hub(SessionFactory):'
1248 1253 raise RuntimeError("DB appears to be in an inconsistent state."
1249 1254 "More matching records were found than should exist")
1250 1255 except Exception:
1256 self.log.exception("Failed to resubmit task")
1251 1257 return finish(error.wrap_exception())
1252 1258 elif len(records) < len(msg_ids):
1253 1259 missing = [ m for m in msg_ids if m not in found_ids ]
1254 1260 try:
1255 1261 raise KeyError("No such msg(s): %r" % missing)
1256 1262 except KeyError:
1263 self.log.exception("Failed to resubmit task")
1257 1264 return finish(error.wrap_exception())
1258 1265 elif pending_ids:
1259 1266 pass
@@ -1343,6 +1350,7 b' class Hub(SessionFactory):'
1343 1350 records[rec['msg_id']] = rec
1344 1351 except Exception:
1345 1352 content = error.wrap_exception()
1353 self.log.exception("Failed to get results")
1346 1354 self.session.send(self.query, "result_reply", content=content,
1347 1355 parent=msg, ident=client_id)
1348 1356 return
@@ -1381,6 +1389,7 b' class Hub(SessionFactory):'
1381 1389 msg_ids = self.db.get_history()
1382 1390 except Exception as e:
1383 1391 content = error.wrap_exception()
1392 self.log.exception("Failed to get history")
1384 1393 else:
1385 1394 content = dict(status='ok', history=msg_ids)
1386 1395
@@ -1398,6 +1407,7 b' class Hub(SessionFactory):'
1398 1407 records = self.db.find_records(query, keys)
1399 1408 except Exception as e:
1400 1409 content = error.wrap_exception()
1410 self.log.exception("DB query failed")
1401 1411 else:
1402 1412 # extract buffers from reply content:
1403 1413 if keys is not None:
@@ -20,7 +20,7 b' from subprocess import Popen, PIPE, STDOUT'
20 20 import nose
21 21
22 22 from IPython.utils.path import get_ipython_dir
23 from IPython.parallel import Client
23 from IPython.parallel import Client, error
24 24 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
25 25 ipengine_cmd_argv,
26 26 ipcontroller_cmd_argv,
@@ -53,6 +53,15 b' class TestProcessLauncher(LocalProcessLauncher):'
53 53 # nose setup/teardown
54 54
55 55 def setup():
56
57 # show tracebacks for RemoteErrors
58 class RemoteErrorWithTB(error.RemoteError):
59 def __str__(self):
60 s = super(RemoteErrorWithTB, self).__str__()
61 return '\n'.join([s, self.traceback or ''])
62
63 error.RemoteError = RemoteErrorWithTB
64
56 65 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
57 66 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
58 67 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
@@ -109,7 +118,10 b" def add_engines(n=1, profile='iptest', total=False):"
109 118 return eps
110 119
111 120 def teardown():
112 time.sleep(1)
121 try:
122 time.sleep(1)
123 except KeyboardInterrupt:
124 return
113 125 while launchers:
114 126 p = launchers.pop()
115 127 if p.poll() is None:
@@ -119,7 +131,10 b' def teardown():'
119 131 print(e)
120 132 pass
121 133 if p.poll() is None:
122 time.sleep(.25)
134 try:
135 time.sleep(.25)
136 except KeyboardInterrupt:
137 return
123 138 if p.poll() is None:
124 139 try:
125 140 print('cleaning up test process...')
@@ -247,7 +247,7 b' class TestDictBackend(TaskDBTest, TestCase):'
247 247 self.load_records(1)
248 248 self.assertEqual(len(self.db.get_history()), 17)
249 249
250 for i in range(100):
250 for i in range(25):
251 251 self.load_records(1)
252 252 self.assertTrue(len(self.db.get_history()) >= 17)
253 253 self.assertTrue(len(self.db.get_history()) <= 20)
@@ -34,7 +34,7 b" next_attr_name = '__next__' if py3compat.PY3 else 'next'"
34 34
35 35 # timestamp formats
36 36 ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
37 ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6})Z?([\+\-]\d{2}:?\d{2})?$")
37 ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?Z?([\+\-]\d{2}:?\d{2})?$")
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Classes and functions
@@ -75,7 +75,10 b' def parse_date(s):'
75 75 if m:
76 76 # FIXME: add actual timezone support
77 77 # this just drops the timezone info
78 notz = m.groups()[0]
78 notz, ms, tz = m.groups()
79 if not ms:
80 ms = '.0'
81 notz = notz + ms
79 82 return datetime.strptime(notz, ISO8601)
80 83 return s
81 84
@@ -96,8 +96,8 b' def test_encode_images():'
96 96
97 97 def test_lambda():
98 98 jc = json_clean(lambda : 1)
99 assert isinstance(jc, str)
100 assert '<lambda>' in jc
99 nt.assert_is_instance(jc, str)
100 nt.assert_in('<lambda>', jc)
101 101 json.dumps(jc)
102 102
103 103 def test_extract_dates():
@@ -120,16 +120,18 b' def test_extract_dates():'
120 120 nt.assert_equal(dt, ref)
121 121
122 122 def test_parse_ms_precision():
123 base = '2013-07-03T16:34:52.'
123 base = '2013-07-03T16:34:52'
124 124 digits = '1234567890'
125 125
126 parsed = jsonutil.parse_date(base)
127 nt.assert_is_instance(parsed, datetime.datetime)
126 128 for i in range(len(digits)):
127 ts = base + digits[:i]
129 ts = base + '.' + digits[:i]
128 130 parsed = jsonutil.parse_date(ts)
129 131 if i >= 1 and i <= 6:
130 assert isinstance(parsed, datetime.datetime)
132 nt.assert_is_instance(parsed, datetime.datetime)
131 133 else:
132 assert isinstance(parsed, str)
134 nt.assert_is_instance(parsed, str)
133 135
134 136 def test_date_default():
135 137 data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow())
@@ -138,7 +140,7 b' def test_date_default():'
138 140 nt.assert_equal(jsondata.count("+00"), 1)
139 141 extracted = jsonutil.extract_dates(json.loads(jsondata))
140 142 for dt in extracted.values():
141 nt.assert_true(isinstance(dt, datetime.datetime))
143 nt.assert_is_instance(dt, datetime.datetime)
142 144
143 145 def test_exception():
144 146 bad_dicts = [{1:'number', '1':'string'},
General Comments 0
You need to be logged in to leave comments. Login now