Show More
@@ -35,9 +35,22 elif json_name: | |||
|
35 | 35 | else: |
|
36 | 36 | use_json = False |
|
37 | 37 | |
|
38 | def squash_unicode(obj): | |
|
39 | if isinstance(obj,dict): | |
|
40 | for key in obj.keys(): | |
|
41 | obj[key] = squash_unicode(obj[key]) | |
|
42 | if isinstance(key, unicode): | |
|
43 | obj[squash_unicode(key)] = obj.pop(key) | |
|
44 | elif isinstance(obj, list): | |
|
45 | for i,v in enumerate(obj): | |
|
46 | obj[i] = squash_unicode(v) | |
|
47 | elif isinstance(obj, unicode): | |
|
48 | obj = obj.encode('utf8') | |
|
49 | return obj | |
|
50 | ||
|
38 | 51 | if use_json: |
|
39 | 52 | default_packer = jsonapi.dumps |
|
40 | default_unpacker = jsonapi.loads | |
|
53 | default_unpacker = lambda s: squash_unicode(jsonapi.loads(s)) | |
|
41 | 54 | else: |
|
42 | 55 | default_packer = lambda o: pickle.dumps(o,-1) |
|
43 | 56 | default_unpacker = pickle.loads |
@@ -300,8 +313,30 class StreamSession(object): | |||
|
300 | 313 | return msg |
|
301 | 314 | |
|
302 | 315 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): |
|
303 |
"""send a message via stream |
|
|
304 | msg = self.msg(msg_type, content, parent, subheader) | |
|
316 | """Build and send a message via stream or socket. | |
|
317 | ||
|
318 | Parameters | |
|
319 | ---------- | |
|
320 | ||
|
321 | msg_type : str or Message/dict | |
|
322 | Normally, msg_type will be | |
|
323 | ||
|
324 | ||
|
325 | ||
|
326 | Returns | |
|
327 | ------- | |
|
328 | (msg,sent) : tuple | |
|
329 | msg : Message | |
|
330 | the nice wrapped dict-like object containing the headers | |
|
331 | ||
|
332 | """ | |
|
333 | if isinstance(msg_type, (Message, dict)): | |
|
334 | # we got a Message, not a msg_type | |
|
335 | # don't build a new Message | |
|
336 | msg = msg_type | |
|
337 | content = msg['content'] | |
|
338 | else: | |
|
339 | msg = self.msg(msg_type, content, parent, subheader) | |
|
305 | 340 | buffers = [] if buffers is None else buffers |
|
306 | 341 | to_send = [] |
|
307 | 342 | if isinstance(ident, list): |
@@ -339,8 +374,24 class StreamSession(object): | |||
|
339 | 374 | pprint.pprint(omsg) |
|
340 | 375 | pprint.pprint(to_send) |
|
341 | 376 | pprint.pprint(buffers) |
|
377 | # return both the msg object and the buffers | |
|
342 | 378 | return omsg |
|
343 | ||
|
379 | ||
|
380 | def send_raw(self, stream, msg, flags=0, copy=True, idents=None): | |
|
381 | """send a raw message via idents. | |
|
382 | ||
|
383 | Parameters | |
|
384 | ---------- | |
|
385 | msg : list of sendable buffers""" | |
|
386 | to_send = [] | |
|
387 | if isinstance(ident, str): | |
|
388 | ident = [ident] | |
|
389 | if ident is not None: | |
|
390 | to_send.extend(ident) | |
|
391 | to_send.append(DELIM) | |
|
392 | to_send.extend(msg) | |
|
393 | stream.send_multipart(msg, flags, copy=copy) | |
|
394 | ||
|
344 | 395 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
|
345 | 396 | """receives and unpacks a message |
|
346 | 397 | returns [idents], msg""" |
General Comments 0
You need to be logged in to leave comments.
Login now