diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 6b3f497..326213a 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -1165,7 +1165,7 @@ class Hub(SessionFactory): msg = self.session.msg(header['msg_type']) msg['content'] = rec['content'] msg['header'] = header - msg['msg_id'] = rec['msg_id'] + msg['header']['msg_id'] = rec['msg_id'] self.session.send(self.resubmit, msg, buffers=rec['buffers']) finish(dict(status='ok')) diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index 5f495b0..75572c8 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -350,15 +350,15 @@ class Session(Configurable): def msg_header(self, msg_type): return msg_header(self.msg_id, msg_type, self.username, self.session) - def msg(self, msg_type, content=None, parent=None, subheader=None): + def msg(self, msg_type, content=None, parent=None, subheader=None, header=None): """Return the nested message dict. This format is different from what is sent over the wire. The - self.serialize method converts this nested message dict to the wire - format, which uses a message list. + serialize/unserialize methods convert this nested message dict to the wire + format, which is a list of message parts. """ msg = {} - msg['header'] = self.msg_header(msg_type) + msg['header'] = self.msg_header(msg_type) if header is None else header msg['parent_header'] = {} if parent is None else extract_header(parent) msg['content'] = {} if content is None else content sub = {} if subheader is None else subheader @@ -436,8 +436,8 @@ class Session(Configurable): return to_send - def send(self, stream, msg_or_type, content=None, parent=None, ident=None, - buffers=None, subheader=None, track=False): + def send(self, stream, msg_or_type, content=None, parent=None, ident=None, + buffers=None, subheader=None, track=False, header=None): """Build and send a message via stream or socket. 
The message format used by this function internally is as follows: @@ -445,37 +445,41 @@ class Session(Configurable): [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, buffer1,buffer2,...] - The self.serialize method converts the nested message dict into this + The serialize/unserialize methods convert the nested message dict into this format. Parameters ---------- stream : zmq.Socket or ZMQStream - the socket-like object used to send the data + The socket-like object used to send the data. msg_or_type : str or Message/dict Normally, msg_or_type will be a msg_type unless a message is being sent more than once. content : dict or None - the content of the message (ignored if msg_or_type is a message) + The content of the message (ignored if msg_or_type is a message). + header : dict or None + The header dict for the message (ignored if msg_or_type is a message). parent : Message or dict or None - the parent or parent header describing the parent of this message + The parent or parent header describing the parent of this message + (ignored if msg_or_type is a message). ident : bytes or list of bytes - the zmq.IDENTITY routing path + The zmq.IDENTITY routing path. subheader : dict or None - extra header keys for this message's header + Extra header keys for this message's header (ignored if msg_or_type + is a message). buffers : list or None - the already-serialized buffers to be appended to the message + The already-serialized buffers to be appended to the message. track : bool - whether to track. Only for use with Sockets, - because ZMQStream objects cannot track messages. + Whether to track. Only for use with Sockets, because ZMQStream + objects cannot track messages. Returns ------- - msg : message dict - the constructed message - (msg,tracker) : (message dict, MessageTracker) + msg : dict + The constructed message. 
+ (msg,tracker) : (dict, MessageTracker) if track=True, then a 2-tuple will be returned, the first element being the constructed message, and the second being the MessageTracker @@ -488,12 +492,13 @@ class Session(Configurable): raise TypeError("ZMQStream cannot track messages") if isinstance(msg_or_type, (Message, dict)): - # we got a Message, not a msg_type - # don't build a new Message + # We got a Message or message dict, not a msg_type so don't + # build a new Message. msg = msg_or_type else: - msg = self.msg(msg_or_type, content, parent, subheader) - + msg = self.msg(msg_or_type, content=content, parent=parent, + subheader=subheader, header=header) + buffers = [] if buffers is None else buffers to_send = self.serialize(msg, ident) flag = 0 @@ -523,7 +528,7 @@ class Session(Configurable): msg['tracker'] = tracker return msg - + def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None): """Send a raw message via ident path. @@ -545,7 +550,7 @@ class Session(Configurable): ident = [ident] if ident is not None: to_send.extend(ident) - + to_send.append(DELIM) to_send.append(self.sign(msg_list)) to_send.extend(msg_list)