##// END OF EJS Templates
Merge pull request #5098 from minrk/parallel-debug...
Thomas Kluyver -
r15313:9d27bffe merge
parent child Browse files
Show More
@@ -15,10 +15,11 b' TaskRecords are dicts of the form::'
15 'header' : dict(header),
15 'header' : dict(header),
16 'content': dict(content),
16 'content': dict(content),
17 'buffers': list(buffers),
17 'buffers': list(buffers),
18 'submitted': datetime,
18 'submitted': datetime or None,
19 'started': datetime or None,
19 'started': datetime or None,
20 'completed': datetime or None,
20 'completed': datetime or None,
21 'resubmitted': datetime or None,
21 'received': datetime or None,
22 'resubmitted': str(uuid) or None,
22 'result_header' : dict(header) or None,
23 'result_header' : dict(header) or None,
23 'result_content' : dict(content) or None,
24 'result_content' : dict(content) or None,
24 'result_buffers' : list(buffers) or None,
25 'result_buffers' : list(buffers) or None,
@@ -31,16 +32,7 b' e.g.:'
31 * client's outstanding: client_uuid = uuid && completed is None
32 * client's outstanding: client_uuid = uuid && completed is None
32 * MIA: arrived is None (and completed is None)
33 * MIA: arrived is None (and completed is None)
33
34
34 EngineRecords are dicts of the form::
35 DictDB supports a subset of mongodb operators::
35
36 {
37 'eid' : int(id),
38 'uuid': str(uuid)
39 }
40
41 This may be extended, but is currently.
42
43 We support a subset of mongodb operators::
44
36
45 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
37 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
46 """
38 """
@@ -210,12 +202,19 b' class DictDB(BaseDB):'
210 before_count, before, self.size_limit, culled
202 before_count, before, self.size_limit, culled
211 )
203 )
212
204
205 def _check_dates(self, rec):
206 for key in ('submitted', 'started', 'completed'):
207 value = rec.get(key, None)
208 if value is not None and not isinstance(value, datetime):
209 raise ValueError("%s must be None or datetime, not %r" % (key, value))
210
213 # public API methods:
211 # public API methods:
214
212
215 def add_record(self, msg_id, rec):
213 def add_record(self, msg_id, rec):
216 """Add a new Task Record, by msg_id."""
214 """Add a new Task Record, by msg_id."""
217 if msg_id in self._records:
215 if msg_id in self._records:
218 raise KeyError("Already have msg_id %r"%(msg_id))
216 raise KeyError("Already have msg_id %r"%(msg_id))
217 self._check_dates(rec)
219 self._records[msg_id] = rec
218 self._records[msg_id] = rec
220 self._add_bytes(rec)
219 self._add_bytes(rec)
221 self._maybe_cull()
220 self._maybe_cull()
@@ -232,6 +231,7 b' class DictDB(BaseDB):'
232 """Update the data in an existing record."""
231 """Update the data in an existing record."""
233 if msg_id in self._culled_ids:
232 if msg_id in self._culled_ids:
234 raise KeyError("Record %r has been culled for size" % msg_id)
233 raise KeyError("Record %r has been culled for size" % msg_id)
234 self._check_dates(rec)
235 _rec = self._records[msg_id]
235 _rec = self._records[msg_id]
236 self._drop_bytes(_rec)
236 self._drop_bytes(_rec)
237 _rec.update(rec)
237 _rec.update(rec)
@@ -669,7 +669,7 b' class Hub(SessionFactory):'
669 rheader = msg['header']
669 rheader = msg['header']
670 md = msg['metadata']
670 md = msg['metadata']
671 completed = rheader['date']
671 completed = rheader['date']
672 started = md.get('started', None)
672 started = extract_dates(md.get('started', None))
673 result = {
673 result = {
674 'result_header' : rheader,
674 'result_header' : rheader,
675 'result_metadata': md,
675 'result_metadata': md,
@@ -775,7 +775,7 b' class Hub(SessionFactory):'
775 if msg_id in self.tasks[eid]:
775 if msg_id in self.tasks[eid]:
776 self.tasks[eid].remove(msg_id)
776 self.tasks[eid].remove(msg_id)
777 completed = header['date']
777 completed = header['date']
778 started = md.get('started', None)
778 started = extract_dates(md.get('started', None))
779 result = {
779 result = {
780 'result_header' : header,
780 'result_header' : header,
781 'result_metadata': msg['metadata'],
781 'result_metadata': msg['metadata'],
@@ -1194,6 +1194,7 b' class Hub(SessionFactory):'
1194 self.db.drop_matching_records(dict(completed={'$ne':None}))
1194 self.db.drop_matching_records(dict(completed={'$ne':None}))
1195 except Exception:
1195 except Exception:
1196 reply = error.wrap_exception()
1196 reply = error.wrap_exception()
1197 self.log.exception("Error dropping records")
1197 else:
1198 else:
1198 pending = [m for m in msg_ids if (m in self.pending)]
1199 pending = [m for m in msg_ids if (m in self.pending)]
1199 if pending:
1200 if pending:
@@ -1201,11 +1202,13 b' class Hub(SessionFactory):'
1201 raise IndexError("msg pending: %r" % pending[0])
1202 raise IndexError("msg pending: %r" % pending[0])
1202 except:
1203 except:
1203 reply = error.wrap_exception()
1204 reply = error.wrap_exception()
1205 self.log.exception("Error dropping records")
1204 else:
1206 else:
1205 try:
1207 try:
1206 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1208 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1207 except Exception:
1209 except Exception:
1208 reply = error.wrap_exception()
1210 reply = error.wrap_exception()
1211 self.log.exception("Error dropping records")
1209
1212
1210 if reply['status'] == 'ok':
1213 if reply['status'] == 'ok':
1211 eids = content.get('engine_ids', [])
1214 eids = content.get('engine_ids', [])
@@ -1215,12 +1218,14 b' class Hub(SessionFactory):'
1215 raise IndexError("No such engine: %i" % eid)
1218 raise IndexError("No such engine: %i" % eid)
1216 except:
1219 except:
1217 reply = error.wrap_exception()
1220 reply = error.wrap_exception()
1221 self.log.exception("Error dropping records")
1218 break
1222 break
1219 uid = self.engines[eid].uuid
1223 uid = self.engines[eid].uuid
1220 try:
1224 try:
1221 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1225 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1222 except Exception:
1226 except Exception:
1223 reply = error.wrap_exception()
1227 reply = error.wrap_exception()
1228 self.log.exception("Error dropping records")
1224 break
1229 break
1225
1230
1226 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1231 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
@@ -1248,12 +1253,14 b' class Hub(SessionFactory):'
1248 raise RuntimeError("DB appears to be in an inconsistent state."
1253 raise RuntimeError("DB appears to be in an inconsistent state."
1249 "More matching records were found than should exist")
1254 "More matching records were found than should exist")
1250 except Exception:
1255 except Exception:
1256 self.log.exception("Failed to resubmit task")
1251 return finish(error.wrap_exception())
1257 return finish(error.wrap_exception())
1252 elif len(records) < len(msg_ids):
1258 elif len(records) < len(msg_ids):
1253 missing = [ m for m in msg_ids if m not in found_ids ]
1259 missing = [ m for m in msg_ids if m not in found_ids ]
1254 try:
1260 try:
1255 raise KeyError("No such msg(s): %r" % missing)
1261 raise KeyError("No such msg(s): %r" % missing)
1256 except KeyError:
1262 except KeyError:
1263 self.log.exception("Failed to resubmit task")
1257 return finish(error.wrap_exception())
1264 return finish(error.wrap_exception())
1258 elif pending_ids:
1265 elif pending_ids:
1259 pass
1266 pass
@@ -1343,6 +1350,7 b' class Hub(SessionFactory):'
1343 records[rec['msg_id']] = rec
1350 records[rec['msg_id']] = rec
1344 except Exception:
1351 except Exception:
1345 content = error.wrap_exception()
1352 content = error.wrap_exception()
1353 self.log.exception("Failed to get results")
1346 self.session.send(self.query, "result_reply", content=content,
1354 self.session.send(self.query, "result_reply", content=content,
1347 parent=msg, ident=client_id)
1355 parent=msg, ident=client_id)
1348 return
1356 return
@@ -1381,6 +1389,7 b' class Hub(SessionFactory):'
1381 msg_ids = self.db.get_history()
1389 msg_ids = self.db.get_history()
1382 except Exception as e:
1390 except Exception as e:
1383 content = error.wrap_exception()
1391 content = error.wrap_exception()
1392 self.log.exception("Failed to get history")
1384 else:
1393 else:
1385 content = dict(status='ok', history=msg_ids)
1394 content = dict(status='ok', history=msg_ids)
1386
1395
@@ -1398,6 +1407,7 b' class Hub(SessionFactory):'
1398 records = self.db.find_records(query, keys)
1407 records = self.db.find_records(query, keys)
1399 except Exception as e:
1408 except Exception as e:
1400 content = error.wrap_exception()
1409 content = error.wrap_exception()
1410 self.log.exception("DB query failed")
1401 else:
1411 else:
1402 # extract buffers from reply content:
1412 # extract buffers from reply content:
1403 if keys is not None:
1413 if keys is not None:
@@ -20,7 +20,7 b' from subprocess import Popen, PIPE, STDOUT'
20 import nose
20 import nose
21
21
22 from IPython.utils.path import get_ipython_dir
22 from IPython.utils.path import get_ipython_dir
23 from IPython.parallel import Client
23 from IPython.parallel import Client, error
24 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
24 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
25 ipengine_cmd_argv,
25 ipengine_cmd_argv,
26 ipcontroller_cmd_argv,
26 ipcontroller_cmd_argv,
@@ -53,6 +53,15 b' class TestProcessLauncher(LocalProcessLauncher):'
53 # nose setup/teardown
53 # nose setup/teardown
54
54
55 def setup():
55 def setup():
56
57 # show tracebacks for RemoteErrors
58 class RemoteErrorWithTB(error.RemoteError):
59 def __str__(self):
60 s = super(RemoteErrorWithTB, self).__str__()
61 return '\n'.join([s, self.traceback or ''])
62
63 error.RemoteError = RemoteErrorWithTB
64
56 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
65 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
57 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
66 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
58 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
67 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
@@ -109,7 +118,10 b" def add_engines(n=1, profile='iptest', total=False):"
109 return eps
118 return eps
110
119
111 def teardown():
120 def teardown():
112 time.sleep(1)
121 try:
122 time.sleep(1)
123 except KeyboardInterrupt:
124 return
113 while launchers:
125 while launchers:
114 p = launchers.pop()
126 p = launchers.pop()
115 if p.poll() is None:
127 if p.poll() is None:
@@ -119,7 +131,10 b' def teardown():'
119 print(e)
131 print(e)
120 pass
132 pass
121 if p.poll() is None:
133 if p.poll() is None:
122 time.sleep(.25)
134 try:
135 time.sleep(.25)
136 except KeyboardInterrupt:
137 return
123 if p.poll() is None:
138 if p.poll() is None:
124 try:
139 try:
125 print('cleaning up test process...')
140 print('cleaning up test process...')
@@ -247,7 +247,7 b' class TestDictBackend(TaskDBTest, TestCase):'
247 self.load_records(1)
247 self.load_records(1)
248 self.assertEqual(len(self.db.get_history()), 17)
248 self.assertEqual(len(self.db.get_history()), 17)
249
249
250 for i in range(100):
250 for i in range(25):
251 self.load_records(1)
251 self.load_records(1)
252 self.assertTrue(len(self.db.get_history()) >= 17)
252 self.assertTrue(len(self.db.get_history()) >= 17)
253 self.assertTrue(len(self.db.get_history()) <= 20)
253 self.assertTrue(len(self.db.get_history()) <= 20)
@@ -34,7 +34,7 b" next_attr_name = '__next__' if py3compat.PY3 else 'next'"
34
34
35 # timestamp formats
35 # timestamp formats
36 ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
36 ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
37 ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6})Z?([\+\-]\d{2}:?\d{2})?$")
37 ISO8601_PAT=re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d{1,6})?Z?([\+\-]\d{2}:?\d{2})?$")
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Classes and functions
40 # Classes and functions
@@ -75,7 +75,10 b' def parse_date(s):'
75 if m:
75 if m:
76 # FIXME: add actual timezone support
76 # FIXME: add actual timezone support
77 # this just drops the timezone info
77 # this just drops the timezone info
78 notz = m.groups()[0]
78 notz, ms, tz = m.groups()
79 if not ms:
80 ms = '.0'
81 notz = notz + ms
79 return datetime.strptime(notz, ISO8601)
82 return datetime.strptime(notz, ISO8601)
80 return s
83 return s
81
84
@@ -96,8 +96,8 b' def test_encode_images():'
96
96
97 def test_lambda():
97 def test_lambda():
98 jc = json_clean(lambda : 1)
98 jc = json_clean(lambda : 1)
99 assert isinstance(jc, str)
99 nt.assert_is_instance(jc, str)
100 assert '<lambda>' in jc
100 nt.assert_in('<lambda>', jc)
101 json.dumps(jc)
101 json.dumps(jc)
102
102
103 def test_extract_dates():
103 def test_extract_dates():
@@ -120,16 +120,18 b' def test_extract_dates():'
120 nt.assert_equal(dt, ref)
120 nt.assert_equal(dt, ref)
121
121
122 def test_parse_ms_precision():
122 def test_parse_ms_precision():
123 base = '2013-07-03T16:34:52.'
123 base = '2013-07-03T16:34:52'
124 digits = '1234567890'
124 digits = '1234567890'
125
125
126 parsed = jsonutil.parse_date(base)
127 nt.assert_is_instance(parsed, datetime.datetime)
126 for i in range(len(digits)):
128 for i in range(len(digits)):
127 ts = base + digits[:i]
129 ts = base + '.' + digits[:i]
128 parsed = jsonutil.parse_date(ts)
130 parsed = jsonutil.parse_date(ts)
129 if i >= 1 and i <= 6:
131 if i >= 1 and i <= 6:
130 assert isinstance(parsed, datetime.datetime)
132 nt.assert_is_instance(parsed, datetime.datetime)
131 else:
133 else:
132 assert isinstance(parsed, str)
134 nt.assert_is_instance(parsed, str)
133
135
134 def test_date_default():
136 def test_date_default():
135 data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow())
137 data = dict(today=datetime.datetime.now(), utcnow=tz.utcnow())
@@ -138,7 +140,7 b' def test_date_default():'
138 nt.assert_equal(jsondata.count("+00"), 1)
140 nt.assert_equal(jsondata.count("+00"), 1)
139 extracted = jsonutil.extract_dates(json.loads(jsondata))
141 extracted = jsonutil.extract_dates(json.loads(jsondata))
140 for dt in extracted.values():
142 for dt in extracted.values():
141 nt.assert_true(isinstance(dt, datetime.datetime))
143 nt.assert_is_instance(dt, datetime.datetime)
142
144
143 def test_exception():
145 def test_exception():
144 bad_dicts = [{1:'number', '1':'string'},
146 bad_dicts = [{1:'number', '1':'string'},
General Comments 0
You need to be logged in to leave comments. Login now