diff --git a/IPython/parallel/streamsession.py b/IPython/parallel/streamsession.py index e5e8bd4..3d1b9cf 100644 --- a/IPython/parallel/streamsession.py +++ b/IPython/parallel/streamsession.py @@ -176,6 +176,34 @@ class StreamSession(object): header = extract_header(msg_or_header) return header.get('key', None) == self.key + + def serialize(self, msg, ident=None): + content = msg.get('content', {}) + if content is None: + content = self.none + elif isinstance(content, dict): + content = self.pack(content) + elif isinstance(content, bytes): + # content is already packed, as in a relayed message + pass + else: + raise TypeError("Content incorrect type: %s"%type(content)) + + to_send = [] + + if isinstance(ident, list): + # accept list of idents + to_send.extend(ident) + elif ident is not None: + to_send.append(ident) + to_send.append(DELIM) + if self.key is not None: + to_send.append(self.key) + to_send.append(self.pack(msg['header'])) + to_send.append(self.pack(msg['parent_header'])) + to_send.append(content) + + return to_send def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False): """Build and send a message via stream or socket. @@ -221,33 +249,11 @@ class StreamSession(object): # we got a Message, not a msg_type # don't build a new Message msg = msg_or_type - content = msg['content'] else: msg = self.msg(msg_or_type, content, parent, subheader) buffers = [] if buffers is None else buffers - to_send = [] - if isinstance(ident, list): - # accept list of idents - to_send.extend(ident) - elif ident is not None: - to_send.append(ident) - to_send.append(DELIM) - if self.key is not None: - to_send.append(self.key) - to_send.append(self.pack(msg['header'])) - to_send.append(self.pack(msg['parent_header'])) - - if content is None: - content = self.none - elif isinstance(content, dict): - content = self.pack(content) - elif isinstance(content, bytes): - # content is already packed, as in a relayed message - pass - else: - raise TypeError("Content incorrect type: %s"%type(content)) - to_send.append(content) + to_send = self.serialize(msg, ident) flag = 0 if buffers: flag = zmq.SNDMORE