##// END OF EJS Templates
str(etype)
MinRK -
Show More
@@ -1,531 +1,531 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4
4
5
5
6 import os
6 import os
7 import sys
7 import sys
8 import traceback
8 import traceback
9 import pprint
9 import pprint
10 import uuid
10 import uuid
11 from datetime import datetime
11 from datetime import datetime
12
12
13 import zmq
13 import zmq
14 from zmq.utils import jsonapi
14 from zmq.utils import jsonapi
15 from zmq.eventloop.zmqstream import ZMQStream
15 from zmq.eventloop.zmqstream import ZMQStream
16
16
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
18 from IPython.utils.newserialized import serialize, unserialize
18 from IPython.utils.newserialized import serialize, unserialize
19
19
20 try:
20 try:
21 import cPickle
21 import cPickle
22 pickle = cPickle
22 pickle = cPickle
23 except:
23 except:
24 cPickle = None
24 cPickle = None
25 import pickle
25 import pickle
26
26
27 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
27 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
28 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
28 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
29 if json_name in ('jsonlib', 'jsonlib2'):
29 if json_name in ('jsonlib', 'jsonlib2'):
30 use_json = True
30 use_json = True
31 elif json_name:
31 elif json_name:
32 if cPickle is None:
32 if cPickle is None:
33 use_json = True
33 use_json = True
34 else:
34 else:
35 use_json = False
35 use_json = False
36 else:
36 else:
37 use_json = False
37 use_json = False
38
38
39 def squash_unicode(obj):
39 def squash_unicode(obj):
40 if isinstance(obj,dict):
40 if isinstance(obj,dict):
41 for key in obj.keys():
41 for key in obj.keys():
42 obj[key] = squash_unicode(obj[key])
42 obj[key] = squash_unicode(obj[key])
43 if isinstance(key, unicode):
43 if isinstance(key, unicode):
44 obj[squash_unicode(key)] = obj.pop(key)
44 obj[squash_unicode(key)] = obj.pop(key)
45 elif isinstance(obj, list):
45 elif isinstance(obj, list):
46 for i,v in enumerate(obj):
46 for i,v in enumerate(obj):
47 obj[i] = squash_unicode(v)
47 obj[i] = squash_unicode(v)
48 elif isinstance(obj, unicode):
48 elif isinstance(obj, unicode):
49 obj = obj.encode('utf8')
49 obj = obj.encode('utf8')
50 return obj
50 return obj
51
51
52 if use_json:
52 if use_json:
53 default_packer = jsonapi.dumps
53 default_packer = jsonapi.dumps
54 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
54 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
55 else:
55 else:
56 default_packer = lambda o: pickle.dumps(o,-1)
56 default_packer = lambda o: pickle.dumps(o,-1)
57 default_unpacker = pickle.loads
57 default_unpacker = pickle.loads
58
58
59
59
60 DELIM="<IDS|MSG>"
60 DELIM="<IDS|MSG>"
61 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
61 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
62
62
63 def wrap_exception():
63 def wrap_exception():
64 etype, evalue, tb = sys.exc_info()
64 etype, evalue, tb = sys.exc_info()
65 tb = traceback.format_exception(etype, evalue, tb)
65 tb = traceback.format_exception(etype, evalue, tb)
66 exc_content = {
66 exc_content = {
67 'status' : 'error',
67 'status' : 'error',
68 'traceback' : [ line.encode('utf8') for line in tb ],
68 'traceback' : [ line.encode('utf8') for line in tb ],
69 'etype' : etype.encode('utf8'),
69 'etype' : str(etype).encode('utf8'),
70 'evalue' : evalue.encode('utf8')
70 'evalue' : evalue.encode('utf8')
71 }
71 }
72 return exc_content
72 return exc_content
73
73
74 class KernelError(Exception):
74 class KernelError(Exception):
75 pass
75 pass
76
76
77 def unwrap_exception(content):
77 def unwrap_exception(content):
78 err = KernelError(content['etype'], content['evalue'])
78 err = KernelError(content['etype'], content['evalue'])
79 err.evalue = content['evalue']
79 err.evalue = content['evalue']
80 err.etype = content['etype']
80 err.etype = content['etype']
81 err.traceback = ''.join(content['traceback'])
81 err.traceback = ''.join(content['traceback'])
82 return err
82 return err
83
83
84
84
85 class Message(object):
85 class Message(object):
86 """A simple message object that maps dict keys to attributes.
86 """A simple message object that maps dict keys to attributes.
87
87
88 A Message can be created from a dict and a dict from a Message instance
88 A Message can be created from a dict and a dict from a Message instance
89 simply by calling dict(msg_obj)."""
89 simply by calling dict(msg_obj)."""
90
90
91 def __init__(self, msg_dict):
91 def __init__(self, msg_dict):
92 dct = self.__dict__
92 dct = self.__dict__
93 for k, v in dict(msg_dict).iteritems():
93 for k, v in dict(msg_dict).iteritems():
94 if isinstance(v, dict):
94 if isinstance(v, dict):
95 v = Message(v)
95 v = Message(v)
96 dct[k] = v
96 dct[k] = v
97
97
98 # Having this iterator lets dict(msg_obj) work out of the box.
98 # Having this iterator lets dict(msg_obj) work out of the box.
99 def __iter__(self):
99 def __iter__(self):
100 return iter(self.__dict__.iteritems())
100 return iter(self.__dict__.iteritems())
101
101
102 def __repr__(self):
102 def __repr__(self):
103 return repr(self.__dict__)
103 return repr(self.__dict__)
104
104
105 def __str__(self):
105 def __str__(self):
106 return pprint.pformat(self.__dict__)
106 return pprint.pformat(self.__dict__)
107
107
108 def __contains__(self, k):
108 def __contains__(self, k):
109 return k in self.__dict__
109 return k in self.__dict__
110
110
111 def __getitem__(self, k):
111 def __getitem__(self, k):
112 return self.__dict__[k]
112 return self.__dict__[k]
113
113
114
114
115 def msg_header(msg_id, msg_type, username, session):
115 def msg_header(msg_id, msg_type, username, session):
116 date=datetime.now().strftime(ISO8601)
116 date=datetime.now().strftime(ISO8601)
117 return locals()
117 return locals()
118
118
119 def extract_header(msg_or_header):
119 def extract_header(msg_or_header):
120 """Given a message or header, return the header."""
120 """Given a message or header, return the header."""
121 if not msg_or_header:
121 if not msg_or_header:
122 return {}
122 return {}
123 try:
123 try:
124 # See if msg_or_header is the entire message.
124 # See if msg_or_header is the entire message.
125 h = msg_or_header['header']
125 h = msg_or_header['header']
126 except KeyError:
126 except KeyError:
127 try:
127 try:
128 # See if msg_or_header is just the header
128 # See if msg_or_header is just the header
129 h = msg_or_header['msg_id']
129 h = msg_or_header['msg_id']
130 except KeyError:
130 except KeyError:
131 raise
131 raise
132 else:
132 else:
133 h = msg_or_header
133 h = msg_or_header
134 if not isinstance(h, dict):
134 if not isinstance(h, dict):
135 h = dict(h)
135 h = dict(h)
136 return h
136 return h
137
137
138 def rekey(dikt):
138 def rekey(dikt):
139 """Rekey a dict that has been forced to use str keys where there should be
139 """Rekey a dict that has been forced to use str keys where there should be
140 ints by json. This belongs in the jsonutil added by fperez."""
140 ints by json. This belongs in the jsonutil added by fperez."""
141 for k in dikt.iterkeys():
141 for k in dikt.iterkeys():
142 if isinstance(k, str):
142 if isinstance(k, str):
143 ik=fk=None
143 ik=fk=None
144 try:
144 try:
145 ik = int(k)
145 ik = int(k)
146 except ValueError:
146 except ValueError:
147 try:
147 try:
148 fk = float(k)
148 fk = float(k)
149 except ValueError:
149 except ValueError:
150 continue
150 continue
151 if ik is not None:
151 if ik is not None:
152 nk = ik
152 nk = ik
153 else:
153 else:
154 nk = fk
154 nk = fk
155 if nk in dikt:
155 if nk in dikt:
156 raise KeyError("already have key %r"%nk)
156 raise KeyError("already have key %r"%nk)
157 dikt[nk] = dikt.pop(k)
157 dikt[nk] = dikt.pop(k)
158 return dikt
158 return dikt
159
159
160 def serialize_object(obj, threshold=64e-6):
160 def serialize_object(obj, threshold=64e-6):
161 """Serialize an object into a list of sendable buffers.
161 """Serialize an object into a list of sendable buffers.
162
162
163 Parameters
163 Parameters
164 ----------
164 ----------
165
165
166 obj : object
166 obj : object
167 The object to be serialized
167 The object to be serialized
168 threshold : float
168 threshold : float
169 The threshold for not double-pickling the content.
169 The threshold for not double-pickling the content.
170
170
171
171
172 Returns
172 Returns
173 -------
173 -------
174 ('pmd', [bufs]) :
174 ('pmd', [bufs]) :
175 where pmd is the pickled metadata wrapper,
175 where pmd is the pickled metadata wrapper,
176 bufs is a list of data buffers
176 bufs is a list of data buffers
177 """
177 """
178 databuffers = []
178 databuffers = []
179 if isinstance(obj, (list, tuple)):
179 if isinstance(obj, (list, tuple)):
180 clist = canSequence(obj)
180 clist = canSequence(obj)
181 slist = map(serialize, clist)
181 slist = map(serialize, clist)
182 for s in slist:
182 for s in slist:
183 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
183 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
184 databuffers.append(s.getData())
184 databuffers.append(s.getData())
185 s.data = None
185 s.data = None
186 return pickle.dumps(slist,-1), databuffers
186 return pickle.dumps(slist,-1), databuffers
187 elif isinstance(obj, dict):
187 elif isinstance(obj, dict):
188 sobj = {}
188 sobj = {}
189 for k in sorted(obj.iterkeys()):
189 for k in sorted(obj.iterkeys()):
190 s = serialize(can(obj[k]))
190 s = serialize(can(obj[k]))
191 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
191 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
192 databuffers.append(s.getData())
192 databuffers.append(s.getData())
193 s.data = None
193 s.data = None
194 sobj[k] = s
194 sobj[k] = s
195 return pickle.dumps(sobj,-1),databuffers
195 return pickle.dumps(sobj,-1),databuffers
196 else:
196 else:
197 s = serialize(can(obj))
197 s = serialize(can(obj))
198 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
198 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
199 databuffers.append(s.getData())
199 databuffers.append(s.getData())
200 s.data = None
200 s.data = None
201 return pickle.dumps(s,-1),databuffers
201 return pickle.dumps(s,-1),databuffers
202
202
203
203
204 def unserialize_object(bufs):
204 def unserialize_object(bufs):
205 """reconstruct an object serialized by serialize_object from data buffers"""
205 """reconstruct an object serialized by serialize_object from data buffers"""
206 bufs = list(bufs)
206 bufs = list(bufs)
207 sobj = pickle.loads(bufs.pop(0))
207 sobj = pickle.loads(bufs.pop(0))
208 if isinstance(sobj, (list, tuple)):
208 if isinstance(sobj, (list, tuple)):
209 for s in sobj:
209 for s in sobj:
210 if s.data is None:
210 if s.data is None:
211 s.data = bufs.pop(0)
211 s.data = bufs.pop(0)
212 return uncanSequence(map(unserialize, sobj))
212 return uncanSequence(map(unserialize, sobj))
213 elif isinstance(sobj, dict):
213 elif isinstance(sobj, dict):
214 newobj = {}
214 newobj = {}
215 for k in sorted(sobj.iterkeys()):
215 for k in sorted(sobj.iterkeys()):
216 s = sobj[k]
216 s = sobj[k]
217 if s.data is None:
217 if s.data is None:
218 s.data = bufs.pop(0)
218 s.data = bufs.pop(0)
219 newobj[k] = uncan(unserialize(s))
219 newobj[k] = uncan(unserialize(s))
220 return newobj
220 return newobj
221 else:
221 else:
222 if sobj.data is None:
222 if sobj.data is None:
223 sobj.data = bufs.pop(0)
223 sobj.data = bufs.pop(0)
224 return uncan(unserialize(sobj))
224 return uncan(unserialize(sobj))
225
225
226 def pack_apply_message(f, args, kwargs, threshold=64e-6):
226 def pack_apply_message(f, args, kwargs, threshold=64e-6):
227 """pack up a function, args, and kwargs to be sent over the wire
227 """pack up a function, args, and kwargs to be sent over the wire
228 as a series of buffers. Any object whose data is larger than `threshold`
228 as a series of buffers. Any object whose data is larger than `threshold`
229 will not have their data copied (currently only numpy arrays support zero-copy)"""
229 will not have their data copied (currently only numpy arrays support zero-copy)"""
230 msg = [pickle.dumps(can(f),-1)]
230 msg = [pickle.dumps(can(f),-1)]
231 databuffers = [] # for large objects
231 databuffers = [] # for large objects
232 sargs, bufs = serialize_object(args,threshold)
232 sargs, bufs = serialize_object(args,threshold)
233 msg.append(sargs)
233 msg.append(sargs)
234 databuffers.extend(bufs)
234 databuffers.extend(bufs)
235 skwargs, bufs = serialize_object(kwargs,threshold)
235 skwargs, bufs = serialize_object(kwargs,threshold)
236 msg.append(skwargs)
236 msg.append(skwargs)
237 databuffers.extend(bufs)
237 databuffers.extend(bufs)
238 msg.extend(databuffers)
238 msg.extend(databuffers)
239 return msg
239 return msg
240
240
241 def unpack_apply_message(bufs, g=None, copy=True):
241 def unpack_apply_message(bufs, g=None, copy=True):
242 """unpack f,args,kwargs from buffers packed by pack_apply_message()
242 """unpack f,args,kwargs from buffers packed by pack_apply_message()
243 Returns: original f,args,kwargs"""
243 Returns: original f,args,kwargs"""
244 bufs = list(bufs) # allow us to pop
244 bufs = list(bufs) # allow us to pop
245 assert len(bufs) >= 3, "not enough buffers!"
245 assert len(bufs) >= 3, "not enough buffers!"
246 if not copy:
246 if not copy:
247 for i in range(3):
247 for i in range(3):
248 bufs[i] = bufs[i].bytes
248 bufs[i] = bufs[i].bytes
249 cf = pickle.loads(bufs.pop(0))
249 cf = pickle.loads(bufs.pop(0))
250 sargs = list(pickle.loads(bufs.pop(0)))
250 sargs = list(pickle.loads(bufs.pop(0)))
251 skwargs = dict(pickle.loads(bufs.pop(0)))
251 skwargs = dict(pickle.loads(bufs.pop(0)))
252 # print sargs, skwargs
252 # print sargs, skwargs
253 f = uncan(cf, g)
253 f = uncan(cf, g)
254 for sa in sargs:
254 for sa in sargs:
255 if sa.data is None:
255 if sa.data is None:
256 m = bufs.pop(0)
256 m = bufs.pop(0)
257 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
257 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
258 if copy:
258 if copy:
259 sa.data = buffer(m)
259 sa.data = buffer(m)
260 else:
260 else:
261 sa.data = m.buffer
261 sa.data = m.buffer
262 else:
262 else:
263 if copy:
263 if copy:
264 sa.data = m
264 sa.data = m
265 else:
265 else:
266 sa.data = m.bytes
266 sa.data = m.bytes
267
267
268 args = uncanSequence(map(unserialize, sargs), g)
268 args = uncanSequence(map(unserialize, sargs), g)
269 kwargs = {}
269 kwargs = {}
270 for k in sorted(skwargs.iterkeys()):
270 for k in sorted(skwargs.iterkeys()):
271 sa = skwargs[k]
271 sa = skwargs[k]
272 if sa.data is None:
272 if sa.data is None:
273 sa.data = bufs.pop(0)
273 sa.data = bufs.pop(0)
274 kwargs[k] = uncan(unserialize(sa), g)
274 kwargs[k] = uncan(unserialize(sa), g)
275
275
276 return f,args,kwargs
276 return f,args,kwargs
277
277
278 class StreamSession(object):
278 class StreamSession(object):
279 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
279 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
280 debug=False
280 debug=False
281 key=None
281 key=None
282
282
283 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
283 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
284 if username is None:
284 if username is None:
285 username = os.environ.get('USER','username')
285 username = os.environ.get('USER','username')
286 self.username = username
286 self.username = username
287 if session is None:
287 if session is None:
288 self.session = str(uuid.uuid4())
288 self.session = str(uuid.uuid4())
289 else:
289 else:
290 self.session = session
290 self.session = session
291 self.msg_id = str(uuid.uuid4())
291 self.msg_id = str(uuid.uuid4())
292 if packer is None:
292 if packer is None:
293 self.pack = default_packer
293 self.pack = default_packer
294 else:
294 else:
295 if not callable(packer):
295 if not callable(packer):
296 raise TypeError("packer must be callable, not %s"%type(packer))
296 raise TypeError("packer must be callable, not %s"%type(packer))
297 self.pack = packer
297 self.pack = packer
298
298
299 if unpacker is None:
299 if unpacker is None:
300 self.unpack = default_unpacker
300 self.unpack = default_unpacker
301 else:
301 else:
302 if not callable(unpacker):
302 if not callable(unpacker):
303 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
303 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
304 self.unpack = unpacker
304 self.unpack = unpacker
305
305
306 if key is not None and keyfile is not None:
306 if key is not None and keyfile is not None:
307 raise TypeError("Must specify key OR keyfile, not both")
307 raise TypeError("Must specify key OR keyfile, not both")
308 if keyfile is not None:
308 if keyfile is not None:
309 with open(keyfile) as f:
309 with open(keyfile) as f:
310 self.key = f.read().strip()
310 self.key = f.read().strip()
311 else:
311 else:
312 self.key = key
312 self.key = key
313 # print key, keyfile, self.key
313 # print key, keyfile, self.key
314 self.none = self.pack({})
314 self.none = self.pack({})
315
315
316 def msg_header(self, msg_type):
316 def msg_header(self, msg_type):
317 h = msg_header(self.msg_id, msg_type, self.username, self.session)
317 h = msg_header(self.msg_id, msg_type, self.username, self.session)
318 self.msg_id = str(uuid.uuid4())
318 self.msg_id = str(uuid.uuid4())
319 return h
319 return h
320
320
321 def msg(self, msg_type, content=None, parent=None, subheader=None):
321 def msg(self, msg_type, content=None, parent=None, subheader=None):
322 msg = {}
322 msg = {}
323 msg['header'] = self.msg_header(msg_type)
323 msg['header'] = self.msg_header(msg_type)
324 msg['msg_id'] = msg['header']['msg_id']
324 msg['msg_id'] = msg['header']['msg_id']
325 msg['parent_header'] = {} if parent is None else extract_header(parent)
325 msg['parent_header'] = {} if parent is None else extract_header(parent)
326 msg['msg_type'] = msg_type
326 msg['msg_type'] = msg_type
327 msg['content'] = {} if content is None else content
327 msg['content'] = {} if content is None else content
328 sub = {} if subheader is None else subheader
328 sub = {} if subheader is None else subheader
329 msg['header'].update(sub)
329 msg['header'].update(sub)
330 return msg
330 return msg
331
331
332 def check_key(self, msg_or_header):
332 def check_key(self, msg_or_header):
333 """Check that a message's header has the right key"""
333 """Check that a message's header has the right key"""
334 if self.key is None:
334 if self.key is None:
335 return True
335 return True
336 header = extract_header(msg_or_header)
336 header = extract_header(msg_or_header)
337 return header.get('key', None) == self.key
337 return header.get('key', None) == self.key
338
338
339
339
340 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
340 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
341 """Build and send a message via stream or socket.
341 """Build and send a message via stream or socket.
342
342
343 Parameters
343 Parameters
344 ----------
344 ----------
345
345
346 stream : zmq.Socket or ZMQStream
346 stream : zmq.Socket or ZMQStream
347 the socket-like object used to send the data
347 the socket-like object used to send the data
348 msg_type : str or Message/dict
348 msg_type : str or Message/dict
349 Normally, msg_type will be
349 Normally, msg_type will be
350
350
351
351
352
352
353 Returns
353 Returns
354 -------
354 -------
355 (msg,sent) : tuple
355 (msg,sent) : tuple
356 msg : Message
356 msg : Message
357 the nice wrapped dict-like object containing the headers
357 the nice wrapped dict-like object containing the headers
358
358
359 """
359 """
360 if isinstance(msg_type, (Message, dict)):
360 if isinstance(msg_type, (Message, dict)):
361 # we got a Message, not a msg_type
361 # we got a Message, not a msg_type
362 # don't build a new Message
362 # don't build a new Message
363 msg = msg_type
363 msg = msg_type
364 content = msg['content']
364 content = msg['content']
365 else:
365 else:
366 msg = self.msg(msg_type, content, parent, subheader)
366 msg = self.msg(msg_type, content, parent, subheader)
367 buffers = [] if buffers is None else buffers
367 buffers = [] if buffers is None else buffers
368 to_send = []
368 to_send = []
369 if isinstance(ident, list):
369 if isinstance(ident, list):
370 # accept list of idents
370 # accept list of idents
371 to_send.extend(ident)
371 to_send.extend(ident)
372 elif ident is not None:
372 elif ident is not None:
373 to_send.append(ident)
373 to_send.append(ident)
374 to_send.append(DELIM)
374 to_send.append(DELIM)
375 if self.key is not None:
375 if self.key is not None:
376 to_send.append(self.key)
376 to_send.append(self.key)
377 to_send.append(self.pack(msg['header']))
377 to_send.append(self.pack(msg['header']))
378 to_send.append(self.pack(msg['parent_header']))
378 to_send.append(self.pack(msg['parent_header']))
379
379
380 if content is None:
380 if content is None:
381 content = self.none
381 content = self.none
382 elif isinstance(content, dict):
382 elif isinstance(content, dict):
383 content = self.pack(content)
383 content = self.pack(content)
384 elif isinstance(content, str):
384 elif isinstance(content, str):
385 # content is already packed, as in a relayed message
385 # content is already packed, as in a relayed message
386 pass
386 pass
387 else:
387 else:
388 raise TypeError("Content incorrect type: %s"%type(content))
388 raise TypeError("Content incorrect type: %s"%type(content))
389 to_send.append(content)
389 to_send.append(content)
390 flag = 0
390 flag = 0
391 if buffers:
391 if buffers:
392 flag = zmq.SNDMORE
392 flag = zmq.SNDMORE
393 stream.send_multipart(to_send, flag, copy=False)
393 stream.send_multipart(to_send, flag, copy=False)
394 for b in buffers[:-1]:
394 for b in buffers[:-1]:
395 stream.send(b, flag, copy=False)
395 stream.send(b, flag, copy=False)
396 if buffers:
396 if buffers:
397 stream.send(buffers[-1], copy=False)
397 stream.send(buffers[-1], copy=False)
398 omsg = Message(msg)
398 omsg = Message(msg)
399 if self.debug:
399 if self.debug:
400 pprint.pprint(omsg)
400 pprint.pprint(omsg)
401 pprint.pprint(to_send)
401 pprint.pprint(to_send)
402 pprint.pprint(buffers)
402 pprint.pprint(buffers)
403 return omsg
403 return omsg
404
404
405 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
405 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
406 """Send a raw message via idents.
406 """Send a raw message via idents.
407
407
408 Parameters
408 Parameters
409 ----------
409 ----------
410 msg : list of sendable buffers"""
410 msg : list of sendable buffers"""
411 to_send = []
411 to_send = []
412 if isinstance(ident, str):
412 if isinstance(ident, str):
413 ident = [ident]
413 ident = [ident]
414 if ident is not None:
414 if ident is not None:
415 to_send.extend(ident)
415 to_send.extend(ident)
416 to_send.append(DELIM)
416 to_send.append(DELIM)
417 if self.key is not None:
417 if self.key is not None:
418 to_send.append(self.key)
418 to_send.append(self.key)
419 to_send.extend(msg)
419 to_send.extend(msg)
420 stream.send_multipart(msg, flags, copy=copy)
420 stream.send_multipart(msg, flags, copy=copy)
421
421
422 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
422 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
423 """receives and unpacks a message
423 """receives and unpacks a message
424 returns [idents], msg"""
424 returns [idents], msg"""
425 if isinstance(socket, ZMQStream):
425 if isinstance(socket, ZMQStream):
426 socket = socket.socket
426 socket = socket.socket
427 try:
427 try:
428 msg = socket.recv_multipart(mode)
428 msg = socket.recv_multipart(mode)
429 except zmq.ZMQError as e:
429 except zmq.ZMQError as e:
430 if e.errno == zmq.EAGAIN:
430 if e.errno == zmq.EAGAIN:
431 # We can convert EAGAIN to None as we know in this case
431 # We can convert EAGAIN to None as we know in this case
432 # recv_json won't return None.
432 # recv_json won't return None.
433 return None
433 return None
434 else:
434 else:
435 raise
435 raise
436 # return an actual Message object
436 # return an actual Message object
437 # determine the number of idents by trying to unpack them.
437 # determine the number of idents by trying to unpack them.
438 # this is terrible:
438 # this is terrible:
439 idents, msg = self.feed_identities(msg, copy)
439 idents, msg = self.feed_identities(msg, copy)
440 try:
440 try:
441 return idents, self.unpack_message(msg, content=content, copy=copy)
441 return idents, self.unpack_message(msg, content=content, copy=copy)
442 except Exception as e:
442 except Exception as e:
443 print (idents, msg)
443 print (idents, msg)
444 # TODO: handle it
444 # TODO: handle it
445 raise e
445 raise e
446
446
447 def feed_identities(self, msg, copy=True):
447 def feed_identities(self, msg, copy=True):
448 """This is a completely horrible thing, but it strips the zmq
448 """This is a completely horrible thing, but it strips the zmq
449 ident prefixes off of a message. It will break if any identities
449 ident prefixes off of a message. It will break if any identities
450 are unpackable by self.unpack."""
450 are unpackable by self.unpack."""
451 msg = list(msg)
451 msg = list(msg)
452 idents = []
452 idents = []
453 while len(msg) > 3:
453 while len(msg) > 3:
454 if copy:
454 if copy:
455 s = msg[0]
455 s = msg[0]
456 else:
456 else:
457 s = msg[0].bytes
457 s = msg[0].bytes
458 if s == DELIM:
458 if s == DELIM:
459 msg.pop(0)
459 msg.pop(0)
460 break
460 break
461 else:
461 else:
462 idents.append(s)
462 idents.append(s)
463 msg.pop(0)
463 msg.pop(0)
464
464
465 return idents, msg
465 return idents, msg
466
466
467 def unpack_message(self, msg, content=True, copy=True):
467 def unpack_message(self, msg, content=True, copy=True):
468 """Return a message object from the format
468 """Return a message object from the format
469 sent by self.send.
469 sent by self.send.
470
470
471 Parameters:
471 Parameters:
472 -----------
472 -----------
473
473
474 content : bool (True)
474 content : bool (True)
475 whether to unpack the content dict (True),
475 whether to unpack the content dict (True),
476 or leave it serialized (False)
476 or leave it serialized (False)
477
477
478 copy : bool (True)
478 copy : bool (True)
479 whether to return the bytes (True),
479 whether to return the bytes (True),
480 or the non-copying Message object in each place (False)
480 or the non-copying Message object in each place (False)
481
481
482 """
482 """
483 ikey = int(self.key is not None)
483 ikey = int(self.key is not None)
484 minlen = 3 + ikey
484 minlen = 3 + ikey
485 if not len(msg) >= minlen:
485 if not len(msg) >= minlen:
486 raise TypeError("malformed message, must have at least %i elements"%minlen)
486 raise TypeError("malformed message, must have at least %i elements"%minlen)
487 message = {}
487 message = {}
488 if not copy:
488 if not copy:
489 for i in range(minlen):
489 for i in range(minlen):
490 msg[i] = msg[i].bytes
490 msg[i] = msg[i].bytes
491 if ikey:
491 if ikey:
492 if not self.key == msg[0]:
492 if not self.key == msg[0]:
493 raise KeyError("Invalid Session Key: %s"%msg[0])
493 raise KeyError("Invalid Session Key: %s"%msg[0])
494 message['header'] = self.unpack(msg[ikey+0])
494 message['header'] = self.unpack(msg[ikey+0])
495 message['msg_type'] = message['header']['msg_type']
495 message['msg_type'] = message['header']['msg_type']
496 message['parent_header'] = self.unpack(msg[ikey+1])
496 message['parent_header'] = self.unpack(msg[ikey+1])
497 if content:
497 if content:
498 message['content'] = self.unpack(msg[ikey+2])
498 message['content'] = self.unpack(msg[ikey+2])
499 else:
499 else:
500 message['content'] = msg[ikey+2]
500 message['content'] = msg[ikey+2]
501
501
502 # message['buffers'] = msg[3:]
502 # message['buffers'] = msg[3:]
503 # else:
503 # else:
504 # message['header'] = self.unpack(msg[0].bytes)
504 # message['header'] = self.unpack(msg[0].bytes)
505 # message['msg_type'] = message['header']['msg_type']
505 # message['msg_type'] = message['header']['msg_type']
506 # message['parent_header'] = self.unpack(msg[1].bytes)
506 # message['parent_header'] = self.unpack(msg[1].bytes)
507 # if content:
507 # if content:
508 # message['content'] = self.unpack(msg[2].bytes)
508 # message['content'] = self.unpack(msg[2].bytes)
509 # else:
509 # else:
510 # message['content'] = msg[2].bytes
510 # message['content'] = msg[2].bytes
511
511
512 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
512 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
513 return message
513 return message
514
514
515
515
516
516
517 def test_msg2obj():
517 def test_msg2obj():
518 am = dict(x=1)
518 am = dict(x=1)
519 ao = Message(am)
519 ao = Message(am)
520 assert ao.x == am['x']
520 assert ao.x == am['x']
521
521
522 am['y'] = dict(z=1)
522 am['y'] = dict(z=1)
523 ao = Message(am)
523 ao = Message(am)
524 assert ao.y.z == am['y']['z']
524 assert ao.y.z == am['y']['z']
525
525
526 k1, k2 = 'y', 'z'
526 k1, k2 = 'y', 'z'
527 assert ao[k1][k2] == am[k1][k2]
527 assert ao[k1][k2] == am[k1][k2]
528
528
529 am2 = dict(ao)
529 am2 = dict(ao)
530 assert am['x'] == am2['x']
530 assert am['x'] == am2['x']
531 assert am['y']['z'] == am2['y']['z']
531 assert am['y']['z'] == am2['y']['z']
General Comments 0
You need to be logged in to leave comments. Login now