added squash_unicode to prevent unicode pollution of json
MinRK
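A minimal sketch of the behaviour this commit targets, assuming a json packer (jsonlib/jsonlib2 or simplejson/json) is selected: json unpacking returns unicode keys and values, and the new squash_unicode helper encodes them back to plain str so downstream code that expects byte strings is unaffected.

    jsonapi.loads('{"a": "b"}')                    # -> {u'a': u'b'}
    squash_unicode(jsonapi.loads('{"a": "b"}'))    # -> {'a': 'b'}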
@@ -1,447 +1,498 @@
#!/usr/bin/env python
"""edited session.py to work with streams, and move msg_type to the header
"""


import os
import sys
import traceback
import pprint
import uuid

import zmq
from zmq.utils import jsonapi
from zmq.eventloop.zmqstream import ZMQStream

from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
from IPython.zmq.newserialized import serialize, unserialize

try:
    import cPickle
    pickle = cPickle
except ImportError:
    cPickle = None
    import pickle

# packer priority: jsonlib[2], cPickle, simplejson/json, pickle
json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
if json_name in ('jsonlib', 'jsonlib2'):
    use_json = True
elif json_name:
    if cPickle is None:
        use_json = True
    else:
        use_json = False
else:
    use_json = False

def squash_unicode(obj):
    if isinstance(obj,dict):
        for key in obj.keys():
            obj[key] = squash_unicode(obj[key])
            if isinstance(key, unicode):
                obj[squash_unicode(key)] = obj.pop(key)
    elif isinstance(obj, list):
        for i,v in enumerate(obj):
            obj[i] = squash_unicode(v)
    elif isinstance(obj, unicode):
        obj = obj.encode('utf8')
    return obj

if use_json:
    default_packer = jsonapi.dumps
    default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
else:
    default_packer = lambda o: pickle.dumps(o,-1)
    default_unpacker = pickle.loads


DELIM="<IDS|MSG>"

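# Wire format assembled by StreamSession.send below: a multipart message of
#   [ident, ..., DELIM, pack(header), pack(parent_header), pack(content), buffer, ...]
# where DELIM separates the zmq routing identities from the message frames.
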
def wrap_exception():
    etype, evalue, tb = sys.exc_info()
    tb = traceback.format_exception(etype, evalue, tb)
    exc_content = {
        u'status' : u'error',
        u'traceback' : tb,
        u'etype' : unicode(etype),
        u'evalue' : unicode(evalue)
    }
    return exc_content

class KernelError(Exception):
    pass

def unwrap_exception(content):
    err = KernelError(content['etype'], content['evalue'])
    err.evalue = content['evalue']
    err.etype = content['etype']
    err.traceback = ''.join(content['traceback'])
    return err

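# e.g. (illustrative): content built by wrap_exception() inside an except block
# can be shipped in a message and turned back into a raisable KernelError with
# unwrap_exception(content) on the receiving side.
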
class Message(object):
    """A simple message object that maps dict keys to attributes.

    A Message can be created from a dict and a dict from a Message instance
    simply by calling dict(msg_obj)."""

    def __init__(self, msg_dict):
        dct = self.__dict__
        for k, v in dict(msg_dict).iteritems():
            if isinstance(v, dict):
                v = Message(v)
            dct[k] = v

    # Having this iterator lets dict(msg_obj) work out of the box.
    def __iter__(self):
        return iter(self.__dict__.iteritems())

    def __repr__(self):
        return repr(self.__dict__)

    def __str__(self):
        return pprint.pformat(self.__dict__)

    def __contains__(self, k):
        return k in self.__dict__

    def __getitem__(self, k):
        return self.__dict__[k]


def msg_header(msg_id, msg_type, username, session):
    return locals()
    # return {
    #     'msg_id' : msg_id,
    #     'msg_type': msg_type,
    #     'username' : username,
    #     'session' : session
    # }


def extract_header(msg_or_header):
    """Given a message or header, return the header."""
    if not msg_or_header:
        return {}
    try:
        # See if msg_or_header is the entire message.
        h = msg_or_header['header']
    except KeyError:
        try:
            # See if msg_or_header is just the header
            h = msg_or_header['msg_id']
        except KeyError:
            raise
        else:
            h = msg_or_header
    if not isinstance(h, dict):
        h = dict(h)
    return h

def rekey(dikt):
    """rekey a dict that has been forced to use str keys where there should be
    ints by json. This belongs in the jsonutil added by fperez."""
    # iterate over a copy of the keys, since the loop mutates dikt
    for k in dikt.keys():
        if isinstance(k, str):
            ik=fk=None
            try:
                ik = int(k)
            except ValueError:
                try:
                    fk = float(k)
                except ValueError:
                    continue
            if ik is not None:
                nk = ik
            else:
                nk = fk
            if nk in dikt:
                raise KeyError("already have key %r"%nk)
            dikt[nk] = dikt.pop(k)
    return dikt

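# e.g. (illustrative): rekey({'0': 'a', '1.5': 'b', 'x': 'c'})
#                      -> {0: 'a', 1.5: 'b', 'x': 'c'}
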
def serialize_object(obj, threshold=64e-6):
    """serialize an object into a list of sendable buffers.

    Returns: (pmd, bufs)
    where pmd is the pickled metadata wrapper, and bufs
    is a list of data buffers"""
    # values larger than `threshold` are extracted into separate data buffers
    databuffers = []
    if isinstance(obj, (list, tuple)):
        clist = canSequence(obj)
        slist = map(serialize, clist)
        for s in slist:
            if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
                databuffers.append(s.getData())
                s.data = None
        return pickle.dumps(slist,-1), databuffers
    elif isinstance(obj, dict):
        sobj = {}
        for k in sorted(obj.iterkeys()):
            s = serialize(can(obj[k]))
            if s.getDataSize() > threshold:
                databuffers.append(s.getData())
                s.data = None
            sobj[k] = s
        return pickle.dumps(sobj,-1),databuffers
    else:
        s = serialize(can(obj))
        if s.getDataSize() > threshold:
            databuffers.append(s.getData())
            s.data = None
        return pickle.dumps(s,-1),databuffers


def unserialize_object(bufs):
    """reconstruct an object serialized by serialize_object from data buffers"""
    bufs = list(bufs)
    sobj = pickle.loads(bufs.pop(0))
    if isinstance(sobj, (list, tuple)):
        for s in sobj:
            if s.data is None:
                s.data = bufs.pop(0)
        return uncanSequence(map(unserialize, sobj))
    elif isinstance(sobj, dict):
        newobj = {}
        for k in sorted(sobj.iterkeys()):
            s = sobj[k]
            if s.data is None:
                s.data = bufs.pop(0)
            newobj[k] = uncan(unserialize(s))
        return newobj
    else:
        if sobj.data is None:
            sobj.data = bufs.pop(0)
        return uncan(unserialize(sobj))

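# A minimal round-trip sketch (illustrative; the test name is hypothetical and
# assumes small, plain picklable values):
def test_serialize_roundtrip():
    pmd, bufs = serialize_object(dict(a=range(5), b='hello'))
    assert unserialize_object([pmd] + bufs) == dict(a=range(5), b='hello')
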
def pack_apply_message(f, args, kwargs, threshold=64e-6):
    """pack up a function, args, and kwargs to be sent over the wire
    as a series of buffers. Any object whose data is larger than `threshold`
    will not have its data copied (currently only numpy arrays support zero-copy)"""
    msg = [pickle.dumps(can(f),-1)]
    databuffers = [] # for large objects
    sargs, bufs = serialize_object(args,threshold)
    msg.append(sargs)
    databuffers.extend(bufs)
    skwargs, bufs = serialize_object(kwargs,threshold)
    msg.append(skwargs)
    databuffers.extend(bufs)
    msg.extend(databuffers)
    return msg

def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()
    Returns: original f,args,kwargs"""
    bufs = list(bufs) # allow us to pop
    assert len(bufs) >= 3, "not enough buffers!"
    if not copy:
        for i in range(3):
            bufs[i] = bufs[i].bytes
    cf = pickle.loads(bufs.pop(0))
    sargs = list(pickle.loads(bufs.pop(0)))
    skwargs = dict(pickle.loads(bufs.pop(0)))
    # print sargs, skwargs
    f = uncan(cf, g)
    for sa in sargs:
        if sa.data is None:
            m = bufs.pop(0)
            if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
                if copy:
                    sa.data = buffer(m)
                else:
                    sa.data = m.buffer
            else:
                if copy:
                    sa.data = m
                else:
                    sa.data = m.bytes

    args = uncanSequence(map(unserialize, sargs), g)
    kwargs = {}
    for k in sorted(skwargs.iterkeys()):
        sa = skwargs[k]
        if sa.data is None:
            sa.data = bufs.pop(0)
        kwargs[k] = uncan(unserialize(sa), g)

    return f,args,kwargs

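# A minimal round-trip sketch (illustrative; the test name is hypothetical, and
# a picklable builtin is used so no function-canning machinery is exercised):
def test_apply_message_roundtrip():
    import operator
    f, args, kwargs = unpack_apply_message(pack_apply_message(operator.add, (5, 3), {}))
    assert f(*args, **kwargs) == 8
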
class StreamSession(object):
    """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
    debug=False
    def __init__(self, username=None, session=None, packer=None, unpacker=None):
        if username is None:
            username = os.environ.get('USER','username')
        self.username = username
        if session is None:
            self.session = str(uuid.uuid4())
        else:
            self.session = session
        self.msg_id = str(uuid.uuid4())
        if packer is None:
            self.pack = default_packer
        else:
            if not callable(packer):
                raise TypeError("packer must be callable, not %s"%type(packer))
            self.pack = packer

        if unpacker is None:
            self.unpack = default_unpacker
        else:
            if not callable(unpacker):
                raise TypeError("unpacker must be callable, not %s"%type(unpacker))
            self.unpack = unpacker

        self.none = self.pack({})

    def msg_header(self, msg_type):
        h = msg_header(self.msg_id, msg_type, self.username, self.session)
        self.msg_id = str(uuid.uuid4())
        return h

    def msg(self, msg_type, content=None, parent=None, subheader=None):
        msg = {}
        msg['header'] = self.msg_header(msg_type)
        msg['msg_id'] = msg['header']['msg_id']
        msg['parent_header'] = {} if parent is None else extract_header(parent)
        msg['msg_type'] = msg_type
        msg['content'] = {} if content is None else content
        sub = {} if subheader is None else subheader
        msg['header'].update(sub)
        return msg

    def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
        """Build and send a message via stream or socket.

        Parameters
        ----------

        msg_type : str or Message/dict
            Normally, msg_type will be the string name of the message type,
            and the message is built here from `content`, `parent`, and
            `subheader`. Alternatively, an already-built Message (or msg
            dict) can be passed in its place, in which case its header and
            content are sent as-is.

        Returns
        -------
        msg : Message
            the nice wrapped dict-like object containing the headers

        """
        if isinstance(msg_type, (Message, dict)):
            # we got a Message, not a msg_type
            # don't build a new Message
            msg = msg_type
            content = msg['content']
        else:
            msg = self.msg(msg_type, content, parent, subheader)
        buffers = [] if buffers is None else buffers
        to_send = []
        if isinstance(ident, list):
            # accept list of idents
            to_send.extend(ident)
        elif ident is not None:
            to_send.append(ident)
        to_send.append(DELIM)
        to_send.append(self.pack(msg['header']))
        to_send.append(self.pack(msg['parent_header']))
        # if parent is None:
        #     to_send.append(self.none)
        # else:
        #     to_send.append(self.pack(dict(parent)))
        if content is None:
            content = self.none
        elif isinstance(content, dict):
            content = self.pack(content)
        elif isinstance(content, str):
            # content is already packed, as in a relayed message
            pass
        else:
            raise TypeError("Content incorrect type: %s"%type(content))
        to_send.append(content)
        flag = 0
        if buffers:
            flag = zmq.SNDMORE
        stream.send_multipart(to_send, flag, copy=False)
        for b in buffers[:-1]:
            stream.send(b, flag, copy=False)
        if buffers:
            stream.send(buffers[-1], copy=False)
        omsg = Message(msg)
        if self.debug:
            pprint.pprint(omsg)
            pprint.pprint(to_send)
            pprint.pprint(buffers)
        # return the wrapped Message object (the buffers are not returned)
        return omsg

    def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
        """send a raw message via idents.

        Parameters
        ----------
        msg : list of sendable buffers"""
        to_send = []
        if isinstance(idents, str):
            idents = [idents]
        if idents is not None:
            to_send.extend(idents)
        to_send.append(DELIM)
        to_send.extend(msg)
        stream.send_multipart(to_send, flags, copy=copy)

    def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
        """receives and unpacks a message
        returns [idents], msg"""
        if isinstance(socket, ZMQStream):
            socket = socket.socket
        try:
            msg = socket.recv_multipart(mode)
        except zmq.ZMQError, e:
            if e.errno == zmq.EAGAIN:
                # We can convert EAGAIN to None as we know in this case
                # recv_json won't return None.
                return None
            else:
                raise
        # return an actual Message object
        # determine the number of idents by trying to unpack them.
        # this is terrible:
        idents, msg = self.feed_identities(msg, copy)
        try:
            return idents, self.unpack_message(msg, content=content, copy=copy)
        except Exception, e:
            print idents, msg
            # TODO: handle it
            raise e

    def feed_identities(self, msg, copy=True):
        """This is a completely horrible thing, but it strips the zmq
        ident prefixes off of a message. It will break if any identities
        are unpackable by self.unpack."""
        msg = list(msg)
        idents = []
        while len(msg) > 3:
            if copy:
                s = msg[0]
            else:
                s = msg[0].bytes
            if s == DELIM:
                msg.pop(0)
                break
            else:
                idents.append(s)
                msg.pop(0)

        return idents, msg

    def unpack_message(self, msg, content=True, copy=True):
        """return a message object from the format
        sent by self.send.

        parameters:

        content : bool (True)
            whether to unpack the content dict (True),
            or leave it serialized (False)

        copy : bool (True)
            whether to return the bytes (True),
            or the non-copying zmq.Message object in each place (False)

        """
        if not len(msg) >= 3:
            raise TypeError("malformed message, must have at least 3 elements")
        message = {}
        if not copy:
            for i in range(3):
                msg[i] = msg[i].bytes
        message['header'] = self.unpack(msg[0])
        message['msg_type'] = message['header']['msg_type']
        message['parent_header'] = self.unpack(msg[1])
        if content:
            message['content'] = self.unpack(msg[2])
        else:
            message['content'] = msg[2]

        # message['buffers'] = msg[3:]
        # else:
        #     message['header'] = self.unpack(msg[0].bytes)
        #     message['msg_type'] = message['header']['msg_type']
        #     message['parent_header'] = self.unpack(msg[1].bytes)
        #     if content:
        #         message['content'] = self.unpack(msg[2].bytes)
        #     else:
        #         message['content'] = msg[2].bytes

        message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
        return message



def test_msg2obj():
    am = dict(x=1)
    ao = Message(am)
    assert ao.x == am['x']

    am['y'] = dict(z=1)
    ao = Message(am)
    assert ao.y.z == am['y']['z']

    k1, k2 = 'y', 'z'
    assert ao[k1][k2] == am[k1][k2]

    am2 = dict(ao)
    assert am['x'] == am2['x']
    assert am['y']['z'] == am2['y']['z']
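

# A minimal end-to-end sketch (illustrative; the test name and inproc endpoint
# are hypothetical): send and receive one message over a pair of PAIR sockets.
def test_session_roundtrip():
    ctx = zmq.Context()
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.bind('inproc://streamsession-roundtrip')
    b.connect('inproc://streamsession-roundtrip')
    session = StreamSession(username='tester')
    session.send(a, 'apply_request', content=dict(a=10))
    idents, msg = session.recv(b, mode=0)
    assert idents == []
    assert msg['msg_type'] == 'apply_request'
    assert msg['content'] == dict(a=10)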