diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index e2001a2..7d2ebc9 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -351,6 +351,12 @@ class Session(Configurable): return msg_header(self.msg_id, msg_type, self.username, self.session) def msg(self, msg_type, content=None, parent=None, subheader=None): + """Return the nested message dict. + + This format is different from what is sent over the wire. The + self.serialize method converts this nested message dict to the wire + format, which uses a message list. + """ msg = {} msg['header'] = self.msg_header(msg_type) msg['msg_id'] = msg['header']['msg_id'] @@ -361,23 +367,37 @@ class Session(Configurable): msg['header'].update(sub) return msg - def sign(self, msg): - """Sign a message with HMAC digest. If no auth, return b''.""" + def sign(self, msg_list): + """Sign a message with HMAC digest. If no auth, return b''. + + Parameters + ---------- + msg_list : list + The [p_header,p_parent,p_content] part of the message list. + """ if self.auth is None: return b'' h = self.auth.copy() - for m in msg: + for m in msg_list: h.update(m) return h.hexdigest() def serialize(self, msg, ident=None): """Serialize the message components to bytes. - + + Parameters + ---------- + msg : dict or Message + The nexted message dict as returned by the self.msg method. + Returns ------- - - list of bytes objects - + msg_list : list + The list of bytes objects to be sent with the format: + [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, + buffer1,buffer2,...]. In this list, the p_* entities are + the packed or serialized versions, so if JSON is used, these + are uft8 encoded JSON strings. """ content = msg.get('content', {}) if content is None: @@ -417,7 +437,15 @@ class Session(Configurable): def send(self, stream, msg_or_type, content=None, parent=None, ident=None, buffers=None, subheader=None, track=False): """Build and send a message via stream or socket. - + + The message format used by this function internally is as follows: + + [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, + buffer1,buffer2,...] + + The self.serialize method converts the nested message dict into this + format. + Parameters ---------- @@ -483,7 +511,7 @@ class Session(Configurable): tracker = stream.send(buffers[-1], copy=False, track=track) else: tracker = stream.send(buffers[-1], copy=False) - + # omsg = Message(msg) if self.debug: pprint.pprint(msg) @@ -494,12 +522,22 @@ class Session(Configurable): return msg - def send_raw(self, stream, msg, flags=0, copy=True, ident=None): + def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None): """Send a raw message via ident path. - + + This method is used to send a already serialized message. + Parameters ---------- - msg : list of sendable buffers""" + stream : ZMQStream or Socket + The ZMQ stream or socket to use for sending the message. + msg_list : list + The serialized list of messages to send. This only includes the + [p_header,p_parent,p_content,buffer1,buffer2,...] portion of + the message. + ident : ident or list + A single ident or a list of idents to use in sending. + """ to_send = [] if isinstance(ident, bytes): ident = [ident] @@ -507,17 +545,28 @@ class Session(Configurable): to_send.extend(ident) to_send.append(DELIM) - to_send.append(self.sign(msg)) - to_send.extend(msg) - stream.send_multipart(msg, flags, copy=copy) + to_send.append(self.sign(msg_list)) + to_send.extend(msg_list) + stream.send_multipart(msg_list, flags, copy=copy) def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): - """receives and unpacks a message - returns [idents], msg""" + """Receive and unpack a message. + + Parameters + ---------- + socket : ZMQStream or Socket + The socket or stream to use in receiving. + + Returns + ------- + [idents], msg + [idents] is a list of idents and msg is a nested message dict of + same format as self.msg returns. + """ if isinstance(socket, ZMQStream): socket = socket.socket try: - msg = socket.recv_multipart(mode) + msg_list = socket.recv_multipart(mode) except zmq.ZMQError as e: if e.errno == zmq.EAGAIN: # We can convert EAGAIN to None as we know in this case @@ -527,48 +576,50 @@ class Session(Configurable): raise # split multipart message into identity list and message dict # invalid large messages can cause very expensive string comparisons - idents, msg = self.feed_identities(msg, copy) + idents, msg_list = self.feed_identities(msg_list, copy) try: - return idents, self.unpack_message(msg, content=content, copy=copy) + return idents, self.unpack_message(msg_list, content=content, copy=copy) except Exception as e: - print (idents, msg) + print (idents, msg_list) # TODO: handle it raise e - def feed_identities(self, msg, copy=True): - """feed until DELIM is reached, then return the prefix as idents and - remainder as msg. This is easily broken by setting an IDENT to DELIM, + def feed_identities(self, msg_list, copy=True): + """Split the identities from the rest of the message. + + Feed until DELIM is reached, then return the prefix as idents and + remainder as msg_list. This is easily broken by setting an IDENT to DELIM, but that would be silly. Parameters ---------- - msg : a list of Message or bytes objects - the message to be split + msg_list : a list of Message or bytes objects + The message to be split. copy : bool flag determining whether the arguments are bytes or Messages Returns ------- - (idents,msg) : two lists + (idents,msg_list) : two lists idents will always be a list of bytes - the indentity prefix - msg will be a list of bytes or Messages, unchanged from input - msg should be unpackable via self.unpack_message at this point. + msg_list will be a list of bytes or Messages, unchanged from input + msg_list should be unpackable via self.unpack_message at this point. """ if copy: - idx = msg.index(DELIM) - return msg[:idx], msg[idx+1:] + idx = msg_list.index(DELIM) + return msg_list[:idx], msg_list[idx+1:] else: failed = True - for idx,m in enumerate(msg): + for idx,m in enumerate(msg_list): if m.bytes == DELIM: failed = False break if failed: - raise ValueError("DELIM not in msg") - idents, msg = msg[:idx], msg[idx+1:] - return [m.bytes for m in idents], msg + raise ValueError("DELIM not in msg_list") + idents, msg_list = msg_list[:idx], msg_list[idx+1:] + return [m.bytes for m in idents], msg_list - def unpack_message(self, msg, content=True, copy=True): + def unpack_message(self, msg_list, content=True, copy=True): """Return a message object from the format sent by self.send. @@ -588,26 +639,26 @@ class Session(Configurable): message = {} if not copy: for i in range(minlen): - msg[i] = msg[i].bytes + msg_list[i] = msg_list[i].bytes if self.auth is not None: - signature = msg[0] + signature = msg_list[0] if signature in self.digest_history: raise ValueError("Duplicate Signature: %r"%signature) self.digest_history.add(signature) - check = self.sign(msg[1:4]) + check = self.sign(msg_list[1:4]) if not signature == check: raise ValueError("Invalid Signature: %r"%signature) - if not len(msg) >= minlen: + if not len(msg_list) >= minlen: raise TypeError("malformed message, must have at least %i elements"%minlen) - message['header'] = self.unpack(msg[1]) + message['header'] = self.unpack(msg_list[1]) message['msg_type'] = message['header']['msg_type'] - message['parent_header'] = self.unpack(msg[2]) + message['parent_header'] = self.unpack(msg_list[2]) if content: - message['content'] = self.unpack(msg[3]) + message['content'] = self.unpack(msg_list[3]) else: - message['content'] = msg[3] + message['content'] = msg_list[3] - message['buffers'] = msg[4:] + message['buffers'] = msg_list[4:] return message def test_msg2obj():