##// END OF EJS Templates
util files into utils dir
MinRK -
Show More
1 NO CONTENT: file renamed from IPython/zmq/codeutil.py to IPython/utils/codeutil.py
NO CONTENT: file renamed from IPython/zmq/codeutil.py to IPython/utils/codeutil.py
1 NO CONTENT: file renamed from IPython/zmq/newserialized.py to IPython/utils/newserialized.py
NO CONTENT: file renamed from IPython/zmq/newserialized.py to IPython/utils/newserialized.py
1 NO CONTENT: file renamed from IPython/zmq/pickleutil.py to IPython/utils/pickleutil.py
NO CONTENT: file renamed from IPython/zmq/pickleutil.py to IPython/utils/pickleutil.py
@@ -1,510 +1,510 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4
4
5
5
6 import os
6 import os
7 import sys
7 import sys
8 import traceback
8 import traceback
9 import pprint
9 import pprint
10 import uuid
10 import uuid
11 from datetime import datetime
11 from datetime import datetime
12
12
13 import zmq
13 import zmq
14 from zmq.utils import jsonapi
14 from zmq.utils import jsonapi
15 from zmq.eventloop.zmqstream import ZMQStream
15 from zmq.eventloop.zmqstream import ZMQStream
16
16
17 from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
18 from IPython.zmq.newserialized import serialize, unserialize
18 from IPython.utils.newserialized import serialize, unserialize
19
19
20 try:
20 try:
21 import cPickle
21 import cPickle
22 pickle = cPickle
22 pickle = cPickle
23 except:
23 except:
24 cPickle = None
24 cPickle = None
25 import pickle
25 import pickle
26
26
27 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
27 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
28 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
28 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
29 if json_name in ('jsonlib', 'jsonlib2'):
29 if json_name in ('jsonlib', 'jsonlib2'):
30 use_json = True
30 use_json = True
31 elif json_name:
31 elif json_name:
32 if cPickle is None:
32 if cPickle is None:
33 use_json = True
33 use_json = True
34 else:
34 else:
35 use_json = False
35 use_json = False
36 else:
36 else:
37 use_json = False
37 use_json = False
38
38
39 def squash_unicode(obj):
39 def squash_unicode(obj):
40 if isinstance(obj,dict):
40 if isinstance(obj,dict):
41 for key in obj.keys():
41 for key in obj.keys():
42 obj[key] = squash_unicode(obj[key])
42 obj[key] = squash_unicode(obj[key])
43 if isinstance(key, unicode):
43 if isinstance(key, unicode):
44 obj[squash_unicode(key)] = obj.pop(key)
44 obj[squash_unicode(key)] = obj.pop(key)
45 elif isinstance(obj, list):
45 elif isinstance(obj, list):
46 for i,v in enumerate(obj):
46 for i,v in enumerate(obj):
47 obj[i] = squash_unicode(v)
47 obj[i] = squash_unicode(v)
48 elif isinstance(obj, unicode):
48 elif isinstance(obj, unicode):
49 obj = obj.encode('utf8')
49 obj = obj.encode('utf8')
50 return obj
50 return obj
51
51
52 if use_json:
52 if use_json:
53 default_packer = jsonapi.dumps
53 default_packer = jsonapi.dumps
54 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
54 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
55 else:
55 else:
56 default_packer = lambda o: pickle.dumps(o,-1)
56 default_packer = lambda o: pickle.dumps(o,-1)
57 default_unpacker = pickle.loads
57 default_unpacker = pickle.loads
58
58
59
59
60 DELIM="<IDS|MSG>"
60 DELIM="<IDS|MSG>"
61
61
62 def wrap_exception():
62 def wrap_exception():
63 etype, evalue, tb = sys.exc_info()
63 etype, evalue, tb = sys.exc_info()
64 tb = traceback.format_exception(etype, evalue, tb)
64 tb = traceback.format_exception(etype, evalue, tb)
65 exc_content = {
65 exc_content = {
66 'status' : 'error',
66 'status' : 'error',
67 'traceback' : str(tb),
67 'traceback' : str(tb),
68 'etype' : str(etype),
68 'etype' : str(etype),
69 'evalue' : str(evalue)
69 'evalue' : str(evalue)
70 }
70 }
71 return exc_content
71 return exc_content
72
72
73 class KernelError(Exception):
73 class KernelError(Exception):
74 pass
74 pass
75
75
76 def unwrap_exception(content):
76 def unwrap_exception(content):
77 err = KernelError(content['etype'], content['evalue'])
77 err = KernelError(content['etype'], content['evalue'])
78 err.evalue = content['evalue']
78 err.evalue = content['evalue']
79 err.etype = content['etype']
79 err.etype = content['etype']
80 err.traceback = ''.join(content['traceback'])
80 err.traceback = ''.join(content['traceback'])
81 return err
81 return err
82
82
83
83
84 class Message(object):
84 class Message(object):
85 """A simple message object that maps dict keys to attributes.
85 """A simple message object that maps dict keys to attributes.
86
86
87 A Message can be created from a dict and a dict from a Message instance
87 A Message can be created from a dict and a dict from a Message instance
88 simply by calling dict(msg_obj)."""
88 simply by calling dict(msg_obj)."""
89
89
90 def __init__(self, msg_dict):
90 def __init__(self, msg_dict):
91 dct = self.__dict__
91 dct = self.__dict__
92 for k, v in dict(msg_dict).iteritems():
92 for k, v in dict(msg_dict).iteritems():
93 if isinstance(v, dict):
93 if isinstance(v, dict):
94 v = Message(v)
94 v = Message(v)
95 dct[k] = v
95 dct[k] = v
96
96
97 # Having this iterator lets dict(msg_obj) work out of the box.
97 # Having this iterator lets dict(msg_obj) work out of the box.
98 def __iter__(self):
98 def __iter__(self):
99 return iter(self.__dict__.iteritems())
99 return iter(self.__dict__.iteritems())
100
100
101 def __repr__(self):
101 def __repr__(self):
102 return repr(self.__dict__)
102 return repr(self.__dict__)
103
103
104 def __str__(self):
104 def __str__(self):
105 return pprint.pformat(self.__dict__)
105 return pprint.pformat(self.__dict__)
106
106
107 def __contains__(self, k):
107 def __contains__(self, k):
108 return k in self.__dict__
108 return k in self.__dict__
109
109
110 def __getitem__(self, k):
110 def __getitem__(self, k):
111 return self.__dict__[k]
111 return self.__dict__[k]
112
112
113
113
114 def msg_header(msg_id, msg_type, username, session):
114 def msg_header(msg_id, msg_type, username, session):
115 date=datetime.now().isoformat()
115 date=datetime.now().isoformat()
116 return locals()
116 return locals()
117 # return {
117 # return {
118 # 'msg_id' : msg_id,
118 # 'msg_id' : msg_id,
119 # 'msg_type': msg_type,
119 # 'msg_type': msg_type,
120 # 'username' : username,
120 # 'username' : username,
121 # 'session' : session
121 # 'session' : session
122 # }
122 # }
123
123
124
124
125 def extract_header(msg_or_header):
125 def extract_header(msg_or_header):
126 """Given a message or header, return the header."""
126 """Given a message or header, return the header."""
127 if not msg_or_header:
127 if not msg_or_header:
128 return {}
128 return {}
129 try:
129 try:
130 # See if msg_or_header is the entire message.
130 # See if msg_or_header is the entire message.
131 h = msg_or_header['header']
131 h = msg_or_header['header']
132 except KeyError:
132 except KeyError:
133 try:
133 try:
134 # See if msg_or_header is just the header
134 # See if msg_or_header is just the header
135 h = msg_or_header['msg_id']
135 h = msg_or_header['msg_id']
136 except KeyError:
136 except KeyError:
137 raise
137 raise
138 else:
138 else:
139 h = msg_or_header
139 h = msg_or_header
140 if not isinstance(h, dict):
140 if not isinstance(h, dict):
141 h = dict(h)
141 h = dict(h)
142 return h
142 return h
143
143
144 def rekey(dikt):
144 def rekey(dikt):
145 """Rekey a dict that has been forced to use str keys where there should be
145 """Rekey a dict that has been forced to use str keys where there should be
146 ints by json. This belongs in the jsonutil added by fperez."""
146 ints by json. This belongs in the jsonutil added by fperez."""
147 for k in dikt.iterkeys():
147 for k in dikt.iterkeys():
148 if isinstance(k, str):
148 if isinstance(k, str):
149 ik=fk=None
149 ik=fk=None
150 try:
150 try:
151 ik = int(k)
151 ik = int(k)
152 except ValueError:
152 except ValueError:
153 try:
153 try:
154 fk = float(k)
154 fk = float(k)
155 except ValueError:
155 except ValueError:
156 continue
156 continue
157 if ik is not None:
157 if ik is not None:
158 nk = ik
158 nk = ik
159 else:
159 else:
160 nk = fk
160 nk = fk
161 if nk in dikt:
161 if nk in dikt:
162 raise KeyError("already have key %r"%nk)
162 raise KeyError("already have key %r"%nk)
163 dikt[nk] = dikt.pop(k)
163 dikt[nk] = dikt.pop(k)
164 return dikt
164 return dikt
165
165
166 def serialize_object(obj, threshold=64e-6):
166 def serialize_object(obj, threshold=64e-6):
167 """Serialize an object into a list of sendable buffers.
167 """Serialize an object into a list of sendable buffers.
168
168
169 Parameters
169 Parameters
170 ----------
170 ----------
171
171
172 obj : object
172 obj : object
173 The object to be serialized
173 The object to be serialized
174 threshold : float
174 threshold : float
175 The threshold for not double-pickling the content.
175 The threshold for not double-pickling the content.
176
176
177
177
178 Returns
178 Returns
179 -------
179 -------
180 ('pmd', [bufs]) :
180 ('pmd', [bufs]) :
181 where pmd is the pickled metadata wrapper,
181 where pmd is the pickled metadata wrapper,
182 bufs is a list of data buffers"""
182 bufs is a list of data buffers"""
183 # threshold is 100 B
183 # threshold is 100 B
184 databuffers = []
184 databuffers = []
185 if isinstance(obj, (list, tuple)):
185 if isinstance(obj, (list, tuple)):
186 clist = canSequence(obj)
186 clist = canSequence(obj)
187 slist = map(serialize, clist)
187 slist = map(serialize, clist)
188 for s in slist:
188 for s in slist:
189 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
189 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
190 databuffers.append(s.getData())
190 databuffers.append(s.getData())
191 s.data = None
191 s.data = None
192 return pickle.dumps(slist,-1), databuffers
192 return pickle.dumps(slist,-1), databuffers
193 elif isinstance(obj, dict):
193 elif isinstance(obj, dict):
194 sobj = {}
194 sobj = {}
195 for k in sorted(obj.iterkeys()):
195 for k in sorted(obj.iterkeys()):
196 s = serialize(can(obj[k]))
196 s = serialize(can(obj[k]))
197 if s.getDataSize() > threshold:
197 if s.getDataSize() > threshold:
198 databuffers.append(s.getData())
198 databuffers.append(s.getData())
199 s.data = None
199 s.data = None
200 sobj[k] = s
200 sobj[k] = s
201 return pickle.dumps(sobj,-1),databuffers
201 return pickle.dumps(sobj,-1),databuffers
202 else:
202 else:
203 s = serialize(can(obj))
203 s = serialize(can(obj))
204 if s.getDataSize() > threshold:
204 if s.getDataSize() > threshold:
205 databuffers.append(s.getData())
205 databuffers.append(s.getData())
206 s.data = None
206 s.data = None
207 return pickle.dumps(s,-1),databuffers
207 return pickle.dumps(s,-1),databuffers
208
208
209
209
210 def unserialize_object(bufs):
210 def unserialize_object(bufs):
211 """reconstruct an object serialized by serialize_object from data buffers"""
211 """reconstruct an object serialized by serialize_object from data buffers"""
212 bufs = list(bufs)
212 bufs = list(bufs)
213 sobj = pickle.loads(bufs.pop(0))
213 sobj = pickle.loads(bufs.pop(0))
214 if isinstance(sobj, (list, tuple)):
214 if isinstance(sobj, (list, tuple)):
215 for s in sobj:
215 for s in sobj:
216 if s.data is None:
216 if s.data is None:
217 s.data = bufs.pop(0)
217 s.data = bufs.pop(0)
218 return uncanSequence(map(unserialize, sobj))
218 return uncanSequence(map(unserialize, sobj))
219 elif isinstance(sobj, dict):
219 elif isinstance(sobj, dict):
220 newobj = {}
220 newobj = {}
221 for k in sorted(sobj.iterkeys()):
221 for k in sorted(sobj.iterkeys()):
222 s = sobj[k]
222 s = sobj[k]
223 if s.data is None:
223 if s.data is None:
224 s.data = bufs.pop(0)
224 s.data = bufs.pop(0)
225 newobj[k] = uncan(unserialize(s))
225 newobj[k] = uncan(unserialize(s))
226 return newobj
226 return newobj
227 else:
227 else:
228 if sobj.data is None:
228 if sobj.data is None:
229 sobj.data = bufs.pop(0)
229 sobj.data = bufs.pop(0)
230 return uncan(unserialize(sobj))
230 return uncan(unserialize(sobj))
231
231
232 def pack_apply_message(f, args, kwargs, threshold=64e-6):
232 def pack_apply_message(f, args, kwargs, threshold=64e-6):
233 """pack up a function, args, and kwargs to be sent over the wire
233 """pack up a function, args, and kwargs to be sent over the wire
234 as a series of buffers. Any object whose data is larger than `threshold`
234 as a series of buffers. Any object whose data is larger than `threshold`
235 will not have their data copied (currently only numpy arrays support zero-copy)"""
235 will not have their data copied (currently only numpy arrays support zero-copy)"""
236 msg = [pickle.dumps(can(f),-1)]
236 msg = [pickle.dumps(can(f),-1)]
237 databuffers = [] # for large objects
237 databuffers = [] # for large objects
238 sargs, bufs = serialize_object(args,threshold)
238 sargs, bufs = serialize_object(args,threshold)
239 msg.append(sargs)
239 msg.append(sargs)
240 databuffers.extend(bufs)
240 databuffers.extend(bufs)
241 skwargs, bufs = serialize_object(kwargs,threshold)
241 skwargs, bufs = serialize_object(kwargs,threshold)
242 msg.append(skwargs)
242 msg.append(skwargs)
243 databuffers.extend(bufs)
243 databuffers.extend(bufs)
244 msg.extend(databuffers)
244 msg.extend(databuffers)
245 return msg
245 return msg
246
246
247 def unpack_apply_message(bufs, g=None, copy=True):
247 def unpack_apply_message(bufs, g=None, copy=True):
248 """unpack f,args,kwargs from buffers packed by pack_apply_message()
248 """unpack f,args,kwargs from buffers packed by pack_apply_message()
249 Returns: original f,args,kwargs"""
249 Returns: original f,args,kwargs"""
250 bufs = list(bufs) # allow us to pop
250 bufs = list(bufs) # allow us to pop
251 assert len(bufs) >= 3, "not enough buffers!"
251 assert len(bufs) >= 3, "not enough buffers!"
252 if not copy:
252 if not copy:
253 for i in range(3):
253 for i in range(3):
254 bufs[i] = bufs[i].bytes
254 bufs[i] = bufs[i].bytes
255 cf = pickle.loads(bufs.pop(0))
255 cf = pickle.loads(bufs.pop(0))
256 sargs = list(pickle.loads(bufs.pop(0)))
256 sargs = list(pickle.loads(bufs.pop(0)))
257 skwargs = dict(pickle.loads(bufs.pop(0)))
257 skwargs = dict(pickle.loads(bufs.pop(0)))
258 # print sargs, skwargs
258 # print sargs, skwargs
259 f = uncan(cf, g)
259 f = uncan(cf, g)
260 for sa in sargs:
260 for sa in sargs:
261 if sa.data is None:
261 if sa.data is None:
262 m = bufs.pop(0)
262 m = bufs.pop(0)
263 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
263 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
264 if copy:
264 if copy:
265 sa.data = buffer(m)
265 sa.data = buffer(m)
266 else:
266 else:
267 sa.data = m.buffer
267 sa.data = m.buffer
268 else:
268 else:
269 if copy:
269 if copy:
270 sa.data = m
270 sa.data = m
271 else:
271 else:
272 sa.data = m.bytes
272 sa.data = m.bytes
273
273
274 args = uncanSequence(map(unserialize, sargs), g)
274 args = uncanSequence(map(unserialize, sargs), g)
275 kwargs = {}
275 kwargs = {}
276 for k in sorted(skwargs.iterkeys()):
276 for k in sorted(skwargs.iterkeys()):
277 sa = skwargs[k]
277 sa = skwargs[k]
278 if sa.data is None:
278 if sa.data is None:
279 sa.data = bufs.pop(0)
279 sa.data = bufs.pop(0)
280 kwargs[k] = uncan(unserialize(sa), g)
280 kwargs[k] = uncan(unserialize(sa), g)
281
281
282 return f,args,kwargs
282 return f,args,kwargs
283
283
284 class StreamSession(object):
284 class StreamSession(object):
285 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
285 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
286 debug=False
286 debug=False
287 def __init__(self, username=None, session=None, packer=None, unpacker=None):
287 def __init__(self, username=None, session=None, packer=None, unpacker=None):
288 if username is None:
288 if username is None:
289 username = os.environ.get('USER','username')
289 username = os.environ.get('USER','username')
290 self.username = username
290 self.username = username
291 if session is None:
291 if session is None:
292 self.session = str(uuid.uuid4())
292 self.session = str(uuid.uuid4())
293 else:
293 else:
294 self.session = session
294 self.session = session
295 self.msg_id = str(uuid.uuid4())
295 self.msg_id = str(uuid.uuid4())
296 if packer is None:
296 if packer is None:
297 self.pack = default_packer
297 self.pack = default_packer
298 else:
298 else:
299 if not callable(packer):
299 if not callable(packer):
300 raise TypeError("packer must be callable, not %s"%type(packer))
300 raise TypeError("packer must be callable, not %s"%type(packer))
301 self.pack = packer
301 self.pack = packer
302
302
303 if unpacker is None:
303 if unpacker is None:
304 self.unpack = default_unpacker
304 self.unpack = default_unpacker
305 else:
305 else:
306 if not callable(unpacker):
306 if not callable(unpacker):
307 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
307 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
308 self.unpack = unpacker
308 self.unpack = unpacker
309
309
310 self.none = self.pack({})
310 self.none = self.pack({})
311
311
312 def msg_header(self, msg_type):
312 def msg_header(self, msg_type):
313 h = msg_header(self.msg_id, msg_type, self.username, self.session)
313 h = msg_header(self.msg_id, msg_type, self.username, self.session)
314 self.msg_id = str(uuid.uuid4())
314 self.msg_id = str(uuid.uuid4())
315 return h
315 return h
316
316
317 def msg(self, msg_type, content=None, parent=None, subheader=None):
317 def msg(self, msg_type, content=None, parent=None, subheader=None):
318 msg = {}
318 msg = {}
319 msg['header'] = self.msg_header(msg_type)
319 msg['header'] = self.msg_header(msg_type)
320 msg['msg_id'] = msg['header']['msg_id']
320 msg['msg_id'] = msg['header']['msg_id']
321 msg['parent_header'] = {} if parent is None else extract_header(parent)
321 msg['parent_header'] = {} if parent is None else extract_header(parent)
322 msg['msg_type'] = msg_type
322 msg['msg_type'] = msg_type
323 msg['content'] = {} if content is None else content
323 msg['content'] = {} if content is None else content
324 sub = {} if subheader is None else subheader
324 sub = {} if subheader is None else subheader
325 msg['header'].update(sub)
325 msg['header'].update(sub)
326 return msg
326 return msg
327
327
328 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
328 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
329 """Build and send a message via stream or socket.
329 """Build and send a message via stream or socket.
330
330
331 Parameters
331 Parameters
332 ----------
332 ----------
333
333
334 stream : zmq.Socket or ZMQStream
334 stream : zmq.Socket or ZMQStream
335 the socket-like object used to send the data
335 the socket-like object used to send the data
336 msg_type : str or Message/dict
336 msg_type : str or Message/dict
337 Normally, msg_type will be
337 Normally, msg_type will be
338
338
339
339
340
340
341 Returns
341 Returns
342 -------
342 -------
343 (msg,sent) : tuple
343 (msg,sent) : tuple
344 msg : Message
344 msg : Message
345 the nice wrapped dict-like object containing the headers
345 the nice wrapped dict-like object containing the headers
346
346
347 """
347 """
348 if isinstance(msg_type, (Message, dict)):
348 if isinstance(msg_type, (Message, dict)):
349 # we got a Message, not a msg_type
349 # we got a Message, not a msg_type
350 # don't build a new Message
350 # don't build a new Message
351 msg = msg_type
351 msg = msg_type
352 content = msg['content']
352 content = msg['content']
353 else:
353 else:
354 msg = self.msg(msg_type, content, parent, subheader)
354 msg = self.msg(msg_type, content, parent, subheader)
355 buffers = [] if buffers is None else buffers
355 buffers = [] if buffers is None else buffers
356 to_send = []
356 to_send = []
357 if isinstance(ident, list):
357 if isinstance(ident, list):
358 # accept list of idents
358 # accept list of idents
359 to_send.extend(ident)
359 to_send.extend(ident)
360 elif ident is not None:
360 elif ident is not None:
361 to_send.append(ident)
361 to_send.append(ident)
362 to_send.append(DELIM)
362 to_send.append(DELIM)
363 to_send.append(self.pack(msg['header']))
363 to_send.append(self.pack(msg['header']))
364 to_send.append(self.pack(msg['parent_header']))
364 to_send.append(self.pack(msg['parent_header']))
365
365
366 if content is None:
366 if content is None:
367 content = self.none
367 content = self.none
368 elif isinstance(content, dict):
368 elif isinstance(content, dict):
369 content = self.pack(content)
369 content = self.pack(content)
370 elif isinstance(content, str):
370 elif isinstance(content, str):
371 # content is already packed, as in a relayed message
371 # content is already packed, as in a relayed message
372 pass
372 pass
373 else:
373 else:
374 raise TypeError("Content incorrect type: %s"%type(content))
374 raise TypeError("Content incorrect type: %s"%type(content))
375 to_send.append(content)
375 to_send.append(content)
376 flag = 0
376 flag = 0
377 if buffers:
377 if buffers:
378 flag = zmq.SNDMORE
378 flag = zmq.SNDMORE
379 stream.send_multipart(to_send, flag, copy=False)
379 stream.send_multipart(to_send, flag, copy=False)
380 for b in buffers[:-1]:
380 for b in buffers[:-1]:
381 stream.send(b, flag, copy=False)
381 stream.send(b, flag, copy=False)
382 if buffers:
382 if buffers:
383 stream.send(buffers[-1], copy=False)
383 stream.send(buffers[-1], copy=False)
384 omsg = Message(msg)
384 omsg = Message(msg)
385 if self.debug:
385 if self.debug:
386 pprint.pprint(omsg)
386 pprint.pprint(omsg)
387 pprint.pprint(to_send)
387 pprint.pprint(to_send)
388 pprint.pprint(buffers)
388 pprint.pprint(buffers)
389 return omsg
389 return omsg
390
390
391 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
391 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
392 """Send a raw message via idents.
392 """Send a raw message via idents.
393
393
394 Parameters
394 Parameters
395 ----------
395 ----------
396 msg : list of sendable buffers"""
396 msg : list of sendable buffers"""
397 to_send = []
397 to_send = []
398 if isinstance(ident, str):
398 if isinstance(ident, str):
399 ident = [ident]
399 ident = [ident]
400 if ident is not None:
400 if ident is not None:
401 to_send.extend(ident)
401 to_send.extend(ident)
402 to_send.append(DELIM)
402 to_send.append(DELIM)
403 to_send.extend(msg)
403 to_send.extend(msg)
404 stream.send_multipart(msg, flags, copy=copy)
404 stream.send_multipart(msg, flags, copy=copy)
405
405
406 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
406 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
407 """receives and unpacks a message
407 """receives and unpacks a message
408 returns [idents], msg"""
408 returns [idents], msg"""
409 if isinstance(socket, ZMQStream):
409 if isinstance(socket, ZMQStream):
410 socket = socket.socket
410 socket = socket.socket
411 try:
411 try:
412 msg = socket.recv_multipart(mode)
412 msg = socket.recv_multipart(mode)
413 except zmq.ZMQError as e:
413 except zmq.ZMQError as e:
414 if e.errno == zmq.EAGAIN:
414 if e.errno == zmq.EAGAIN:
415 # We can convert EAGAIN to None as we know in this case
415 # We can convert EAGAIN to None as we know in this case
416 # recv_json won't return None.
416 # recv_json won't return None.
417 return None
417 return None
418 else:
418 else:
419 raise
419 raise
420 # return an actual Message object
420 # return an actual Message object
421 # determine the number of idents by trying to unpack them.
421 # determine the number of idents by trying to unpack them.
422 # this is terrible:
422 # this is terrible:
423 idents, msg = self.feed_identities(msg, copy)
423 idents, msg = self.feed_identities(msg, copy)
424 try:
424 try:
425 return idents, self.unpack_message(msg, content=content, copy=copy)
425 return idents, self.unpack_message(msg, content=content, copy=copy)
426 except Exception as e:
426 except Exception as e:
427 print (idents, msg)
427 print (idents, msg)
428 # TODO: handle it
428 # TODO: handle it
429 raise e
429 raise e
430
430
431 def feed_identities(self, msg, copy=True):
431 def feed_identities(self, msg, copy=True):
432 """This is a completely horrible thing, but it strips the zmq
432 """This is a completely horrible thing, but it strips the zmq
433 ident prefixes off of a message. It will break if any identities
433 ident prefixes off of a message. It will break if any identities
434 are unpackable by self.unpack."""
434 are unpackable by self.unpack."""
435 msg = list(msg)
435 msg = list(msg)
436 idents = []
436 idents = []
437 while len(msg) > 3:
437 while len(msg) > 3:
438 if copy:
438 if copy:
439 s = msg[0]
439 s = msg[0]
440 else:
440 else:
441 s = msg[0].bytes
441 s = msg[0].bytes
442 if s == DELIM:
442 if s == DELIM:
443 msg.pop(0)
443 msg.pop(0)
444 break
444 break
445 else:
445 else:
446 idents.append(s)
446 idents.append(s)
447 msg.pop(0)
447 msg.pop(0)
448
448
449 return idents, msg
449 return idents, msg
450
450
451 def unpack_message(self, msg, content=True, copy=True):
451 def unpack_message(self, msg, content=True, copy=True):
452 """Return a message object from the format
452 """Return a message object from the format
453 sent by self.send.
453 sent by self.send.
454
454
455 Parameters:
455 Parameters:
456 -----------
456 -----------
457
457
458 content : bool (True)
458 content : bool (True)
459 whether to unpack the content dict (True),
459 whether to unpack the content dict (True),
460 or leave it serialized (False)
460 or leave it serialized (False)
461
461
462 copy : bool (True)
462 copy : bool (True)
463 whether to return the bytes (True),
463 whether to return the bytes (True),
464 or the non-copying Message object in each place (False)
464 or the non-copying Message object in each place (False)
465
465
466 """
466 """
467 if not len(msg) >= 3:
467 if not len(msg) >= 3:
468 raise TypeError("malformed message, must have at least 3 elements")
468 raise TypeError("malformed message, must have at least 3 elements")
469 message = {}
469 message = {}
470 if not copy:
470 if not copy:
471 for i in range(3):
471 for i in range(3):
472 msg[i] = msg[i].bytes
472 msg[i] = msg[i].bytes
473 message['header'] = self.unpack(msg[0])
473 message['header'] = self.unpack(msg[0])
474 message['msg_type'] = message['header']['msg_type']
474 message['msg_type'] = message['header']['msg_type']
475 message['parent_header'] = self.unpack(msg[1])
475 message['parent_header'] = self.unpack(msg[1])
476 if content:
476 if content:
477 message['content'] = self.unpack(msg[2])
477 message['content'] = self.unpack(msg[2])
478 else:
478 else:
479 message['content'] = msg[2]
479 message['content'] = msg[2]
480
480
481 # message['buffers'] = msg[3:]
481 # message['buffers'] = msg[3:]
482 # else:
482 # else:
483 # message['header'] = self.unpack(msg[0].bytes)
483 # message['header'] = self.unpack(msg[0].bytes)
484 # message['msg_type'] = message['header']['msg_type']
484 # message['msg_type'] = message['header']['msg_type']
485 # message['parent_header'] = self.unpack(msg[1].bytes)
485 # message['parent_header'] = self.unpack(msg[1].bytes)
486 # if content:
486 # if content:
487 # message['content'] = self.unpack(msg[2].bytes)
487 # message['content'] = self.unpack(msg[2].bytes)
488 # else:
488 # else:
489 # message['content'] = msg[2].bytes
489 # message['content'] = msg[2].bytes
490
490
491 message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
491 message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
492 return message
492 return message
493
493
494
494
495
495
496 def test_msg2obj():
496 def test_msg2obj():
497 am = dict(x=1)
497 am = dict(x=1)
498 ao = Message(am)
498 ao = Message(am)
499 assert ao.x == am['x']
499 assert ao.x == am['x']
500
500
501 am['y'] = dict(z=1)
501 am['y'] = dict(z=1)
502 ao = Message(am)
502 ao = Message(am)
503 assert ao.y.z == am['y']['z']
503 assert ao.y.z == am['y']['z']
504
504
505 k1, k2 = 'y', 'z'
505 k1, k2 = 'y', 'z'
506 assert ao[k1][k2] == am[k1][k2]
506 assert ao[k1][k2] == am[k1][k2]
507
507
508 am2 = dict(ao)
508 am2 = dict(ao)
509 assert am['x'] == am2['x']
509 assert am['x'] == am2['x']
510 assert am['y']['z'] == am2['y']['z']
510 assert am['y']['z'] == am2['y']['z']
General Comments 0
You need to be logged in to leave comments. Login now