Show More
@@ -650,12 +650,16 class Client(HasTraits): | |||
|
650 | 650 | e.engine_info['engine_id'] = eid |
|
651 | 651 | return e |
|
652 | 652 | |
|
653 |
def _extract_metadata(self, |
|
|
653 | def _extract_metadata(self, msg): | |
|
654 | header = msg['header'] | |
|
655 | parent = msg['parent_header'] | |
|
656 | msg_meta = msg['metadata'] | |
|
657 | content = msg['content'] | |
|
654 | 658 | md = {'msg_id' : parent['msg_id'], |
|
655 | 659 | 'received' : datetime.now(), |
|
656 |
'engine_uuid' : |
|
|
657 |
'follow' : |
|
|
658 |
'after' : |
|
|
660 | 'engine_uuid' : msg_meta.get('engine', None), | |
|
661 | 'follow' : msg_meta.get('follow', []), | |
|
662 | 'after' : msg_meta.get('after', []), | |
|
659 | 663 | 'status' : content['status'], |
|
660 | 664 | } |
|
661 | 665 | |
@@ -664,8 +668,8 class Client(HasTraits): | |||
|
664 | 668 | |
|
665 | 669 | if 'date' in parent: |
|
666 | 670 | md['submitted'] = parent['date'] |
|
667 |
if 'started' in |
|
|
668 |
md['started'] = |
|
|
671 | if 'started' in msg_meta: | |
|
672 | md['started'] = msg_meta['started'] | |
|
669 | 673 | if 'date' in header: |
|
670 | 674 | md['completed'] = header['date'] |
|
671 | 675 | return md |
@@ -738,7 +742,7 class Client(HasTraits): | |||
|
738 | 742 | |
|
739 | 743 | # construct metadata: |
|
740 | 744 | md = self.metadata[msg_id] |
|
741 |
md.update(self._extract_metadata( |
|
|
745 | md.update(self._extract_metadata(msg)) | |
|
742 | 746 | # is this redundant? |
|
743 | 747 | self.metadata[msg_id] = md |
|
744 | 748 | |
@@ -775,7 +779,7 class Client(HasTraits): | |||
|
775 | 779 | |
|
776 | 780 | # construct metadata: |
|
777 | 781 | md = self.metadata[msg_id] |
|
778 |
md.update(self._extract_metadata( |
|
|
782 | md.update(self._extract_metadata(msg)) | |
|
779 | 783 | # is this redundant? |
|
780 | 784 | self.metadata[msg_id] = md |
|
781 | 785 | |
@@ -1201,7 +1205,7 class Client(HasTraits): | |||
|
1201 | 1205 | |
|
1202 | 1206 | return result |
|
1203 | 1207 | |
|
1204 |
def send_apply_request(self, socket, f, args=None, kwargs=None, |
|
|
1208 | def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False, | |
|
1205 | 1209 | ident=None): |
|
1206 | 1210 | """construct and send an apply message via a socket. |
|
1207 | 1211 | |
@@ -1214,7 +1218,7 class Client(HasTraits): | |||
|
1214 | 1218 | # defaults: |
|
1215 | 1219 | args = args if args is not None else [] |
|
1216 | 1220 | kwargs = kwargs if kwargs is not None else {} |
|
1217 |
|
|
|
1221 | metadata = metadata if metadata is not None else {} | |
|
1218 | 1222 | |
|
1219 | 1223 | # validate arguments |
|
1220 | 1224 | if not callable(f) and not isinstance(f, Reference): |
@@ -1223,13 +1227,13 class Client(HasTraits): | |||
|
1223 | 1227 | raise TypeError("args must be tuple or list, not %s"%type(args)) |
|
1224 | 1228 | if not isinstance(kwargs, dict): |
|
1225 | 1229 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
1226 |
if not isinstance( |
|
|
1227 |
raise TypeError(" |
|
|
1230 | if not isinstance(metadata, dict): | |
|
1231 | raise TypeError("metadata must be dict, not %s"%type(metadata)) | |
|
1228 | 1232 | |
|
1229 | 1233 | bufs = util.pack_apply_message(f,args,kwargs) |
|
1230 | 1234 | |
|
1231 | 1235 | msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, |
|
1232 |
|
|
|
1236 | metadata=metadata, track=track) | |
|
1233 | 1237 | |
|
1234 | 1238 | msg_id = msg['header']['msg_id'] |
|
1235 | 1239 | self.outstanding.add(msg_id) |
@@ -1245,7 +1249,7 class Client(HasTraits): | |||
|
1245 | 1249 | |
|
1246 | 1250 | return msg |
|
1247 | 1251 | |
|
1248 |
def send_execute_request(self, socket, code, silent=True, |
|
|
1252 | def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None): | |
|
1249 | 1253 | """construct and send an execute request via a socket. |
|
1250 | 1254 | |
|
1251 | 1255 | """ |
@@ -1254,19 +1258,19 class Client(HasTraits): | |||
|
1254 | 1258 | raise RuntimeError("Client cannot be used after its sockets have been closed") |
|
1255 | 1259 | |
|
1256 | 1260 | # defaults: |
|
1257 |
|
|
|
1261 | metadata = metadata if metadata is not None else {} | |
|
1258 | 1262 | |
|
1259 | 1263 | # validate arguments |
|
1260 | 1264 | if not isinstance(code, basestring): |
|
1261 | 1265 | raise TypeError("code must be text, not %s" % type(code)) |
|
1262 |
if not isinstance( |
|
|
1263 |
raise TypeError(" |
|
|
1266 | if not isinstance(metadata, dict): | |
|
1267 | raise TypeError("metadata must be dict, not %s" % type(metadata)) | |
|
1264 | 1268 | |
|
1265 | 1269 | content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) |
|
1266 | 1270 | |
|
1267 | 1271 | |
|
1268 | 1272 | msg = self.session.send(socket, "execute_request", content=content, ident=ident, |
|
1269 |
|
|
|
1273 | metadata=metadata) | |
|
1270 | 1274 | |
|
1271 | 1275 | msg_id = msg['header']['msg_id'] |
|
1272 | 1276 | self.outstanding.add(msg_id) |
@@ -1401,7 +1405,7 class Client(HasTraits): | |||
|
1401 | 1405 | return ar |
|
1402 | 1406 | |
|
1403 | 1407 | @spin_first |
|
1404 |
def resubmit(self, indices_or_msg_ids=None, |
|
|
1408 | def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None): | |
|
1405 | 1409 | """Resubmit one or more tasks. |
|
1406 | 1410 | |
|
1407 | 1411 | in-flight tasks may not be resubmitted. |
@@ -1539,7 +1543,13 class Client(HasTraits): | |||
|
1539 | 1543 | rcontent = self.session.unpack(rcontent) |
|
1540 | 1544 | |
|
1541 | 1545 | md = self.metadata[msg_id] |
|
1542 | md.update(self._extract_metadata(header, parent, rcontent)) | |
|
1546 | md_msg = dict( | |
|
1547 | content=rcontent, | |
|
1548 | parent_header=parent, | |
|
1549 | header=header, | |
|
1550 | metadata=rec['result_metadata'], | |
|
1551 | ) | |
|
1552 | md.update(self._extract_metadata(md_msg)) | |
|
1543 | 1553 | if rec.get('received'): |
|
1544 | 1554 | md['received'] = rec['received'] |
|
1545 | 1555 | md.update(iodict) |
@@ -1022,10 +1022,10 class LoadBalancedView(View): | |||
|
1022 | 1022 | |
|
1023 | 1023 | after = self._render_dependency(after) |
|
1024 | 1024 | follow = self._render_dependency(follow) |
|
1025 |
|
|
|
1025 | metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) | |
|
1026 | 1026 | |
|
1027 | 1027 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, |
|
1028 |
|
|
|
1028 | metadata=metadata) | |
|
1029 | 1029 | tracker = None if track is False else msg['tracker'] |
|
1030 | 1030 | |
|
1031 | 1031 | ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker) |
@@ -56,6 +56,7 def empty_record(): | |||
|
56 | 56 | return { |
|
57 | 57 | 'msg_id' : None, |
|
58 | 58 | 'header' : None, |
|
59 | 'metadata' : None, | |
|
59 | 60 | 'content': None, |
|
60 | 61 | 'buffers': None, |
|
61 | 62 | 'submitted': None, |
@@ -66,6 +67,7 def empty_record(): | |||
|
66 | 67 | 'resubmitted': None, |
|
67 | 68 | 'received': None, |
|
68 | 69 | 'result_header' : None, |
|
70 | 'result_metadata' : None, | |
|
69 | 71 | 'result_content' : None, |
|
70 | 72 | 'result_buffers' : None, |
|
71 | 73 | 'queue' : None, |
@@ -83,6 +85,7 def init_record(msg): | |||
|
83 | 85 | 'msg_id' : header['msg_id'], |
|
84 | 86 | 'header' : header, |
|
85 | 87 | 'content': msg['content'], |
|
88 | 'metadata': msg['metadata'], | |
|
86 | 89 | 'buffers': msg['buffers'], |
|
87 | 90 | 'submitted': header['date'], |
|
88 | 91 | 'client_uuid' : None, |
@@ -92,6 +95,7 def init_record(msg): | |||
|
92 | 95 | 'resubmitted': None, |
|
93 | 96 | 'received': None, |
|
94 | 97 | 'result_header' : None, |
|
98 | 'result_metadata': None, | |
|
95 | 99 | 'result_content' : None, |
|
96 | 100 | 'result_buffers' : None, |
|
97 | 101 | 'queue' : None, |
@@ -646,10 +650,12 class Hub(SessionFactory): | |||
|
646 | 650 | return |
|
647 | 651 | # update record anyway, because the unregistration could have been premature |
|
648 | 652 | rheader = msg['header'] |
|
653 | md = msg['metadata'] | |
|
649 | 654 | completed = rheader['date'] |
|
650 |
started = |
|
|
655 | started = md.get('started', None) | |
|
651 | 656 | result = { |
|
652 | 657 | 'result_header' : rheader, |
|
658 | 'result_metadata': md, | |
|
653 | 659 | 'result_content': msg['content'], |
|
654 | 660 | 'received': datetime.now(), |
|
655 | 661 | 'started' : started, |
@@ -736,10 +742,11 class Hub(SessionFactory): | |||
|
736 | 742 | self.unassigned.remove(msg_id) |
|
737 | 743 | |
|
738 | 744 | header = msg['header'] |
|
739 | engine_uuid = header.get('engine', u'') | |
|
745 | md = msg['metadata'] | |
|
746 | engine_uuid = md.get('engine', u'') | |
|
740 | 747 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) |
|
741 | 748 | |
|
742 |
status = |
|
|
749 | status = md.get('status', None) | |
|
743 | 750 | |
|
744 | 751 | if msg_id in self.pending: |
|
745 | 752 | self.log.info("task::task %r finished on %s", msg_id, eid) |
@@ -751,9 +758,10 class Hub(SessionFactory): | |||
|
751 | 758 | if msg_id in self.tasks[eid]: |
|
752 | 759 | self.tasks[eid].remove(msg_id) |
|
753 | 760 | completed = header['date'] |
|
754 |
started = |
|
|
761 | started = md.get('started', None) | |
|
755 | 762 | result = { |
|
756 | 763 | 'result_header' : header, |
|
764 | 'result_metadata': msg['metadata'], | |
|
757 | 765 | 'result_content': msg['content'], |
|
758 | 766 | 'started' : started, |
|
759 | 767 | 'completed' : completed, |
@@ -1221,12 +1229,15 class Hub(SessionFactory): | |||
|
1221 | 1229 | io_dict = {} |
|
1222 | 1230 | for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'): |
|
1223 | 1231 | io_dict[key] = rec[key] |
|
1224 | content = { 'result_content': rec['result_content'], | |
|
1225 |
|
|
|
1226 | 'result_header' : rec['result_header'], | |
|
1227 | 'received' : rec['received'], | |
|
1228 | 'io' : io_dict, | |
|
1229 | } | |
|
1232 | content = { | |
|
1233 | 'header': rec['header'], | |
|
1234 | 'metadata': rec['metadata'], | |
|
1235 | 'result_metadata': rec['result_metadata'], | |
|
1236 | 'result_header' : rec['result_header'], | |
|
1237 | 'result_content': rec['result_content'], | |
|
1238 | 'received' : rec['received'], | |
|
1239 | 'io' : io_dict, | |
|
1240 | } | |
|
1230 | 1241 | if rec['result_buffers']: |
|
1231 | 1242 | buffers = map(bytes, rec['result_buffers']) |
|
1232 | 1243 | else: |
@@ -129,12 +129,14 MET = Dependency([]) | |||
|
129 | 129 | |
|
130 | 130 | class Job(object): |
|
131 | 131 | """Simple container for a job""" |
|
132 |
def __init__(self, msg_id, raw_msg, idents, msg, header, |
|
|
132 | def __init__(self, msg_id, raw_msg, idents, msg, header, metadata, | |
|
133 | targets, after, follow, timeout): | |
|
133 | 134 | self.msg_id = msg_id |
|
134 | 135 | self.raw_msg = raw_msg |
|
135 | 136 | self.idents = idents |
|
136 | 137 | self.msg = msg |
|
137 | 138 | self.header = header |
|
139 | self.metadata = metadata | |
|
138 | 140 | self.targets = targets |
|
139 | 141 | self.after = after |
|
140 | 142 | self.follow = follow |
@@ -327,13 +329,13 class TaskScheduler(SessionFactory): | |||
|
327 | 329 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) |
|
328 | 330 | except: |
|
329 | 331 | content = error.wrap_exception() |
|
330 |
# build fake |
|
|
331 |
|
|
|
332 | status='error', | |
|
332 | # build fake metadata | |
|
333 | md = dict( | |
|
334 | status=u'error', | |
|
333 | 335 | engine=engine, |
|
334 | 336 | date=datetime.now(), |
|
335 | 337 | ) |
|
336 |
msg = self.session.msg('apply_reply', content, parent=parent, |
|
|
338 | msg = self.session.msg('apply_reply', content, parent=parent, metadata=md) | |
|
337 | 339 | raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) |
|
338 | 340 | # and dispatch it |
|
339 | 341 | self.dispatch_result(raw_reply) |
@@ -365,20 +367,21 class TaskScheduler(SessionFactory): | |||
|
365 | 367 | self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) |
|
366 | 368 | |
|
367 | 369 | header = msg['header'] |
|
370 | md = msg['metadata'] | |
|
368 | 371 | msg_id = header['msg_id'] |
|
369 | 372 | self.all_ids.add(msg_id) |
|
370 | 373 | |
|
371 | 374 | # get targets as a set of bytes objects |
|
372 | 375 | # from a list of unicode objects |
|
373 |
targets = |
|
|
376 | targets = md.get('targets', []) | |
|
374 | 377 | targets = map(cast_bytes, targets) |
|
375 | 378 | targets = set(targets) |
|
376 | 379 | |
|
377 |
retries = |
|
|
380 | retries = md.get('retries', 0) | |
|
378 | 381 | self.retries[msg_id] = retries |
|
379 | 382 | |
|
380 | 383 | # time dependencies |
|
381 |
after = |
|
|
384 | after = md.get('after', None) | |
|
382 | 385 | if after: |
|
383 | 386 | after = Dependency(after) |
|
384 | 387 | if after.all: |
@@ -402,10 +405,10 class TaskScheduler(SessionFactory): | |||
|
402 | 405 | after = MET |
|
403 | 406 | |
|
404 | 407 | # location dependencies |
|
405 |
follow = Dependency( |
|
|
408 | follow = Dependency(md.get('follow', [])) | |
|
406 | 409 | |
|
407 | 410 | # turn timeouts into datetime objects: |
|
408 |
timeout = |
|
|
411 | timeout = md.get('timeout', None) | |
|
409 | 412 | if timeout: |
|
410 | 413 | # cast to float, because jsonlib returns floats as decimal.Decimal, |
|
411 | 414 | # which timedelta does not accept |
@@ -413,7 +416,7 class TaskScheduler(SessionFactory): | |||
|
413 | 416 | |
|
414 | 417 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, |
|
415 | 418 | header=header, targets=targets, after=after, follow=follow, |
|
416 | timeout=timeout, | |
|
419 | timeout=timeout, metadata=md, | |
|
417 | 420 | ) |
|
418 | 421 | |
|
419 | 422 | # validate and reduce dependencies: |
@@ -585,10 +588,10 class TaskScheduler(SessionFactory): | |||
|
585 | 588 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) |
|
586 | 589 | return |
|
587 | 590 | |
|
588 |
|
|
|
591 | md = msg['metadata'] | |
|
589 | 592 | parent = msg['parent_header'] |
|
590 |
if |
|
|
591 |
success = ( |
|
|
593 | if md.get('dependencies_met', True): | |
|
594 | success = (md['status'] == 'ok') | |
|
592 | 595 | msg_id = parent['msg_id'] |
|
593 | 596 | retries = self.retries[msg_id] |
|
594 | 597 | if not success and retries > 0: |
@@ -109,6 +109,7 class SQLiteDB(BaseDB): | |||
|
109 | 109 | # the ordered list of column names |
|
110 | 110 | _keys = List(['msg_id' , |
|
111 | 111 | 'header' , |
|
112 | 'metadata', | |
|
112 | 113 | 'content', |
|
113 | 114 | 'buffers', |
|
114 | 115 | 'submitted', |
@@ -119,6 +120,7 class SQLiteDB(BaseDB): | |||
|
119 | 120 | 'resubmitted', |
|
120 | 121 | 'received', |
|
121 | 122 | 'result_header' , |
|
123 | 'result_metadata', | |
|
122 | 124 | 'result_content' , |
|
123 | 125 | 'result_buffers' , |
|
124 | 126 | 'queue' , |
@@ -131,6 +133,7 class SQLiteDB(BaseDB): | |||
|
131 | 133 | # sqlite datatypes for checking that db is current format |
|
132 | 134 | _types = Dict({'msg_id' : 'text' , |
|
133 | 135 | 'header' : 'dict text', |
|
136 | 'metadata' : 'dict text', | |
|
134 | 137 | 'content' : 'dict text', |
|
135 | 138 | 'buffers' : 'bufs blob', |
|
136 | 139 | 'submitted' : 'timestamp', |
@@ -141,6 +144,7 class SQLiteDB(BaseDB): | |||
|
141 | 144 | 'resubmitted' : 'text', |
|
142 | 145 | 'received' : 'timestamp', |
|
143 | 146 | 'result_header' : 'dict text', |
|
147 | 'result_metadata' : 'dict text', | |
|
144 | 148 | 'result_content' : 'dict text', |
|
145 | 149 | 'result_buffers' : 'bufs blob', |
|
146 | 150 | 'queue' : 'text', |
@@ -240,6 +244,7 class SQLiteDB(BaseDB): | |||
|
240 | 244 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s |
|
241 | 245 | (msg_id text PRIMARY KEY, |
|
242 | 246 | header dict text, |
|
247 | metadata dict text, | |
|
243 | 248 | content dict text, |
|
244 | 249 | buffers bufs blob, |
|
245 | 250 | submitted timestamp, |
@@ -250,6 +255,7 class SQLiteDB(BaseDB): | |||
|
250 | 255 | resubmitted text, |
|
251 | 256 | received timestamp, |
|
252 | 257 | result_header dict text, |
|
258 | result_metadata dict text, | |
|
253 | 259 | result_content dict text, |
|
254 | 260 | result_buffers bufs blob, |
|
255 | 261 | queue text, |
@@ -218,9 +218,9 class Kernel(Configurable): | |||
|
218 | 218 | # is it safe to assume a msg_id will not be resubmitted? |
|
219 | 219 | reply_type = msg_type.split('_')[0] + '_reply' |
|
220 | 220 | status = {'status' : 'aborted'} |
|
221 |
|
|
|
222 |
|
|
|
223 |
reply_msg = self.session.send(stream, reply_type, |
|
|
221 | md = {'engine' : self.ident} | |
|
222 | md.update(status) | |
|
223 | reply_msg = self.session.send(stream, reply_type, metadata=md, | |
|
224 | 224 | content=status, parent=msg, ident=idents) |
|
225 | 225 | return |
|
226 | 226 | |
@@ -293,13 +293,16 class Kernel(Configurable): | |||
|
293 | 293 | # Kernel request handlers |
|
294 | 294 | #--------------------------------------------------------------------------- |
|
295 | 295 | |
|
296 |
def _make_ |
|
|
297 |
"""init |
|
|
298 |
|
|
|
296 | def _make_metadata(self, other=None): | |
|
297 | """init metadata dict, for execute/apply_reply""" | |
|
298 | new_md = { | |
|
299 | 299 | 'dependencies_met' : True, |
|
300 | 300 | 'engine' : self.ident, |
|
301 | 301 | 'started': datetime.now(), |
|
302 | 302 | } |
|
303 | if other: | |
|
304 | new_md.update(other) | |
|
305 | return new_md | |
|
303 | 306 | |
|
304 | 307 | def _publish_pyin(self, code, parent, execution_count): |
|
305 | 308 | """Publish the code request on the pyin stream.""" |
@@ -333,7 +336,7 class Kernel(Configurable): | |||
|
333 | 336 | self.log.error("%s", parent) |
|
334 | 337 | return |
|
335 | 338 | |
|
336 |
|
|
|
339 | md = self._make_metadata(parent['metadata']) | |
|
337 | 340 | |
|
338 | 341 | shell = self.shell # we'll need this a lot here |
|
339 | 342 | |
@@ -425,13 +428,13 class Kernel(Configurable): | |||
|
425 | 428 | # Send the reply. |
|
426 | 429 | reply_content = json_clean(reply_content) |
|
427 | 430 | |
|
428 |
|
|
|
431 | md['status'] = reply_content['status'] | |
|
429 | 432 | if reply_content['status'] == 'error' and \ |
|
430 | 433 | reply_content['ename'] == 'UnmetDependency': |
|
431 |
|
|
|
434 | md['dependencies_met'] = False | |
|
432 | 435 | |
|
433 | 436 | reply_msg = self.session.send(stream, u'execute_reply', |
|
434 |
reply_content, parent, |
|
|
437 | reply_content, parent, metadata=md, | |
|
435 | 438 | ident=ident) |
|
436 | 439 | |
|
437 | 440 | self.log.debug("%s", reply_msg) |
@@ -543,7 +546,7 class Kernel(Configurable): | |||
|
543 | 546 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
544 | 547 | # self.iopub_socket.send(pyin_msg) |
|
545 | 548 | # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) |
|
546 |
|
|
|
549 | md = self._make_metadata(parent['metadata']) | |
|
547 | 550 | try: |
|
548 | 551 | working = shell.user_ns |
|
549 | 552 | |
@@ -589,19 +592,19 class Kernel(Configurable): | |||
|
589 | 592 | result_buf = [] |
|
590 | 593 | |
|
591 | 594 | if reply_content['ename'] == 'UnmetDependency': |
|
592 |
|
|
|
595 | md['dependencies_met'] = False | |
|
593 | 596 | else: |
|
594 | 597 | reply_content = {'status' : 'ok'} |
|
595 | 598 | |
|
596 | 599 | # put 'ok'/'error' status in header, for scheduler introspection: |
|
597 |
|
|
|
600 | md['status'] = reply_content['status'] | |
|
598 | 601 | |
|
599 | 602 | # flush i/o |
|
600 | 603 | sys.stdout.flush() |
|
601 | 604 | sys.stderr.flush() |
|
602 | 605 | |
|
603 | 606 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
604 |
parent=parent, ident=ident,buffers=result_buf, |
|
|
607 | parent=parent, ident=ident,buffers=result_buf, metadata=md) | |
|
605 | 608 | |
|
606 | 609 | self._publish_status(u'idle', parent) |
|
607 | 610 | |
@@ -672,9 +675,9 class Kernel(Configurable): | |||
|
672 | 675 | reply_type = msg_type.split('_')[0] + '_reply' |
|
673 | 676 | |
|
674 | 677 | status = {'status' : 'aborted'} |
|
675 |
|
|
|
676 |
|
|
|
677 |
reply_msg = self.session.send(stream, reply_type, |
|
|
678 | md = {'engine' : self.ident} | |
|
679 | md.update(status) | |
|
680 | reply_msg = self.session.send(stream, reply_type, meatadata=md, | |
|
678 | 681 | content=status, parent=msg, ident=idents) |
|
679 | 682 | self.log.debug("%s", reply_msg) |
|
680 | 683 | # We need to wait a bit for requests to come in. This can probably |
General Comments 0
You need to be logged in to leave comments.
Login now