diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 09903e0..695b818 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -650,12 +650,16 @@ class Client(HasTraits): e.engine_info['engine_id'] = eid return e - def _extract_metadata(self, header, parent, content): + def _extract_metadata(self, msg): + header = msg['header'] + parent = msg['parent_header'] + msg_meta = msg['metadata'] + content = msg['content'] md = {'msg_id' : parent['msg_id'], 'received' : datetime.now(), - 'engine_uuid' : header.get('engine', None), - 'follow' : parent.get('follow', []), - 'after' : parent.get('after', []), + 'engine_uuid' : msg_meta.get('engine', None), + 'follow' : msg_meta.get('follow', []), + 'after' : msg_meta.get('after', []), 'status' : content['status'], } @@ -664,8 +668,8 @@ class Client(HasTraits): if 'date' in parent: md['submitted'] = parent['date'] - if 'started' in header: - md['started'] = header['started'] + if 'started' in msg_meta: + md['started'] = msg_meta['started'] if 'date' in header: md['completed'] = header['date'] return md @@ -738,7 +742,7 @@ class Client(HasTraits): # construct metadata: md = self.metadata[msg_id] - md.update(self._extract_metadata(header, parent, content)) + md.update(self._extract_metadata(msg)) # is this redundant? self.metadata[msg_id] = md @@ -775,7 +779,7 @@ class Client(HasTraits): # construct metadata: md = self.metadata[msg_id] - md.update(self._extract_metadata(header, parent, content)) + md.update(self._extract_metadata(msg)) # is this redundant? self.metadata[msg_id] = md @@ -1201,7 +1205,7 @@ class Client(HasTraits): return result - def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False, + def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None): """construct and send an apply message via a socket. 
@@ -1214,7 +1218,7 @@ class Client(HasTraits): # defaults: args = args if args is not None else [] kwargs = kwargs if kwargs is not None else {} - subheader = subheader if subheader is not None else {} + metadata = metadata if metadata is not None else {} # validate arguments if not callable(f) and not isinstance(f, Reference): @@ -1223,13 +1227,13 @@ class Client(HasTraits): raise TypeError("args must be tuple or list, not %s"%type(args)) if not isinstance(kwargs, dict): raise TypeError("kwargs must be dict, not %s"%type(kwargs)) - if not isinstance(subheader, dict): - raise TypeError("subheader must be dict, not %s"%type(subheader)) + if not isinstance(metadata, dict): + raise TypeError("metadata must be dict, not %s"%type(metadata)) bufs = util.pack_apply_message(f,args,kwargs) msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, - subheader=subheader, track=track) + metadata=metadata, track=track) msg_id = msg['header']['msg_id'] self.outstanding.add(msg_id) @@ -1245,7 +1249,7 @@ class Client(HasTraits): return msg - def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None): + def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None): """construct and send an execute request via a socket. 
""" @@ -1254,19 +1258,19 @@ class Client(HasTraits): raise RuntimeError("Client cannot be used after its sockets have been closed") # defaults: - subheader = subheader if subheader is not None else {} + metadata = metadata if metadata is not None else {} # validate arguments if not isinstance(code, basestring): raise TypeError("code must be text, not %s" % type(code)) - if not isinstance(subheader, dict): - raise TypeError("subheader must be dict, not %s" % type(subheader)) + if not isinstance(metadata, dict): + raise TypeError("metadata must be dict, not %s" % type(metadata)) content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) msg = self.session.send(socket, "execute_request", content=content, ident=ident, - subheader=subheader) + metadata=metadata) msg_id = msg['header']['msg_id'] self.outstanding.add(msg_id) @@ -1401,7 +1405,7 @@ class Client(HasTraits): return ar @spin_first - def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None): + def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None): """Resubmit one or more tasks. in-flight tasks may not be resubmitted. 
@@ -1539,7 +1543,13 @@ class Client(HasTraits): rcontent = self.session.unpack(rcontent) md = self.metadata[msg_id] - md.update(self._extract_metadata(header, parent, rcontent)) + md_msg = dict( + content=rcontent, + parent_header=parent, + header=header, + metadata=rec['result_metadata'], + ) + md.update(self._extract_metadata(md_msg)) if rec.get('received'): md['received'] = rec['received'] md.update(iodict) diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py index 5d047da..add9cfe 100644 --- a/IPython/parallel/client/view.py +++ b/IPython/parallel/client/view.py @@ -1022,10 +1022,10 @@ class LoadBalancedView(View): after = self._render_dependency(after) follow = self._render_dependency(follow) - subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) + metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, - subheader=subheader) + metadata=metadata) tracker = None if track is False else msg['tracker'] ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 9a5c6fb..533be9c 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -56,6 +56,7 @@ def empty_record(): return { 'msg_id' : None, 'header' : None, + 'metadata' : None, 'content': None, 'buffers': None, 'submitted': None, @@ -66,6 +67,7 @@ def empty_record(): 'resubmitted': None, 'received': None, 'result_header' : None, + 'result_metadata' : None, 'result_content' : None, 'result_buffers' : None, 'queue' : None, @@ -83,6 +85,7 @@ def init_record(msg): 'msg_id' : header['msg_id'], 'header' : header, 'content': msg['content'], + 'metadata': msg['metadata'], 'buffers': msg['buffers'], 'submitted': header['date'], 'client_uuid' : None, @@ -92,6 +95,7 @@ 
def init_record(msg): 'resubmitted': None, 'received': None, 'result_header' : None, + 'result_metadata': None, 'result_content' : None, 'result_buffers' : None, 'queue' : None, @@ -646,10 +650,12 @@ class Hub(SessionFactory): return # update record anyway, because the unregistration could have been premature rheader = msg['header'] + md = msg['metadata'] completed = rheader['date'] - started = rheader.get('started', None) + started = md.get('started', None) result = { 'result_header' : rheader, + 'result_metadata': md, 'result_content': msg['content'], 'received': datetime.now(), 'started' : started, @@ -736,10 +742,11 @@ class Hub(SessionFactory): self.unassigned.remove(msg_id) header = msg['header'] - engine_uuid = header.get('engine', u'') + md = msg['metadata'] + engine_uuid = md.get('engine', u'') eid = self.by_ident.get(cast_bytes(engine_uuid), None) - status = header.get('status', None) + status = md.get('status', None) if msg_id in self.pending: self.log.info("task::task %r finished on %s", msg_id, eid) @@ -751,9 +758,10 @@ class Hub(SessionFactory): if msg_id in self.tasks[eid]: self.tasks[eid].remove(msg_id) completed = header['date'] - started = header.get('started', None) + started = md.get('started', None) result = { 'result_header' : header, + 'result_metadata': msg['metadata'], 'result_content': msg['content'], 'started' : started, 'completed' : completed, @@ -1221,12 +1229,15 @@ class Hub(SessionFactory): io_dict = {} for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'): io_dict[key] = rec[key] - content = { 'result_content': rec['result_content'], - 'header': rec['header'], - 'result_header' : rec['result_header'], - 'received' : rec['received'], - 'io' : io_dict, - } + content = { + 'header': rec['header'], + 'metadata': rec['metadata'], + 'result_metadata': rec['result_metadata'], + 'result_header' : rec['result_header'], + 'result_content': rec['result_content'], + 'received' : rec['received'], + 'io' : io_dict, + } if 
rec['result_buffers']: buffers = map(bytes, rec['result_buffers']) else: diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index f339d3e..e503248 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -129,12 +129,14 @@ MET = Dependency([]) class Job(object): """Simple container for a job""" - def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout): + def __init__(self, msg_id, raw_msg, idents, msg, header, metadata, + targets, after, follow, timeout): self.msg_id = msg_id self.raw_msg = raw_msg self.idents = idents self.msg = msg self.header = header + self.metadata = metadata self.targets = targets self.after = after self.follow = follow @@ -327,13 +329,13 @@ class TaskScheduler(SessionFactory): raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) except: content = error.wrap_exception() - # build fake header - header = dict( - status='error', + # build fake metadata + md = dict( + status=u'error', engine=engine, date=datetime.now(), ) - msg = self.session.msg('apply_reply', content, parent=parent, subheader=header) + msg = self.session.msg('apply_reply', content, parent=parent, metadata=md) raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) # and dispatch it self.dispatch_result(raw_reply) @@ -365,20 +367,21 @@ class TaskScheduler(SessionFactory): self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) header = msg['header'] + md = msg['metadata'] msg_id = header['msg_id'] self.all_ids.add(msg_id) # get targets as a set of bytes objects # from a list of unicode objects - targets = header.get('targets', []) + targets = md.get('targets', []) targets = map(cast_bytes, targets) targets = set(targets) - retries = header.get('retries', 0) + retries = md.get('retries', 0) self.retries[msg_id] = retries # time dependencies - after = header.get('after', None) + after = md.get('after', 
None) if after: after = Dependency(after) if after.all: @@ -402,10 +405,10 @@ class TaskScheduler(SessionFactory): after = MET # location dependencies - follow = Dependency(header.get('follow', [])) + follow = Dependency(md.get('follow', [])) # turn timeouts into datetime objects: - timeout = header.get('timeout', None) + timeout = md.get('timeout', None) if timeout: # cast to float, because jsonlib returns floats as decimal.Decimal, # which timedelta does not accept @@ -413,7 +416,7 @@ class TaskScheduler(SessionFactory): job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, header=header, targets=targets, after=after, follow=follow, - timeout=timeout, + timeout=timeout, metadata=md, ) # validate and reduce dependencies: @@ -585,10 +588,10 @@ class TaskScheduler(SessionFactory): self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) return - header = msg['header'] + md = msg['metadata'] parent = msg['parent_header'] - if header.get('dependencies_met', True): - success = (header['status'] == 'ok') + if md.get('dependencies_met', True): + success = (md['status'] == 'ok') msg_id = parent['msg_id'] retries = self.retries[msg_id] if not success and retries > 0: diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index 2b017d4..36f015d 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -109,6 +109,7 @@ class SQLiteDB(BaseDB): # the ordered list of column names _keys = List(['msg_id' , 'header' , + 'metadata', 'content', 'buffers', 'submitted', @@ -119,6 +120,7 @@ class SQLiteDB(BaseDB): 'resubmitted', 'received', 'result_header' , + 'result_metadata', 'result_content' , 'result_buffers' , 'queue' , @@ -131,6 +133,7 @@ class SQLiteDB(BaseDB): # sqlite datatypes for checking that db is current format _types = Dict({'msg_id' : 'text' , 'header' : 'dict text', + 'metadata' : 'dict text', 'content' : 'dict text', 'buffers' : 'bufs blob', 'submitted' : 
'timestamp', @@ -141,6 +144,7 @@ class SQLiteDB(BaseDB): 'resubmitted' : 'text', 'received' : 'timestamp', 'result_header' : 'dict text', + 'result_metadata' : 'dict text', 'result_content' : 'dict text', 'result_buffers' : 'bufs blob', 'queue' : 'text', @@ -240,6 +244,7 @@ class SQLiteDB(BaseDB): self._db.execute("""CREATE TABLE IF NOT EXISTS %s (msg_id text PRIMARY KEY, header dict text, + metadata dict text, content dict text, buffers bufs blob, submitted timestamp, @@ -250,6 +255,7 @@ class SQLiteDB(BaseDB): resubmitted text, received timestamp, result_header dict text, + result_metadata dict text, result_content dict text, result_buffers bufs blob, queue text, diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index a0853f7..21b521d 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -218,9 +218,9 @@ class Kernel(Configurable): # is it safe to assume a msg_id will not be resubmitted? reply_type = msg_type.split('_')[0] + '_reply' status = {'status' : 'aborted'} - sub = {'engine' : self.ident} - sub.update(status) - reply_msg = self.session.send(stream, reply_type, subheader=sub, + md = {'engine' : self.ident} + md.update(status) + reply_msg = self.session.send(stream, reply_type, metadata=md, content=status, parent=msg, ident=idents) return @@ -293,13 +293,16 @@ class Kernel(Configurable): # Kernel request handlers #--------------------------------------------------------------------------- - def _make_subheader(self): - """init subheader dict, for execute/apply_reply""" - return { + def _make_metadata(self, other=None): + """init metadata dict, for execute/apply_reply""" + new_md = { 'dependencies_met' : True, 'engine' : self.ident, 'started': datetime.now(), } + if other: + new_md.update(other) + return new_md def _publish_pyin(self, code, parent, execution_count): """Publish the code request on the pyin stream.""" @@ -333,7 +336,7 @@ class Kernel(Configurable): self.log.error("%s", parent) return - sub = self._make_subheader() 
+ md = self._make_metadata(parent['metadata']) shell = self.shell # we'll need this a lot here @@ -425,13 +428,13 @@ class Kernel(Configurable): # Send the reply. reply_content = json_clean(reply_content) - sub['status'] = reply_content['status'] + md['status'] = reply_content['status'] if reply_content['status'] == 'error' and \ reply_content['ename'] == 'UnmetDependency': - sub['dependencies_met'] = False + md['dependencies_met'] = False reply_msg = self.session.send(stream, u'execute_reply', - reply_content, parent, subheader=sub, + reply_content, parent, metadata=md, ident=ident) self.log.debug("%s", reply_msg) @@ -543,7 +546,7 @@ class Kernel(Configurable): # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_socket.send(pyin_msg) # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) - sub = self._make_subheader() + md = self._make_metadata(parent['metadata']) try: working = shell.user_ns @@ -589,19 +592,19 @@ class Kernel(Configurable): result_buf = [] if reply_content['ename'] == 'UnmetDependency': - sub['dependencies_met'] = False + md['dependencies_met'] = False else: reply_content = {'status' : 'ok'} # put 'ok'/'error' status in header, for scheduler introspection: - sub['status'] = reply_content['status'] + md['status'] = reply_content['status'] # flush i/o sys.stdout.flush() sys.stderr.flush() reply_msg = self.session.send(stream, u'apply_reply', reply_content, - parent=parent, ident=ident,buffers=result_buf, subheader=sub) + parent=parent, ident=ident,buffers=result_buf, metadata=md) self._publish_status(u'idle', parent) @@ -672,9 +675,9 @@ class Kernel(Configurable): reply_type = msg_type.split('_')[0] + '_reply' status = {'status' : 'aborted'} - sub = {'engine' : self.ident} - sub.update(status) - reply_msg = self.session.send(stream, reply_type, subheader=sub, + md = {'engine' : self.ident} + md.update(status) + reply_msg = self.session.send(stream, reply_type, metadata=md, content=status,
parent=msg, ident=idents) self.log.debug("%s", reply_msg) # We need to wait a bit for requests to come in. This can probably