##// END OF EJS Templates
migrate subheader usage to new metadata
MinRK -
Show More
@@ -650,12 +650,16 b' class Client(HasTraits):'
650 e.engine_info['engine_id'] = eid
650 e.engine_info['engine_id'] = eid
651 return e
651 return e
652
652
653 def _extract_metadata(self, header, parent, content):
653 def _extract_metadata(self, msg):
654 header = msg['header']
655 parent = msg['parent_header']
656 msg_meta = msg['metadata']
657 content = msg['content']
654 md = {'msg_id' : parent['msg_id'],
658 md = {'msg_id' : parent['msg_id'],
655 'received' : datetime.now(),
659 'received' : datetime.now(),
656 'engine_uuid' : header.get('engine', None),
660 'engine_uuid' : msg_meta.get('engine', None),
657 'follow' : parent.get('follow', []),
661 'follow' : msg_meta.get('follow', []),
658 'after' : parent.get('after', []),
662 'after' : msg_meta.get('after', []),
659 'status' : content['status'],
663 'status' : content['status'],
660 }
664 }
661
665
@@ -664,8 +668,8 b' class Client(HasTraits):'
664
668
665 if 'date' in parent:
669 if 'date' in parent:
666 md['submitted'] = parent['date']
670 md['submitted'] = parent['date']
667 if 'started' in header:
671 if 'started' in msg_meta:
668 md['started'] = header['started']
672 md['started'] = msg_meta['started']
669 if 'date' in header:
673 if 'date' in header:
670 md['completed'] = header['date']
674 md['completed'] = header['date']
671 return md
675 return md
@@ -738,7 +742,7 b' class Client(HasTraits):'
738
742
739 # construct metadata:
743 # construct metadata:
740 md = self.metadata[msg_id]
744 md = self.metadata[msg_id]
741 md.update(self._extract_metadata(header, parent, content))
745 md.update(self._extract_metadata(msg))
742 # is this redundant?
746 # is this redundant?
743 self.metadata[msg_id] = md
747 self.metadata[msg_id] = md
744
748
@@ -775,7 +779,7 b' class Client(HasTraits):'
775
779
776 # construct metadata:
780 # construct metadata:
777 md = self.metadata[msg_id]
781 md = self.metadata[msg_id]
778 md.update(self._extract_metadata(header, parent, content))
782 md.update(self._extract_metadata(msg))
779 # is this redundant?
783 # is this redundant?
780 self.metadata[msg_id] = md
784 self.metadata[msg_id] = md
781
785
@@ -1201,7 +1205,7 b' class Client(HasTraits):'
1201
1205
1202 return result
1206 return result
1203
1207
1204 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1208 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1205 ident=None):
1209 ident=None):
1206 """construct and send an apply message via a socket.
1210 """construct and send an apply message via a socket.
1207
1211
@@ -1214,7 +1218,7 b' class Client(HasTraits):'
1214 # defaults:
1218 # defaults:
1215 args = args if args is not None else []
1219 args = args if args is not None else []
1216 kwargs = kwargs if kwargs is not None else {}
1220 kwargs = kwargs if kwargs is not None else {}
1217 subheader = subheader if subheader is not None else {}
1221 metadata = metadata if metadata is not None else {}
1218
1222
1219 # validate arguments
1223 # validate arguments
1220 if not callable(f) and not isinstance(f, Reference):
1224 if not callable(f) and not isinstance(f, Reference):
@@ -1223,13 +1227,13 b' class Client(HasTraits):'
1223 raise TypeError("args must be tuple or list, not %s"%type(args))
1227 raise TypeError("args must be tuple or list, not %s"%type(args))
1224 if not isinstance(kwargs, dict):
1228 if not isinstance(kwargs, dict):
1225 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1229 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1226 if not isinstance(subheader, dict):
1230 if not isinstance(metadata, dict):
1227 raise TypeError("subheader must be dict, not %s"%type(subheader))
1231 raise TypeError("metadata must be dict, not %s"%type(metadata))
1228
1232
1229 bufs = util.pack_apply_message(f,args,kwargs)
1233 bufs = util.pack_apply_message(f,args,kwargs)
1230
1234
1231 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1235 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1232 subheader=subheader, track=track)
1236 metadata=metadata, track=track)
1233
1237
1234 msg_id = msg['header']['msg_id']
1238 msg_id = msg['header']['msg_id']
1235 self.outstanding.add(msg_id)
1239 self.outstanding.add(msg_id)
@@ -1245,7 +1249,7 b' class Client(HasTraits):'
1245
1249
1246 return msg
1250 return msg
1247
1251
1248 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1252 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1249 """construct and send an execute request via a socket.
1253 """construct and send an execute request via a socket.
1250
1254
1251 """
1255 """
@@ -1254,19 +1258,19 b' class Client(HasTraits):'
1254 raise RuntimeError("Client cannot be used after its sockets have been closed")
1258 raise RuntimeError("Client cannot be used after its sockets have been closed")
1255
1259
1256 # defaults:
1260 # defaults:
1257 subheader = subheader if subheader is not None else {}
1261 metadata = metadata if metadata is not None else {}
1258
1262
1259 # validate arguments
1263 # validate arguments
1260 if not isinstance(code, basestring):
1264 if not isinstance(code, basestring):
1261 raise TypeError("code must be text, not %s" % type(code))
1265 raise TypeError("code must be text, not %s" % type(code))
1262 if not isinstance(subheader, dict):
1266 if not isinstance(metadata, dict):
1263 raise TypeError("subheader must be dict, not %s" % type(subheader))
1267 raise TypeError("metadata must be dict, not %s" % type(metadata))
1264
1268
1265 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1269 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1266
1270
1267
1271
1268 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1272 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1269 subheader=subheader)
1273 metadata=metadata)
1270
1274
1271 msg_id = msg['header']['msg_id']
1275 msg_id = msg['header']['msg_id']
1272 self.outstanding.add(msg_id)
1276 self.outstanding.add(msg_id)
@@ -1401,7 +1405,7 b' class Client(HasTraits):'
1401 return ar
1405 return ar
1402
1406
1403 @spin_first
1407 @spin_first
1404 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1408 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1405 """Resubmit one or more tasks.
1409 """Resubmit one or more tasks.
1406
1410
1407 in-flight tasks may not be resubmitted.
1411 in-flight tasks may not be resubmitted.
@@ -1539,7 +1543,13 b' class Client(HasTraits):'
1539 rcontent = self.session.unpack(rcontent)
1543 rcontent = self.session.unpack(rcontent)
1540
1544
1541 md = self.metadata[msg_id]
1545 md = self.metadata[msg_id]
1542 md.update(self._extract_metadata(header, parent, rcontent))
1546 md_msg = dict(
1547 content=rcontent,
1548 parent_header=parent,
1549 header=header,
1550 metadata=rec['result_metadata'],
1551 )
1552 md.update(self._extract_metadata(md_msg))
1543 if rec.get('received'):
1553 if rec.get('received'):
1544 md['received'] = rec['received']
1554 md['received'] = rec['received']
1545 md.update(iodict)
1555 md.update(iodict)
@@ -1022,10 +1022,10 b' class LoadBalancedView(View):'
1022
1022
1023 after = self._render_dependency(after)
1023 after = self._render_dependency(after)
1024 follow = self._render_dependency(follow)
1024 follow = self._render_dependency(follow)
1025 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1025 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1026
1026
1027 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1027 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1028 subheader=subheader)
1028 metadata=metadata)
1029 tracker = None if track is False else msg['tracker']
1029 tracker = None if track is False else msg['tracker']
1030
1030
1031 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1031 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
@@ -56,6 +56,7 b' def empty_record():'
56 return {
56 return {
57 'msg_id' : None,
57 'msg_id' : None,
58 'header' : None,
58 'header' : None,
59 'metadata' : None,
59 'content': None,
60 'content': None,
60 'buffers': None,
61 'buffers': None,
61 'submitted': None,
62 'submitted': None,
@@ -66,6 +67,7 b' def empty_record():'
66 'resubmitted': None,
67 'resubmitted': None,
67 'received': None,
68 'received': None,
68 'result_header' : None,
69 'result_header' : None,
70 'result_metadata' : None,
69 'result_content' : None,
71 'result_content' : None,
70 'result_buffers' : None,
72 'result_buffers' : None,
71 'queue' : None,
73 'queue' : None,
@@ -83,6 +85,7 b' def init_record(msg):'
83 'msg_id' : header['msg_id'],
85 'msg_id' : header['msg_id'],
84 'header' : header,
86 'header' : header,
85 'content': msg['content'],
87 'content': msg['content'],
88 'metadata': msg['metadata'],
86 'buffers': msg['buffers'],
89 'buffers': msg['buffers'],
87 'submitted': header['date'],
90 'submitted': header['date'],
88 'client_uuid' : None,
91 'client_uuid' : None,
@@ -92,6 +95,7 b' def init_record(msg):'
92 'resubmitted': None,
95 'resubmitted': None,
93 'received': None,
96 'received': None,
94 'result_header' : None,
97 'result_header' : None,
98 'result_metadata': None,
95 'result_content' : None,
99 'result_content' : None,
96 'result_buffers' : None,
100 'result_buffers' : None,
97 'queue' : None,
101 'queue' : None,
@@ -646,10 +650,12 b' class Hub(SessionFactory):'
646 return
650 return
647 # update record anyway, because the unregistration could have been premature
651 # update record anyway, because the unregistration could have been premature
648 rheader = msg['header']
652 rheader = msg['header']
653 md = msg['metadata']
649 completed = rheader['date']
654 completed = rheader['date']
650 started = rheader.get('started', None)
655 started = md.get('started', None)
651 result = {
656 result = {
652 'result_header' : rheader,
657 'result_header' : rheader,
658 'result_metadata': md,
653 'result_content': msg['content'],
659 'result_content': msg['content'],
654 'received': datetime.now(),
660 'received': datetime.now(),
655 'started' : started,
661 'started' : started,
@@ -736,10 +742,11 b' class Hub(SessionFactory):'
736 self.unassigned.remove(msg_id)
742 self.unassigned.remove(msg_id)
737
743
738 header = msg['header']
744 header = msg['header']
739 engine_uuid = header.get('engine', u'')
745 md = msg['metadata']
746 engine_uuid = md.get('engine', u'')
740 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
747 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
741
748
742 status = header.get('status', None)
749 status = md.get('status', None)
743
750
744 if msg_id in self.pending:
751 if msg_id in self.pending:
745 self.log.info("task::task %r finished on %s", msg_id, eid)
752 self.log.info("task::task %r finished on %s", msg_id, eid)
@@ -751,9 +758,10 b' class Hub(SessionFactory):'
751 if msg_id in self.tasks[eid]:
758 if msg_id in self.tasks[eid]:
752 self.tasks[eid].remove(msg_id)
759 self.tasks[eid].remove(msg_id)
753 completed = header['date']
760 completed = header['date']
754 started = header.get('started', None)
761 started = md.get('started', None)
755 result = {
762 result = {
756 'result_header' : header,
763 'result_header' : header,
764 'result_metadata': msg['metadata'],
757 'result_content': msg['content'],
765 'result_content': msg['content'],
758 'started' : started,
766 'started' : started,
759 'completed' : completed,
767 'completed' : completed,
@@ -1221,12 +1229,15 b' class Hub(SessionFactory):'
1221 io_dict = {}
1229 io_dict = {}
1222 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1230 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1223 io_dict[key] = rec[key]
1231 io_dict[key] = rec[key]
1224 content = { 'result_content': rec['result_content'],
1232 content = {
1225 'header': rec['header'],
1233 'header': rec['header'],
1226 'result_header' : rec['result_header'],
1234 'metadata': rec['metadata'],
1227 'received' : rec['received'],
1235 'result_metadata': rec['result_metadata'],
1228 'io' : io_dict,
1236 'result_header' : rec['result_header'],
1229 }
1237 'result_content': rec['result_content'],
1238 'received' : rec['received'],
1239 'io' : io_dict,
1240 }
1230 if rec['result_buffers']:
1241 if rec['result_buffers']:
1231 buffers = map(bytes, rec['result_buffers'])
1242 buffers = map(bytes, rec['result_buffers'])
1232 else:
1243 else:
@@ -129,12 +129,14 b' MET = Dependency([])'
129
129
130 class Job(object):
130 class Job(object):
131 """Simple container for a job"""
131 """Simple container for a job"""
132 def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
132 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
133 targets, after, follow, timeout):
133 self.msg_id = msg_id
134 self.msg_id = msg_id
134 self.raw_msg = raw_msg
135 self.raw_msg = raw_msg
135 self.idents = idents
136 self.idents = idents
136 self.msg = msg
137 self.msg = msg
137 self.header = header
138 self.header = header
139 self.metadata = metadata
138 self.targets = targets
140 self.targets = targets
139 self.after = after
141 self.after = after
140 self.follow = follow
142 self.follow = follow
@@ -327,13 +329,13 b' class TaskScheduler(SessionFactory):'
327 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
329 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
328 except:
330 except:
329 content = error.wrap_exception()
331 content = error.wrap_exception()
330 # build fake header
332 # build fake metadata
331 header = dict(
333 md = dict(
332 status='error',
334 status=u'error',
333 engine=engine,
335 engine=engine,
334 date=datetime.now(),
336 date=datetime.now(),
335 )
337 )
336 msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
338 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
337 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
339 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
338 # and dispatch it
340 # and dispatch it
339 self.dispatch_result(raw_reply)
341 self.dispatch_result(raw_reply)
@@ -365,20 +367,21 b' class TaskScheduler(SessionFactory):'
365 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
367 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
366
368
367 header = msg['header']
369 header = msg['header']
370 md = msg['metadata']
368 msg_id = header['msg_id']
371 msg_id = header['msg_id']
369 self.all_ids.add(msg_id)
372 self.all_ids.add(msg_id)
370
373
371 # get targets as a set of bytes objects
374 # get targets as a set of bytes objects
372 # from a list of unicode objects
375 # from a list of unicode objects
373 targets = header.get('targets', [])
376 targets = md.get('targets', [])
374 targets = map(cast_bytes, targets)
377 targets = map(cast_bytes, targets)
375 targets = set(targets)
378 targets = set(targets)
376
379
377 retries = header.get('retries', 0)
380 retries = md.get('retries', 0)
378 self.retries[msg_id] = retries
381 self.retries[msg_id] = retries
379
382
380 # time dependencies
383 # time dependencies
381 after = header.get('after', None)
384 after = md.get('after', None)
382 if after:
385 if after:
383 after = Dependency(after)
386 after = Dependency(after)
384 if after.all:
387 if after.all:
@@ -402,10 +405,10 b' class TaskScheduler(SessionFactory):'
402 after = MET
405 after = MET
403
406
404 # location dependencies
407 # location dependencies
405 follow = Dependency(header.get('follow', []))
408 follow = Dependency(md.get('follow', []))
406
409
407 # turn timeouts into datetime objects:
410 # turn timeouts into datetime objects:
408 timeout = header.get('timeout', None)
411 timeout = md.get('timeout', None)
409 if timeout:
412 if timeout:
410 # cast to float, because jsonlib returns floats as decimal.Decimal,
413 # cast to float, because jsonlib returns floats as decimal.Decimal,
411 # which timedelta does not accept
414 # which timedelta does not accept
@@ -413,7 +416,7 b' class TaskScheduler(SessionFactory):'
413
416
414 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
417 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
415 header=header, targets=targets, after=after, follow=follow,
418 header=header, targets=targets, after=after, follow=follow,
416 timeout=timeout,
419 timeout=timeout, metadata=md,
417 )
420 )
418
421
419 # validate and reduce dependencies:
422 # validate and reduce dependencies:
@@ -585,10 +588,10 b' class TaskScheduler(SessionFactory):'
585 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
588 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
586 return
589 return
587
590
588 header = msg['header']
591 md = msg['metadata']
589 parent = msg['parent_header']
592 parent = msg['parent_header']
590 if header.get('dependencies_met', True):
593 if md.get('dependencies_met', True):
591 success = (header['status'] == 'ok')
594 success = (md['status'] == 'ok')
592 msg_id = parent['msg_id']
595 msg_id = parent['msg_id']
593 retries = self.retries[msg_id]
596 retries = self.retries[msg_id]
594 if not success and retries > 0:
597 if not success and retries > 0:
@@ -109,6 +109,7 b' class SQLiteDB(BaseDB):'
109 # the ordered list of column names
109 # the ordered list of column names
110 _keys = List(['msg_id' ,
110 _keys = List(['msg_id' ,
111 'header' ,
111 'header' ,
112 'metadata',
112 'content',
113 'content',
113 'buffers',
114 'buffers',
114 'submitted',
115 'submitted',
@@ -119,6 +120,7 b' class SQLiteDB(BaseDB):'
119 'resubmitted',
120 'resubmitted',
120 'received',
121 'received',
121 'result_header' ,
122 'result_header' ,
123 'result_metadata',
122 'result_content' ,
124 'result_content' ,
123 'result_buffers' ,
125 'result_buffers' ,
124 'queue' ,
126 'queue' ,
@@ -131,6 +133,7 b' class SQLiteDB(BaseDB):'
131 # sqlite datatypes for checking that db is current format
133 # sqlite datatypes for checking that db is current format
132 _types = Dict({'msg_id' : 'text' ,
134 _types = Dict({'msg_id' : 'text' ,
133 'header' : 'dict text',
135 'header' : 'dict text',
136 'metadata' : 'dict text',
134 'content' : 'dict text',
137 'content' : 'dict text',
135 'buffers' : 'bufs blob',
138 'buffers' : 'bufs blob',
136 'submitted' : 'timestamp',
139 'submitted' : 'timestamp',
@@ -141,6 +144,7 b' class SQLiteDB(BaseDB):'
141 'resubmitted' : 'text',
144 'resubmitted' : 'text',
142 'received' : 'timestamp',
145 'received' : 'timestamp',
143 'result_header' : 'dict text',
146 'result_header' : 'dict text',
147 'result_metadata' : 'dict text',
144 'result_content' : 'dict text',
148 'result_content' : 'dict text',
145 'result_buffers' : 'bufs blob',
149 'result_buffers' : 'bufs blob',
146 'queue' : 'text',
150 'queue' : 'text',
@@ -240,6 +244,7 b' class SQLiteDB(BaseDB):'
240 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
244 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
241 (msg_id text PRIMARY KEY,
245 (msg_id text PRIMARY KEY,
242 header dict text,
246 header dict text,
247 metadata dict text,
243 content dict text,
248 content dict text,
244 buffers bufs blob,
249 buffers bufs blob,
245 submitted timestamp,
250 submitted timestamp,
@@ -250,6 +255,7 b' class SQLiteDB(BaseDB):'
250 resubmitted text,
255 resubmitted text,
251 received timestamp,
256 received timestamp,
252 result_header dict text,
257 result_header dict text,
258 result_metadata dict text,
253 result_content dict text,
259 result_content dict text,
254 result_buffers bufs blob,
260 result_buffers bufs blob,
255 queue text,
261 queue text,
@@ -218,9 +218,9 b' class Kernel(Configurable):'
218 # is it safe to assume a msg_id will not be resubmitted?
218 # is it safe to assume a msg_id will not be resubmitted?
219 reply_type = msg_type.split('_')[0] + '_reply'
219 reply_type = msg_type.split('_')[0] + '_reply'
220 status = {'status' : 'aborted'}
220 status = {'status' : 'aborted'}
221 sub = {'engine' : self.ident}
221 md = {'engine' : self.ident}
222 sub.update(status)
222 md.update(status)
223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
223 reply_msg = self.session.send(stream, reply_type, metadata=md,
224 content=status, parent=msg, ident=idents)
224 content=status, parent=msg, ident=idents)
225 return
225 return
226
226
@@ -293,13 +293,16 b' class Kernel(Configurable):'
293 # Kernel request handlers
293 # Kernel request handlers
294 #---------------------------------------------------------------------------
294 #---------------------------------------------------------------------------
295
295
296 def _make_subheader(self):
296 def _make_metadata(self, other=None):
297 """init subheader dict, for execute/apply_reply"""
297 """init metadata dict, for execute/apply_reply"""
298 return {
298 new_md = {
299 'dependencies_met' : True,
299 'dependencies_met' : True,
300 'engine' : self.ident,
300 'engine' : self.ident,
301 'started': datetime.now(),
301 'started': datetime.now(),
302 }
302 }
303 if other:
304 new_md.update(other)
305 return new_md
303
306
304 def _publish_pyin(self, code, parent, execution_count):
307 def _publish_pyin(self, code, parent, execution_count):
305 """Publish the code request on the pyin stream."""
308 """Publish the code request on the pyin stream."""
@@ -333,7 +336,7 b' class Kernel(Configurable):'
333 self.log.error("%s", parent)
336 self.log.error("%s", parent)
334 return
337 return
335
338
336 sub = self._make_subheader()
339 md = self._make_metadata(parent['metadata'])
337
340
338 shell = self.shell # we'll need this a lot here
341 shell = self.shell # we'll need this a lot here
339
342
@@ -425,13 +428,13 b' class Kernel(Configurable):'
425 # Send the reply.
428 # Send the reply.
426 reply_content = json_clean(reply_content)
429 reply_content = json_clean(reply_content)
427
430
428 sub['status'] = reply_content['status']
431 md['status'] = reply_content['status']
429 if reply_content['status'] == 'error' and \
432 if reply_content['status'] == 'error' and \
430 reply_content['ename'] == 'UnmetDependency':
433 reply_content['ename'] == 'UnmetDependency':
431 sub['dependencies_met'] = False
434 md['dependencies_met'] = False
432
435
433 reply_msg = self.session.send(stream, u'execute_reply',
436 reply_msg = self.session.send(stream, u'execute_reply',
434 reply_content, parent, subheader=sub,
437 reply_content, parent, metadata=md,
435 ident=ident)
438 ident=ident)
436
439
437 self.log.debug("%s", reply_msg)
440 self.log.debug("%s", reply_msg)
@@ -543,7 +546,7 b' class Kernel(Configurable):'
543 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
546 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
544 # self.iopub_socket.send(pyin_msg)
547 # self.iopub_socket.send(pyin_msg)
545 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
548 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
546 sub = self._make_subheader()
549 md = self._make_metadata(parent['metadata'])
547 try:
550 try:
548 working = shell.user_ns
551 working = shell.user_ns
549
552
@@ -589,19 +592,19 b' class Kernel(Configurable):'
589 result_buf = []
592 result_buf = []
590
593
591 if reply_content['ename'] == 'UnmetDependency':
594 if reply_content['ename'] == 'UnmetDependency':
592 sub['dependencies_met'] = False
595 md['dependencies_met'] = False
593 else:
596 else:
594 reply_content = {'status' : 'ok'}
597 reply_content = {'status' : 'ok'}
595
598
596 # put 'ok'/'error' status in header, for scheduler introspection:
599 # put 'ok'/'error' status in header, for scheduler introspection:
597 sub['status'] = reply_content['status']
600 md['status'] = reply_content['status']
598
601
599 # flush i/o
602 # flush i/o
600 sys.stdout.flush()
603 sys.stdout.flush()
601 sys.stderr.flush()
604 sys.stderr.flush()
602
605
603 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
606 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
604 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
607 parent=parent, ident=ident,buffers=result_buf, metadata=md)
605
608
606 self._publish_status(u'idle', parent)
609 self._publish_status(u'idle', parent)
607
610
@@ -672,9 +675,9 b' class Kernel(Configurable):'
672 reply_type = msg_type.split('_')[0] + '_reply'
675 reply_type = msg_type.split('_')[0] + '_reply'
673
676
674 status = {'status' : 'aborted'}
677 status = {'status' : 'aborted'}
675 sub = {'engine' : self.ident}
678 md = {'engine' : self.ident}
676 sub.update(status)
679 md.update(status)
677 reply_msg = self.session.send(stream, reply_type, subheader=sub,
680 reply_msg = self.session.send(stream, reply_type, meatadata=md,
678 content=status, parent=msg, ident=idents)
681 content=status, parent=msg, ident=idents)
679 self.log.debug("%s", reply_msg)
682 self.log.debug("%s", reply_msg)
680 # We need to wait a bit for requests to come in. This can probably
683 # We need to wait a bit for requests to come in. This can probably
General Comments 0
You need to be logged in to leave comments. Login now