Show More
@@ -176,6 +176,34 b' class StreamSession(object):' | |||||
176 | header = extract_header(msg_or_header) |
|
176 | header = extract_header(msg_or_header) | |
177 | return header.get('key', None) == self.key |
|
177 | return header.get('key', None) == self.key | |
178 |
|
178 | |||
|
179 | ||||
|
180 | def serialize(self, msg, ident=None): | |||
|
181 | content = msg.get('content', {}) | |||
|
182 | if content is None: | |||
|
183 | content = self.none | |||
|
184 | elif isinstance(content, dict): | |||
|
185 | content = self.pack(content) | |||
|
186 | elif isinstance(content, bytes): | |||
|
187 | # content is already packed, as in a relayed message | |||
|
188 | pass | |||
|
189 | else: | |||
|
190 | raise TypeError("Content incorrect type: %s"%type(content)) | |||
|
191 | ||||
|
192 | to_send = [] | |||
|
193 | ||||
|
194 | if isinstance(ident, list): | |||
|
195 | # accept list of idents | |||
|
196 | to_send.extend(ident) | |||
|
197 | elif ident is not None: | |||
|
198 | to_send.append(ident) | |||
|
199 | to_send.append(DELIM) | |||
|
200 | if self.key is not None: | |||
|
201 | to_send.append(self.key) | |||
|
202 | to_send.append(self.pack(msg['header'])) | |||
|
203 | to_send.append(self.pack(msg['parent_header'])) | |||
|
204 | to_send.append(content) | |||
|
205 | ||||
|
206 | return to_send | |||
179 |
|
207 | |||
180 | def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False): |
|
208 | def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False): | |
181 | """Build and send a message via stream or socket. |
|
209 | """Build and send a message via stream or socket. | |
@@ -221,33 +249,11 b' class StreamSession(object):' | |||||
221 | # we got a Message, not a msg_type |
|
249 | # we got a Message, not a msg_type | |
222 | # don't build a new Message |
|
250 | # don't build a new Message | |
223 | msg = msg_or_type |
|
251 | msg = msg_or_type | |
224 | content = msg['content'] |
|
|||
225 | else: |
|
252 | else: | |
226 | msg = self.msg(msg_or_type, content, parent, subheader) |
|
253 | msg = self.msg(msg_or_type, content, parent, subheader) | |
227 |
|
254 | |||
228 | buffers = [] if buffers is None else buffers |
|
255 | buffers = [] if buffers is None else buffers | |
229 | to_send = [] |
|
256 | to_send = self.serialize(msg, ident) | |
230 | if isinstance(ident, list): |
|
|||
231 | # accept list of idents |
|
|||
232 | to_send.extend(ident) |
|
|||
233 | elif ident is not None: |
|
|||
234 | to_send.append(ident) |
|
|||
235 | to_send.append(DELIM) |
|
|||
236 | if self.key is not None: |
|
|||
237 | to_send.append(self.key) |
|
|||
238 | to_send.append(self.pack(msg['header'])) |
|
|||
239 | to_send.append(self.pack(msg['parent_header'])) |
|
|||
240 |
|
||||
241 | if content is None: |
|
|||
242 | content = self.none |
|
|||
243 | elif isinstance(content, dict): |
|
|||
244 | content = self.pack(content) |
|
|||
245 | elif isinstance(content, bytes): |
|
|||
246 | # content is already packed, as in a relayed message |
|
|||
247 | pass |
|
|||
248 | else: |
|
|||
249 | raise TypeError("Content incorrect type: %s"%type(content)) |
|
|||
250 | to_send.append(content) |
|
|||
251 | flag = 0 |
|
257 | flag = 0 | |
252 | if buffers: |
|
258 | if buffers: | |
253 | flag = zmq.SNDMORE |
|
259 | flag = zmq.SNDMORE |
General Comments 0
You need to be logged in to leave comments.
Login now