Show More
@@ -650,12 +650,16 b' class Client(HasTraits):' | |||||
650 | e.engine_info['engine_id'] = eid |
|
650 | e.engine_info['engine_id'] = eid | |
651 | return e |
|
651 | return e | |
652 |
|
652 | |||
653 |
def _extract_metadata(self, |
|
653 | def _extract_metadata(self, msg): | |
|
654 | header = msg['header'] | |||
|
655 | parent = msg['parent_header'] | |||
|
656 | msg_meta = msg['metadata'] | |||
|
657 | content = msg['content'] | |||
654 | md = {'msg_id' : parent['msg_id'], |
|
658 | md = {'msg_id' : parent['msg_id'], | |
655 | 'received' : datetime.now(), |
|
659 | 'received' : datetime.now(), | |
656 |
'engine_uuid' : |
|
660 | 'engine_uuid' : msg_meta.get('engine', None), | |
657 |
'follow' : |
|
661 | 'follow' : msg_meta.get('follow', []), | |
658 |
'after' : |
|
662 | 'after' : msg_meta.get('after', []), | |
659 | 'status' : content['status'], |
|
663 | 'status' : content['status'], | |
660 | } |
|
664 | } | |
661 |
|
665 | |||
@@ -664,8 +668,8 b' class Client(HasTraits):' | |||||
664 |
|
668 | |||
665 | if 'date' in parent: |
|
669 | if 'date' in parent: | |
666 | md['submitted'] = parent['date'] |
|
670 | md['submitted'] = parent['date'] | |
667 |
if 'started' in |
|
671 | if 'started' in msg_meta: | |
668 |
md['started'] = |
|
672 | md['started'] = msg_meta['started'] | |
669 | if 'date' in header: |
|
673 | if 'date' in header: | |
670 | md['completed'] = header['date'] |
|
674 | md['completed'] = header['date'] | |
671 | return md |
|
675 | return md | |
@@ -738,7 +742,7 b' class Client(HasTraits):' | |||||
738 |
|
742 | |||
739 | # construct metadata: |
|
743 | # construct metadata: | |
740 | md = self.metadata[msg_id] |
|
744 | md = self.metadata[msg_id] | |
741 |
md.update(self._extract_metadata( |
|
745 | md.update(self._extract_metadata(msg)) | |
742 | # is this redundant? |
|
746 | # is this redundant? | |
743 | self.metadata[msg_id] = md |
|
747 | self.metadata[msg_id] = md | |
744 |
|
748 | |||
@@ -775,7 +779,7 b' class Client(HasTraits):' | |||||
775 |
|
779 | |||
776 | # construct metadata: |
|
780 | # construct metadata: | |
777 | md = self.metadata[msg_id] |
|
781 | md = self.metadata[msg_id] | |
778 |
md.update(self._extract_metadata( |
|
782 | md.update(self._extract_metadata(msg)) | |
779 | # is this redundant? |
|
783 | # is this redundant? | |
780 | self.metadata[msg_id] = md |
|
784 | self.metadata[msg_id] = md | |
781 |
|
785 | |||
@@ -1201,7 +1205,7 b' class Client(HasTraits):' | |||||
1201 |
|
1205 | |||
1202 | return result |
|
1206 | return result | |
1203 |
|
1207 | |||
1204 |
def send_apply_request(self, socket, f, args=None, kwargs=None, |
|
1208 | def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False, | |
1205 | ident=None): |
|
1209 | ident=None): | |
1206 | """construct and send an apply message via a socket. |
|
1210 | """construct and send an apply message via a socket. | |
1207 |
|
1211 | |||
@@ -1214,7 +1218,7 b' class Client(HasTraits):' | |||||
1214 | # defaults: |
|
1218 | # defaults: | |
1215 | args = args if args is not None else [] |
|
1219 | args = args if args is not None else [] | |
1216 | kwargs = kwargs if kwargs is not None else {} |
|
1220 | kwargs = kwargs if kwargs is not None else {} | |
1217 |
|
|
1221 | metadata = metadata if metadata is not None else {} | |
1218 |
|
1222 | |||
1219 | # validate arguments |
|
1223 | # validate arguments | |
1220 | if not callable(f) and not isinstance(f, Reference): |
|
1224 | if not callable(f) and not isinstance(f, Reference): | |
@@ -1223,13 +1227,13 b' class Client(HasTraits):' | |||||
1223 | raise TypeError("args must be tuple or list, not %s"%type(args)) |
|
1227 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |
1224 | if not isinstance(kwargs, dict): |
|
1228 | if not isinstance(kwargs, dict): | |
1225 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
1229 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
1226 |
if not isinstance( |
|
1230 | if not isinstance(metadata, dict): | |
1227 |
raise TypeError(" |
|
1231 | raise TypeError("metadata must be dict, not %s"%type(metadata)) | |
1228 |
|
1232 | |||
1229 | bufs = util.pack_apply_message(f,args,kwargs) |
|
1233 | bufs = util.pack_apply_message(f,args,kwargs) | |
1230 |
|
1234 | |||
1231 | msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, |
|
1235 | msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, | |
1232 |
|
|
1236 | metadata=metadata, track=track) | |
1233 |
|
1237 | |||
1234 | msg_id = msg['header']['msg_id'] |
|
1238 | msg_id = msg['header']['msg_id'] | |
1235 | self.outstanding.add(msg_id) |
|
1239 | self.outstanding.add(msg_id) | |
@@ -1245,7 +1249,7 b' class Client(HasTraits):' | |||||
1245 |
|
1249 | |||
1246 | return msg |
|
1250 | return msg | |
1247 |
|
1251 | |||
1248 |
def send_execute_request(self, socket, code, silent=True, |
|
1252 | def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None): | |
1249 | """construct and send an execute request via a socket. |
|
1253 | """construct and send an execute request via a socket. | |
1250 |
|
1254 | |||
1251 | """ |
|
1255 | """ | |
@@ -1254,19 +1258,19 b' class Client(HasTraits):' | |||||
1254 | raise RuntimeError("Client cannot be used after its sockets have been closed") |
|
1258 | raise RuntimeError("Client cannot be used after its sockets have been closed") | |
1255 |
|
1259 | |||
1256 | # defaults: |
|
1260 | # defaults: | |
1257 |
|
|
1261 | metadata = metadata if metadata is not None else {} | |
1258 |
|
1262 | |||
1259 | # validate arguments |
|
1263 | # validate arguments | |
1260 | if not isinstance(code, basestring): |
|
1264 | if not isinstance(code, basestring): | |
1261 | raise TypeError("code must be text, not %s" % type(code)) |
|
1265 | raise TypeError("code must be text, not %s" % type(code)) | |
1262 |
if not isinstance( |
|
1266 | if not isinstance(metadata, dict): | |
1263 |
raise TypeError(" |
|
1267 | raise TypeError("metadata must be dict, not %s" % type(metadata)) | |
1264 |
|
1268 | |||
1265 | content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) |
|
1269 | content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) | |
1266 |
|
1270 | |||
1267 |
|
1271 | |||
1268 | msg = self.session.send(socket, "execute_request", content=content, ident=ident, |
|
1272 | msg = self.session.send(socket, "execute_request", content=content, ident=ident, | |
1269 |
|
|
1273 | metadata=metadata) | |
1270 |
|
1274 | |||
1271 | msg_id = msg['header']['msg_id'] |
|
1275 | msg_id = msg['header']['msg_id'] | |
1272 | self.outstanding.add(msg_id) |
|
1276 | self.outstanding.add(msg_id) | |
@@ -1401,7 +1405,7 b' class Client(HasTraits):' | |||||
1401 | return ar |
|
1405 | return ar | |
1402 |
|
1406 | |||
1403 | @spin_first |
|
1407 | @spin_first | |
1404 |
def resubmit(self, indices_or_msg_ids=None, |
|
1408 | def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None): | |
1405 | """Resubmit one or more tasks. |
|
1409 | """Resubmit one or more tasks. | |
1406 |
|
1410 | |||
1407 | in-flight tasks may not be resubmitted. |
|
1411 | in-flight tasks may not be resubmitted. | |
@@ -1539,7 +1543,13 b' class Client(HasTraits):' | |||||
1539 | rcontent = self.session.unpack(rcontent) |
|
1543 | rcontent = self.session.unpack(rcontent) | |
1540 |
|
1544 | |||
1541 | md = self.metadata[msg_id] |
|
1545 | md = self.metadata[msg_id] | |
1542 | md.update(self._extract_metadata(header, parent, rcontent)) |
|
1546 | md_msg = dict( | |
|
1547 | content=rcontent, | |||
|
1548 | parent_header=parent, | |||
|
1549 | header=header, | |||
|
1550 | metadata=rec['result_metadata'], | |||
|
1551 | ) | |||
|
1552 | md.update(self._extract_metadata(md_msg)) | |||
1543 | if rec.get('received'): |
|
1553 | if rec.get('received'): | |
1544 | md['received'] = rec['received'] |
|
1554 | md['received'] = rec['received'] | |
1545 | md.update(iodict) |
|
1555 | md.update(iodict) |
@@ -1022,10 +1022,10 b' class LoadBalancedView(View):' | |||||
1022 |
|
1022 | |||
1023 | after = self._render_dependency(after) |
|
1023 | after = self._render_dependency(after) | |
1024 | follow = self._render_dependency(follow) |
|
1024 | follow = self._render_dependency(follow) | |
1025 |
|
|
1025 | metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) | |
1026 |
|
1026 | |||
1027 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, |
|
1027 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
1028 |
|
|
1028 | metadata=metadata) | |
1029 | tracker = None if track is False else msg['tracker'] |
|
1029 | tracker = None if track is False else msg['tracker'] | |
1030 |
|
1030 | |||
1031 | ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker) |
|
1031 | ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker) |
@@ -56,6 +56,7 b' def empty_record():' | |||||
56 | return { |
|
56 | return { | |
57 | 'msg_id' : None, |
|
57 | 'msg_id' : None, | |
58 | 'header' : None, |
|
58 | 'header' : None, | |
|
59 | 'metadata' : None, | |||
59 | 'content': None, |
|
60 | 'content': None, | |
60 | 'buffers': None, |
|
61 | 'buffers': None, | |
61 | 'submitted': None, |
|
62 | 'submitted': None, | |
@@ -66,6 +67,7 b' def empty_record():' | |||||
66 | 'resubmitted': None, |
|
67 | 'resubmitted': None, | |
67 | 'received': None, |
|
68 | 'received': None, | |
68 | 'result_header' : None, |
|
69 | 'result_header' : None, | |
|
70 | 'result_metadata' : None, | |||
69 | 'result_content' : None, |
|
71 | 'result_content' : None, | |
70 | 'result_buffers' : None, |
|
72 | 'result_buffers' : None, | |
71 | 'queue' : None, |
|
73 | 'queue' : None, | |
@@ -83,6 +85,7 b' def init_record(msg):' | |||||
83 | 'msg_id' : header['msg_id'], |
|
85 | 'msg_id' : header['msg_id'], | |
84 | 'header' : header, |
|
86 | 'header' : header, | |
85 | 'content': msg['content'], |
|
87 | 'content': msg['content'], | |
|
88 | 'metadata': msg['metadata'], | |||
86 | 'buffers': msg['buffers'], |
|
89 | 'buffers': msg['buffers'], | |
87 | 'submitted': header['date'], |
|
90 | 'submitted': header['date'], | |
88 | 'client_uuid' : None, |
|
91 | 'client_uuid' : None, | |
@@ -92,6 +95,7 b' def init_record(msg):' | |||||
92 | 'resubmitted': None, |
|
95 | 'resubmitted': None, | |
93 | 'received': None, |
|
96 | 'received': None, | |
94 | 'result_header' : None, |
|
97 | 'result_header' : None, | |
|
98 | 'result_metadata': None, | |||
95 | 'result_content' : None, |
|
99 | 'result_content' : None, | |
96 | 'result_buffers' : None, |
|
100 | 'result_buffers' : None, | |
97 | 'queue' : None, |
|
101 | 'queue' : None, | |
@@ -646,10 +650,12 b' class Hub(SessionFactory):' | |||||
646 | return |
|
650 | return | |
647 | # update record anyway, because the unregistration could have been premature |
|
651 | # update record anyway, because the unregistration could have been premature | |
648 | rheader = msg['header'] |
|
652 | rheader = msg['header'] | |
|
653 | md = msg['metadata'] | |||
649 | completed = rheader['date'] |
|
654 | completed = rheader['date'] | |
650 |
started = |
|
655 | started = md.get('started', None) | |
651 | result = { |
|
656 | result = { | |
652 | 'result_header' : rheader, |
|
657 | 'result_header' : rheader, | |
|
658 | 'result_metadata': md, | |||
653 | 'result_content': msg['content'], |
|
659 | 'result_content': msg['content'], | |
654 | 'received': datetime.now(), |
|
660 | 'received': datetime.now(), | |
655 | 'started' : started, |
|
661 | 'started' : started, | |
@@ -736,10 +742,11 b' class Hub(SessionFactory):' | |||||
736 | self.unassigned.remove(msg_id) |
|
742 | self.unassigned.remove(msg_id) | |
737 |
|
743 | |||
738 | header = msg['header'] |
|
744 | header = msg['header'] | |
739 | engine_uuid = header.get('engine', u'') |
|
745 | md = msg['metadata'] | |
|
746 | engine_uuid = md.get('engine', u'') | |||
740 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) |
|
747 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) | |
741 |
|
748 | |||
742 |
status = |
|
749 | status = md.get('status', None) | |
743 |
|
750 | |||
744 | if msg_id in self.pending: |
|
751 | if msg_id in self.pending: | |
745 | self.log.info("task::task %r finished on %s", msg_id, eid) |
|
752 | self.log.info("task::task %r finished on %s", msg_id, eid) | |
@@ -751,9 +758,10 b' class Hub(SessionFactory):' | |||||
751 | if msg_id in self.tasks[eid]: |
|
758 | if msg_id in self.tasks[eid]: | |
752 | self.tasks[eid].remove(msg_id) |
|
759 | self.tasks[eid].remove(msg_id) | |
753 | completed = header['date'] |
|
760 | completed = header['date'] | |
754 |
started = |
|
761 | started = md.get('started', None) | |
755 | result = { |
|
762 | result = { | |
756 | 'result_header' : header, |
|
763 | 'result_header' : header, | |
|
764 | 'result_metadata': msg['metadata'], | |||
757 | 'result_content': msg['content'], |
|
765 | 'result_content': msg['content'], | |
758 | 'started' : started, |
|
766 | 'started' : started, | |
759 | 'completed' : completed, |
|
767 | 'completed' : completed, | |
@@ -1221,12 +1229,15 b' class Hub(SessionFactory):' | |||||
1221 | io_dict = {} |
|
1229 | io_dict = {} | |
1222 | for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'): |
|
1230 | for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'): | |
1223 | io_dict[key] = rec[key] |
|
1231 | io_dict[key] = rec[key] | |
1224 | content = { 'result_content': rec['result_content'], |
|
1232 | content = { | |
1225 |
|
|
1233 | 'header': rec['header'], | |
1226 | 'result_header' : rec['result_header'], |
|
1234 | 'metadata': rec['metadata'], | |
1227 | 'received' : rec['received'], |
|
1235 | 'result_metadata': rec['result_metadata'], | |
1228 | 'io' : io_dict, |
|
1236 | 'result_header' : rec['result_header'], | |
1229 | } |
|
1237 | 'result_content': rec['result_content'], | |
|
1238 | 'received' : rec['received'], | |||
|
1239 | 'io' : io_dict, | |||
|
1240 | } | |||
1230 | if rec['result_buffers']: |
|
1241 | if rec['result_buffers']: | |
1231 | buffers = map(bytes, rec['result_buffers']) |
|
1242 | buffers = map(bytes, rec['result_buffers']) | |
1232 | else: |
|
1243 | else: |
@@ -129,12 +129,14 b' MET = Dependency([])' | |||||
129 |
|
129 | |||
130 | class Job(object): |
|
130 | class Job(object): | |
131 | """Simple container for a job""" |
|
131 | """Simple container for a job""" | |
132 |
def __init__(self, msg_id, raw_msg, idents, msg, header, |
|
132 | def __init__(self, msg_id, raw_msg, idents, msg, header, metadata, | |
|
133 | targets, after, follow, timeout): | |||
133 | self.msg_id = msg_id |
|
134 | self.msg_id = msg_id | |
134 | self.raw_msg = raw_msg |
|
135 | self.raw_msg = raw_msg | |
135 | self.idents = idents |
|
136 | self.idents = idents | |
136 | self.msg = msg |
|
137 | self.msg = msg | |
137 | self.header = header |
|
138 | self.header = header | |
|
139 | self.metadata = metadata | |||
138 | self.targets = targets |
|
140 | self.targets = targets | |
139 | self.after = after |
|
141 | self.after = after | |
140 | self.follow = follow |
|
142 | self.follow = follow | |
@@ -327,13 +329,13 b' class TaskScheduler(SessionFactory):' | |||||
327 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) |
|
329 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) | |
328 | except: |
|
330 | except: | |
329 | content = error.wrap_exception() |
|
331 | content = error.wrap_exception() | |
330 |
# build fake |
|
332 | # build fake metadata | |
331 |
|
|
333 | md = dict( | |
332 | status='error', |
|
334 | status=u'error', | |
333 | engine=engine, |
|
335 | engine=engine, | |
334 | date=datetime.now(), |
|
336 | date=datetime.now(), | |
335 | ) |
|
337 | ) | |
336 |
msg = self.session.msg('apply_reply', content, parent=parent, |
|
338 | msg = self.session.msg('apply_reply', content, parent=parent, metadata=md) | |
337 | raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) |
|
339 | raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) | |
338 | # and dispatch it |
|
340 | # and dispatch it | |
339 | self.dispatch_result(raw_reply) |
|
341 | self.dispatch_result(raw_reply) | |
@@ -365,20 +367,21 b' class TaskScheduler(SessionFactory):' | |||||
365 | self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) |
|
367 | self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) | |
366 |
|
368 | |||
367 | header = msg['header'] |
|
369 | header = msg['header'] | |
|
370 | md = msg['metadata'] | |||
368 | msg_id = header['msg_id'] |
|
371 | msg_id = header['msg_id'] | |
369 | self.all_ids.add(msg_id) |
|
372 | self.all_ids.add(msg_id) | |
370 |
|
373 | |||
371 | # get targets as a set of bytes objects |
|
374 | # get targets as a set of bytes objects | |
372 | # from a list of unicode objects |
|
375 | # from a list of unicode objects | |
373 |
targets = |
|
376 | targets = md.get('targets', []) | |
374 | targets = map(cast_bytes, targets) |
|
377 | targets = map(cast_bytes, targets) | |
375 | targets = set(targets) |
|
378 | targets = set(targets) | |
376 |
|
379 | |||
377 |
retries = |
|
380 | retries = md.get('retries', 0) | |
378 | self.retries[msg_id] = retries |
|
381 | self.retries[msg_id] = retries | |
379 |
|
382 | |||
380 | # time dependencies |
|
383 | # time dependencies | |
381 |
after = |
|
384 | after = md.get('after', None) | |
382 | if after: |
|
385 | if after: | |
383 | after = Dependency(after) |
|
386 | after = Dependency(after) | |
384 | if after.all: |
|
387 | if after.all: | |
@@ -402,10 +405,10 b' class TaskScheduler(SessionFactory):' | |||||
402 | after = MET |
|
405 | after = MET | |
403 |
|
406 | |||
404 | # location dependencies |
|
407 | # location dependencies | |
405 |
follow = Dependency( |
|
408 | follow = Dependency(md.get('follow', [])) | |
406 |
|
409 | |||
407 | # turn timeouts into datetime objects: |
|
410 | # turn timeouts into datetime objects: | |
408 |
timeout = |
|
411 | timeout = md.get('timeout', None) | |
409 | if timeout: |
|
412 | if timeout: | |
410 | # cast to float, because jsonlib returns floats as decimal.Decimal, |
|
413 | # cast to float, because jsonlib returns floats as decimal.Decimal, | |
411 | # which timedelta does not accept |
|
414 | # which timedelta does not accept | |
@@ -413,7 +416,7 b' class TaskScheduler(SessionFactory):' | |||||
413 |
|
416 | |||
414 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, |
|
417 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, | |
415 | header=header, targets=targets, after=after, follow=follow, |
|
418 | header=header, targets=targets, after=after, follow=follow, | |
416 | timeout=timeout, |
|
419 | timeout=timeout, metadata=md, | |
417 | ) |
|
420 | ) | |
418 |
|
421 | |||
419 | # validate and reduce dependencies: |
|
422 | # validate and reduce dependencies: | |
@@ -585,10 +588,10 b' class TaskScheduler(SessionFactory):' | |||||
585 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) |
|
588 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) | |
586 | return |
|
589 | return | |
587 |
|
590 | |||
588 |
|
|
591 | md = msg['metadata'] | |
589 | parent = msg['parent_header'] |
|
592 | parent = msg['parent_header'] | |
590 |
if |
|
593 | if md.get('dependencies_met', True): | |
591 |
success = ( |
|
594 | success = (md['status'] == 'ok') | |
592 | msg_id = parent['msg_id'] |
|
595 | msg_id = parent['msg_id'] | |
593 | retries = self.retries[msg_id] |
|
596 | retries = self.retries[msg_id] | |
594 | if not success and retries > 0: |
|
597 | if not success and retries > 0: |
@@ -109,6 +109,7 b' class SQLiteDB(BaseDB):' | |||||
109 | # the ordered list of column names |
|
109 | # the ordered list of column names | |
110 | _keys = List(['msg_id' , |
|
110 | _keys = List(['msg_id' , | |
111 | 'header' , |
|
111 | 'header' , | |
|
112 | 'metadata', | |||
112 | 'content', |
|
113 | 'content', | |
113 | 'buffers', |
|
114 | 'buffers', | |
114 | 'submitted', |
|
115 | 'submitted', | |
@@ -119,6 +120,7 b' class SQLiteDB(BaseDB):' | |||||
119 | 'resubmitted', |
|
120 | 'resubmitted', | |
120 | 'received', |
|
121 | 'received', | |
121 | 'result_header' , |
|
122 | 'result_header' , | |
|
123 | 'result_metadata', | |||
122 | 'result_content' , |
|
124 | 'result_content' , | |
123 | 'result_buffers' , |
|
125 | 'result_buffers' , | |
124 | 'queue' , |
|
126 | 'queue' , | |
@@ -131,6 +133,7 b' class SQLiteDB(BaseDB):' | |||||
131 | # sqlite datatypes for checking that db is current format |
|
133 | # sqlite datatypes for checking that db is current format | |
132 | _types = Dict({'msg_id' : 'text' , |
|
134 | _types = Dict({'msg_id' : 'text' , | |
133 | 'header' : 'dict text', |
|
135 | 'header' : 'dict text', | |
|
136 | 'metadata' : 'dict text', | |||
134 | 'content' : 'dict text', |
|
137 | 'content' : 'dict text', | |
135 | 'buffers' : 'bufs blob', |
|
138 | 'buffers' : 'bufs blob', | |
136 | 'submitted' : 'timestamp', |
|
139 | 'submitted' : 'timestamp', | |
@@ -141,6 +144,7 b' class SQLiteDB(BaseDB):' | |||||
141 | 'resubmitted' : 'text', |
|
144 | 'resubmitted' : 'text', | |
142 | 'received' : 'timestamp', |
|
145 | 'received' : 'timestamp', | |
143 | 'result_header' : 'dict text', |
|
146 | 'result_header' : 'dict text', | |
|
147 | 'result_metadata' : 'dict text', | |||
144 | 'result_content' : 'dict text', |
|
148 | 'result_content' : 'dict text', | |
145 | 'result_buffers' : 'bufs blob', |
|
149 | 'result_buffers' : 'bufs blob', | |
146 | 'queue' : 'text', |
|
150 | 'queue' : 'text', | |
@@ -240,6 +244,7 b' class SQLiteDB(BaseDB):' | |||||
240 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s |
|
244 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s | |
241 | (msg_id text PRIMARY KEY, |
|
245 | (msg_id text PRIMARY KEY, | |
242 | header dict text, |
|
246 | header dict text, | |
|
247 | metadata dict text, | |||
243 | content dict text, |
|
248 | content dict text, | |
244 | buffers bufs blob, |
|
249 | buffers bufs blob, | |
245 | submitted timestamp, |
|
250 | submitted timestamp, | |
@@ -250,6 +255,7 b' class SQLiteDB(BaseDB):' | |||||
250 | resubmitted text, |
|
255 | resubmitted text, | |
251 | received timestamp, |
|
256 | received timestamp, | |
252 | result_header dict text, |
|
257 | result_header dict text, | |
|
258 | result_metadata dict text, | |||
253 | result_content dict text, |
|
259 | result_content dict text, | |
254 | result_buffers bufs blob, |
|
260 | result_buffers bufs blob, | |
255 | queue text, |
|
261 | queue text, |
@@ -218,9 +218,9 b' class Kernel(Configurable):' | |||||
218 | # is it safe to assume a msg_id will not be resubmitted? |
|
218 | # is it safe to assume a msg_id will not be resubmitted? | |
219 | reply_type = msg_type.split('_')[0] + '_reply' |
|
219 | reply_type = msg_type.split('_')[0] + '_reply' | |
220 | status = {'status' : 'aborted'} |
|
220 | status = {'status' : 'aborted'} | |
221 |
|
|
221 | md = {'engine' : self.ident} | |
222 |
|
|
222 | md.update(status) | |
223 |
reply_msg = self.session.send(stream, reply_type, |
|
223 | reply_msg = self.session.send(stream, reply_type, metadata=md, | |
224 | content=status, parent=msg, ident=idents) |
|
224 | content=status, parent=msg, ident=idents) | |
225 | return |
|
225 | return | |
226 |
|
226 | |||
@@ -293,13 +293,16 b' class Kernel(Configurable):' | |||||
293 | # Kernel request handlers |
|
293 | # Kernel request handlers | |
294 | #--------------------------------------------------------------------------- |
|
294 | #--------------------------------------------------------------------------- | |
295 |
|
295 | |||
296 |
def _make_ |
|
296 | def _make_metadata(self, other=None): | |
297 |
"""init |
|
297 | """init metadata dict, for execute/apply_reply""" | |
298 |
|
|
298 | new_md = { | |
299 | 'dependencies_met' : True, |
|
299 | 'dependencies_met' : True, | |
300 | 'engine' : self.ident, |
|
300 | 'engine' : self.ident, | |
301 | 'started': datetime.now(), |
|
301 | 'started': datetime.now(), | |
302 | } |
|
302 | } | |
|
303 | if other: | |||
|
304 | new_md.update(other) | |||
|
305 | return new_md | |||
303 |
|
306 | |||
304 | def _publish_pyin(self, code, parent, execution_count): |
|
307 | def _publish_pyin(self, code, parent, execution_count): | |
305 | """Publish the code request on the pyin stream.""" |
|
308 | """Publish the code request on the pyin stream.""" | |
@@ -333,7 +336,7 b' class Kernel(Configurable):' | |||||
333 | self.log.error("%s", parent) |
|
336 | self.log.error("%s", parent) | |
334 | return |
|
337 | return | |
335 |
|
338 | |||
336 |
|
|
339 | md = self._make_metadata(parent['metadata']) | |
337 |
|
340 | |||
338 | shell = self.shell # we'll need this a lot here |
|
341 | shell = self.shell # we'll need this a lot here | |
339 |
|
342 | |||
@@ -425,13 +428,13 b' class Kernel(Configurable):' | |||||
425 | # Send the reply. |
|
428 | # Send the reply. | |
426 | reply_content = json_clean(reply_content) |
|
429 | reply_content = json_clean(reply_content) | |
427 |
|
430 | |||
428 |
|
|
431 | md['status'] = reply_content['status'] | |
429 | if reply_content['status'] == 'error' and \ |
|
432 | if reply_content['status'] == 'error' and \ | |
430 | reply_content['ename'] == 'UnmetDependency': |
|
433 | reply_content['ename'] == 'UnmetDependency': | |
431 |
|
|
434 | md['dependencies_met'] = False | |
432 |
|
435 | |||
433 | reply_msg = self.session.send(stream, u'execute_reply', |
|
436 | reply_msg = self.session.send(stream, u'execute_reply', | |
434 |
reply_content, parent, |
|
437 | reply_content, parent, metadata=md, | |
435 | ident=ident) |
|
438 | ident=ident) | |
436 |
|
439 | |||
437 | self.log.debug("%s", reply_msg) |
|
440 | self.log.debug("%s", reply_msg) | |
@@ -543,7 +546,7 b' class Kernel(Configurable):' | |||||
543 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
546 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
544 | # self.iopub_socket.send(pyin_msg) |
|
547 | # self.iopub_socket.send(pyin_msg) | |
545 | # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) |
|
548 | # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) | |
546 |
|
|
549 | md = self._make_metadata(parent['metadata']) | |
547 | try: |
|
550 | try: | |
548 | working = shell.user_ns |
|
551 | working = shell.user_ns | |
549 |
|
552 | |||
@@ -589,19 +592,19 b' class Kernel(Configurable):' | |||||
589 | result_buf = [] |
|
592 | result_buf = [] | |
590 |
|
593 | |||
591 | if reply_content['ename'] == 'UnmetDependency': |
|
594 | if reply_content['ename'] == 'UnmetDependency': | |
592 |
|
|
595 | md['dependencies_met'] = False | |
593 | else: |
|
596 | else: | |
594 | reply_content = {'status' : 'ok'} |
|
597 | reply_content = {'status' : 'ok'} | |
595 |
|
598 | |||
596 | # put 'ok'/'error' status in header, for scheduler introspection: |
|
599 | # put 'ok'/'error' status in header, for scheduler introspection: | |
597 |
|
|
600 | md['status'] = reply_content['status'] | |
598 |
|
601 | |||
599 | # flush i/o |
|
602 | # flush i/o | |
600 | sys.stdout.flush() |
|
603 | sys.stdout.flush() | |
601 | sys.stderr.flush() |
|
604 | sys.stderr.flush() | |
602 |
|
605 | |||
603 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
606 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
604 |
parent=parent, ident=ident,buffers=result_buf, |
|
607 | parent=parent, ident=ident,buffers=result_buf, metadata=md) | |
605 |
|
608 | |||
606 | self._publish_status(u'idle', parent) |
|
609 | self._publish_status(u'idle', parent) | |
607 |
|
610 | |||
@@ -672,9 +675,9 b' class Kernel(Configurable):' | |||||
672 | reply_type = msg_type.split('_')[0] + '_reply' |
|
675 | reply_type = msg_type.split('_')[0] + '_reply' | |
673 |
|
676 | |||
674 | status = {'status' : 'aborted'} |
|
677 | status = {'status' : 'aborted'} | |
675 |
|
|
678 | md = {'engine' : self.ident} | |
676 |
|
|
679 | md.update(status) | |
677 |
reply_msg = self.session.send(stream, reply_type, |
|
680 | reply_msg = self.session.send(stream, reply_type, meatadata=md, | |
678 | content=status, parent=msg, ident=idents) |
|
681 | content=status, parent=msg, ident=idents) | |
679 | self.log.debug("%s", reply_msg) |
|
682 | self.log.debug("%s", reply_msg) | |
680 | # We need to wait a bit for requests to come in. This can probably |
|
683 | # We need to wait a bit for requests to come in. This can probably |
General Comments 0
You need to be logged in to leave comments.
Login now