##// END OF EJS Templates
migrate subheader usage to new metadata
MinRK -
Show More
@@ -650,12 +650,16 b' class Client(HasTraits):'
650 650 e.engine_info['engine_id'] = eid
651 651 return e
652 652
653 def _extract_metadata(self, header, parent, content):
653 def _extract_metadata(self, msg):
654 header = msg['header']
655 parent = msg['parent_header']
656 msg_meta = msg['metadata']
657 content = msg['content']
654 658 md = {'msg_id' : parent['msg_id'],
655 659 'received' : datetime.now(),
656 'engine_uuid' : header.get('engine', None),
657 'follow' : parent.get('follow', []),
658 'after' : parent.get('after', []),
660 'engine_uuid' : msg_meta.get('engine', None),
661 'follow' : msg_meta.get('follow', []),
662 'after' : msg_meta.get('after', []),
659 663 'status' : content['status'],
660 664 }
661 665
@@ -664,8 +668,8 b' class Client(HasTraits):'
664 668
665 669 if 'date' in parent:
666 670 md['submitted'] = parent['date']
667 if 'started' in header:
668 md['started'] = header['started']
671 if 'started' in msg_meta:
672 md['started'] = msg_meta['started']
669 673 if 'date' in header:
670 674 md['completed'] = header['date']
671 675 return md
@@ -738,7 +742,7 b' class Client(HasTraits):'
738 742
739 743 # construct metadata:
740 744 md = self.metadata[msg_id]
741 md.update(self._extract_metadata(header, parent, content))
745 md.update(self._extract_metadata(msg))
742 746 # is this redundant?
743 747 self.metadata[msg_id] = md
744 748
@@ -775,7 +779,7 b' class Client(HasTraits):'
775 779
776 780 # construct metadata:
777 781 md = self.metadata[msg_id]
778 md.update(self._extract_metadata(header, parent, content))
782 md.update(self._extract_metadata(msg))
779 783 # is this redundant?
780 784 self.metadata[msg_id] = md
781 785
@@ -1201,7 +1205,7 b' class Client(HasTraits):'
1201 1205
1202 1206 return result
1203 1207
1204 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1208 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1205 1209 ident=None):
1206 1210 """construct and send an apply message via a socket.
1207 1211
@@ -1214,7 +1218,7 b' class Client(HasTraits):'
1214 1218 # defaults:
1215 1219 args = args if args is not None else []
1216 1220 kwargs = kwargs if kwargs is not None else {}
1217 subheader = subheader if subheader is not None else {}
1221 metadata = metadata if metadata is not None else {}
1218 1222
1219 1223 # validate arguments
1220 1224 if not callable(f) and not isinstance(f, Reference):
@@ -1223,13 +1227,13 b' class Client(HasTraits):'
1223 1227 raise TypeError("args must be tuple or list, not %s"%type(args))
1224 1228 if not isinstance(kwargs, dict):
1225 1229 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1226 if not isinstance(subheader, dict):
1227 raise TypeError("subheader must be dict, not %s"%type(subheader))
1230 if not isinstance(metadata, dict):
1231 raise TypeError("metadata must be dict, not %s"%type(metadata))
1228 1232
1229 1233 bufs = util.pack_apply_message(f,args,kwargs)
1230 1234
1231 1235 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1232 subheader=subheader, track=track)
1236 metadata=metadata, track=track)
1233 1237
1234 1238 msg_id = msg['header']['msg_id']
1235 1239 self.outstanding.add(msg_id)
@@ -1245,7 +1249,7 b' class Client(HasTraits):'
1245 1249
1246 1250 return msg
1247 1251
1248 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1252 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1249 1253 """construct and send an execute request via a socket.
1250 1254
1251 1255 """
@@ -1254,19 +1258,19 b' class Client(HasTraits):'
1254 1258 raise RuntimeError("Client cannot be used after its sockets have been closed")
1255 1259
1256 1260 # defaults:
1257 subheader = subheader if subheader is not None else {}
1261 metadata = metadata if metadata is not None else {}
1258 1262
1259 1263 # validate arguments
1260 1264 if not isinstance(code, basestring):
1261 1265 raise TypeError("code must be text, not %s" % type(code))
1262 if not isinstance(subheader, dict):
1263 raise TypeError("subheader must be dict, not %s" % type(subheader))
1266 if not isinstance(metadata, dict):
1267 raise TypeError("metadata must be dict, not %s" % type(metadata))
1264 1268
1265 1269 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1266 1270
1267 1271
1268 1272 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1269 subheader=subheader)
1273 metadata=metadata)
1270 1274
1271 1275 msg_id = msg['header']['msg_id']
1272 1276 self.outstanding.add(msg_id)
@@ -1401,7 +1405,7 b' class Client(HasTraits):'
1401 1405 return ar
1402 1406
1403 1407 @spin_first
1404 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1408 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1405 1409 """Resubmit one or more tasks.
1406 1410
1407 1411 in-flight tasks may not be resubmitted.
@@ -1539,7 +1543,13 b' class Client(HasTraits):'
1539 1543 rcontent = self.session.unpack(rcontent)
1540 1544
1541 1545 md = self.metadata[msg_id]
1542 md.update(self._extract_metadata(header, parent, rcontent))
1546 md_msg = dict(
1547 content=rcontent,
1548 parent_header=parent,
1549 header=header,
1550 metadata=rec['result_metadata'],
1551 )
1552 md.update(self._extract_metadata(md_msg))
1543 1553 if rec.get('received'):
1544 1554 md['received'] = rec['received']
1545 1555 md.update(iodict)
@@ -1022,10 +1022,10 b' class LoadBalancedView(View):'
1022 1022
1023 1023 after = self._render_dependency(after)
1024 1024 follow = self._render_dependency(follow)
1025 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1025 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1026 1026
1027 1027 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1028 subheader=subheader)
1028 metadata=metadata)
1029 1029 tracker = None if track is False else msg['tracker']
1030 1030
1031 1031 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
@@ -56,6 +56,7 b' def empty_record():'
56 56 return {
57 57 'msg_id' : None,
58 58 'header' : None,
59 'metadata' : None,
59 60 'content': None,
60 61 'buffers': None,
61 62 'submitted': None,
@@ -66,6 +67,7 b' def empty_record():'
66 67 'resubmitted': None,
67 68 'received': None,
68 69 'result_header' : None,
70 'result_metadata' : None,
69 71 'result_content' : None,
70 72 'result_buffers' : None,
71 73 'queue' : None,
@@ -83,6 +85,7 b' def init_record(msg):'
83 85 'msg_id' : header['msg_id'],
84 86 'header' : header,
85 87 'content': msg['content'],
88 'metadata': msg['metadata'],
86 89 'buffers': msg['buffers'],
87 90 'submitted': header['date'],
88 91 'client_uuid' : None,
@@ -92,6 +95,7 b' def init_record(msg):'
92 95 'resubmitted': None,
93 96 'received': None,
94 97 'result_header' : None,
98 'result_metadata': None,
95 99 'result_content' : None,
96 100 'result_buffers' : None,
97 101 'queue' : None,
@@ -646,10 +650,12 b' class Hub(SessionFactory):'
646 650 return
647 651 # update record anyway, because the unregistration could have been premature
648 652 rheader = msg['header']
653 md = msg['metadata']
649 654 completed = rheader['date']
650 started = rheader.get('started', None)
655 started = md.get('started', None)
651 656 result = {
652 657 'result_header' : rheader,
658 'result_metadata': md,
653 659 'result_content': msg['content'],
654 660 'received': datetime.now(),
655 661 'started' : started,
@@ -736,10 +742,11 b' class Hub(SessionFactory):'
736 742 self.unassigned.remove(msg_id)
737 743
738 744 header = msg['header']
739 engine_uuid = header.get('engine', u'')
745 md = msg['metadata']
746 engine_uuid = md.get('engine', u'')
740 747 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
741 748
742 status = header.get('status', None)
749 status = md.get('status', None)
743 750
744 751 if msg_id in self.pending:
745 752 self.log.info("task::task %r finished on %s", msg_id, eid)
@@ -751,9 +758,10 b' class Hub(SessionFactory):'
751 758 if msg_id in self.tasks[eid]:
752 759 self.tasks[eid].remove(msg_id)
753 760 completed = header['date']
754 started = header.get('started', None)
761 started = md.get('started', None)
755 762 result = {
756 763 'result_header' : header,
764 'result_metadata': msg['metadata'],
757 765 'result_content': msg['content'],
758 766 'started' : started,
759 767 'completed' : completed,
@@ -1221,12 +1229,15 b' class Hub(SessionFactory):'
1221 1229 io_dict = {}
1222 1230 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1223 1231 io_dict[key] = rec[key]
1224 content = { 'result_content': rec['result_content'],
1225 'header': rec['header'],
1226 'result_header' : rec['result_header'],
1227 'received' : rec['received'],
1228 'io' : io_dict,
1229 }
1232 content = {
1233 'header': rec['header'],
1234 'metadata': rec['metadata'],
1235 'result_metadata': rec['result_metadata'],
1236 'result_header' : rec['result_header'],
1237 'result_content': rec['result_content'],
1238 'received' : rec['received'],
1239 'io' : io_dict,
1240 }
1230 1241 if rec['result_buffers']:
1231 1242 buffers = map(bytes, rec['result_buffers'])
1232 1243 else:
@@ -129,12 +129,14 b' MET = Dependency([])'
129 129
130 130 class Job(object):
131 131 """Simple container for a job"""
132 def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
132 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
133 targets, after, follow, timeout):
133 134 self.msg_id = msg_id
134 135 self.raw_msg = raw_msg
135 136 self.idents = idents
136 137 self.msg = msg
137 138 self.header = header
139 self.metadata = metadata
138 140 self.targets = targets
139 141 self.after = after
140 142 self.follow = follow
@@ -327,13 +329,13 b' class TaskScheduler(SessionFactory):'
327 329 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
328 330 except:
329 331 content = error.wrap_exception()
330 # build fake header
331 header = dict(
332 status='error',
332 # build fake metadata
333 md = dict(
334 status=u'error',
333 335 engine=engine,
334 336 date=datetime.now(),
335 337 )
336 msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
338 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
337 339 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
338 340 # and dispatch it
339 341 self.dispatch_result(raw_reply)
@@ -365,20 +367,21 b' class TaskScheduler(SessionFactory):'
365 367 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
366 368
367 369 header = msg['header']
370 md = msg['metadata']
368 371 msg_id = header['msg_id']
369 372 self.all_ids.add(msg_id)
370 373
371 374 # get targets as a set of bytes objects
372 375 # from a list of unicode objects
373 targets = header.get('targets', [])
376 targets = md.get('targets', [])
374 377 targets = map(cast_bytes, targets)
375 378 targets = set(targets)
376 379
377 retries = header.get('retries', 0)
380 retries = md.get('retries', 0)
378 381 self.retries[msg_id] = retries
379 382
380 383 # time dependencies
381 after = header.get('after', None)
384 after = md.get('after', None)
382 385 if after:
383 386 after = Dependency(after)
384 387 if after.all:
@@ -402,10 +405,10 b' class TaskScheduler(SessionFactory):'
402 405 after = MET
403 406
404 407 # location dependencies
405 follow = Dependency(header.get('follow', []))
408 follow = Dependency(md.get('follow', []))
406 409
407 410 # turn timeouts into datetime objects:
408 timeout = header.get('timeout', None)
411 timeout = md.get('timeout', None)
409 412 if timeout:
410 413 # cast to float, because jsonlib returns floats as decimal.Decimal,
411 414 # which timedelta does not accept
@@ -413,7 +416,7 b' class TaskScheduler(SessionFactory):'
413 416
414 417 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
415 418 header=header, targets=targets, after=after, follow=follow,
416 timeout=timeout,
419 timeout=timeout, metadata=md,
417 420 )
418 421
419 422 # validate and reduce dependencies:
@@ -585,10 +588,10 b' class TaskScheduler(SessionFactory):'
585 588 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
586 589 return
587 590
588 header = msg['header']
591 md = msg['metadata']
589 592 parent = msg['parent_header']
590 if header.get('dependencies_met', True):
591 success = (header['status'] == 'ok')
593 if md.get('dependencies_met', True):
594 success = (md['status'] == 'ok')
592 595 msg_id = parent['msg_id']
593 596 retries = self.retries[msg_id]
594 597 if not success and retries > 0:
@@ -109,6 +109,7 b' class SQLiteDB(BaseDB):'
109 109 # the ordered list of column names
110 110 _keys = List(['msg_id' ,
111 111 'header' ,
112 'metadata',
112 113 'content',
113 114 'buffers',
114 115 'submitted',
@@ -119,6 +120,7 b' class SQLiteDB(BaseDB):'
119 120 'resubmitted',
120 121 'received',
121 122 'result_header' ,
123 'result_metadata',
122 124 'result_content' ,
123 125 'result_buffers' ,
124 126 'queue' ,
@@ -131,6 +133,7 b' class SQLiteDB(BaseDB):'
131 133 # sqlite datatypes for checking that db is current format
132 134 _types = Dict({'msg_id' : 'text' ,
133 135 'header' : 'dict text',
136 'metadata' : 'dict text',
134 137 'content' : 'dict text',
135 138 'buffers' : 'bufs blob',
136 139 'submitted' : 'timestamp',
@@ -141,6 +144,7 b' class SQLiteDB(BaseDB):'
141 144 'resubmitted' : 'text',
142 145 'received' : 'timestamp',
143 146 'result_header' : 'dict text',
147 'result_metadata' : 'dict text',
144 148 'result_content' : 'dict text',
145 149 'result_buffers' : 'bufs blob',
146 150 'queue' : 'text',
@@ -240,6 +244,7 b' class SQLiteDB(BaseDB):'
240 244 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
241 245 (msg_id text PRIMARY KEY,
242 246 header dict text,
247 metadata dict text,
243 248 content dict text,
244 249 buffers bufs blob,
245 250 submitted timestamp,
@@ -250,6 +255,7 b' class SQLiteDB(BaseDB):'
250 255 resubmitted text,
251 256 received timestamp,
252 257 result_header dict text,
258 result_metadata dict text,
253 259 result_content dict text,
254 260 result_buffers bufs blob,
255 261 queue text,
@@ -218,9 +218,9 b' class Kernel(Configurable):'
218 218 # is it safe to assume a msg_id will not be resubmitted?
219 219 reply_type = msg_type.split('_')[0] + '_reply'
220 220 status = {'status' : 'aborted'}
221 sub = {'engine' : self.ident}
222 sub.update(status)
223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
221 md = {'engine' : self.ident}
222 md.update(status)
223 reply_msg = self.session.send(stream, reply_type, metadata=md,
224 224 content=status, parent=msg, ident=idents)
225 225 return
226 226
@@ -293,13 +293,16 b' class Kernel(Configurable):'
293 293 # Kernel request handlers
294 294 #---------------------------------------------------------------------------
295 295
296 def _make_subheader(self):
297 """init subheader dict, for execute/apply_reply"""
298 return {
296 def _make_metadata(self, other=None):
297 """init metadata dict, for execute/apply_reply"""
298 new_md = {
299 299 'dependencies_met' : True,
300 300 'engine' : self.ident,
301 301 'started': datetime.now(),
302 302 }
303 if other:
304 new_md.update(other)
305 return new_md
303 306
304 307 def _publish_pyin(self, code, parent, execution_count):
305 308 """Publish the code request on the pyin stream."""
@@ -333,7 +336,7 b' class Kernel(Configurable):'
333 336 self.log.error("%s", parent)
334 337 return
335 338
336 sub = self._make_subheader()
339 md = self._make_metadata(parent['metadata'])
337 340
338 341 shell = self.shell # we'll need this a lot here
339 342
@@ -425,13 +428,13 b' class Kernel(Configurable):'
425 428 # Send the reply.
426 429 reply_content = json_clean(reply_content)
427 430
428 sub['status'] = reply_content['status']
431 md['status'] = reply_content['status']
429 432 if reply_content['status'] == 'error' and \
430 433 reply_content['ename'] == 'UnmetDependency':
431 sub['dependencies_met'] = False
434 md['dependencies_met'] = False
432 435
433 436 reply_msg = self.session.send(stream, u'execute_reply',
434 reply_content, parent, subheader=sub,
437 reply_content, parent, metadata=md,
435 438 ident=ident)
436 439
437 440 self.log.debug("%s", reply_msg)
@@ -543,7 +546,7 b' class Kernel(Configurable):'
543 546 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
544 547 # self.iopub_socket.send(pyin_msg)
545 548 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
546 sub = self._make_subheader()
549 md = self._make_metadata(parent['metadata'])
547 550 try:
548 551 working = shell.user_ns
549 552
@@ -589,19 +592,19 b' class Kernel(Configurable):'
589 592 result_buf = []
590 593
591 594 if reply_content['ename'] == 'UnmetDependency':
592 sub['dependencies_met'] = False
595 md['dependencies_met'] = False
593 596 else:
594 597 reply_content = {'status' : 'ok'}
595 598
596 599 # put 'ok'/'error' status in header, for scheduler introspection:
597 sub['status'] = reply_content['status']
600 md['status'] = reply_content['status']
598 601
599 602 # flush i/o
600 603 sys.stdout.flush()
601 604 sys.stderr.flush()
602 605
603 606 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
604 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
607 parent=parent, ident=ident,buffers=result_buf, metadata=md)
605 608
606 609 self._publish_status(u'idle', parent)
607 610
@@ -672,9 +675,9 b' class Kernel(Configurable):'
672 675 reply_type = msg_type.split('_')[0] + '_reply'
673 676
674 677 status = {'status' : 'aborted'}
675 sub = {'engine' : self.ident}
676 sub.update(status)
677 reply_msg = self.session.send(stream, reply_type, subheader=sub,
678 md = {'engine' : self.ident}
679 md.update(status)
680 reply_msg = self.session.send(stream, reply_type, meatadata=md,
678 681 content=status, parent=msg, ident=idents)
679 682 self.log.debug("%s", reply_msg)
680 683 # We need to wait a bit for requests to come in. This can probably
General Comments 0
You need to be logged in to leave comments. Login now