##// END OF EJS Templates
Do not flatten unicode on unpacking
Thomas Kluyver -
Show More
@@ -1,627 +1,627 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Session object for building, serializing, sending, and receiving messages in
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
4 metadata on messages.
5
5
6 Also defined here are utilities for working with Sessions:
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
9 * A Message object for convenience that allows attribute-access to the msg dict.
10
10
11 Authors:
11 Authors:
12
12
13 * Min RK
13 * Min RK
14 * Brian Granger
14 * Brian Granger
15 * Fernando Perez
15 * Fernando Perez
16 """
16 """
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Copyright (C) 2010-2011 The IPython Development Team
18 # Copyright (C) 2010-2011 The IPython Development Team
19 #
19 #
20 # Distributed under the terms of the BSD License. The full license is in
20 # Distributed under the terms of the BSD License. The full license is in
21 # the file COPYING, distributed as part of this software.
21 # the file COPYING, distributed as part of this software.
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Imports
25 # Imports
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 import hmac
28 import hmac
29 import logging
29 import logging
30 import os
30 import os
31 import pprint
31 import pprint
32 import uuid
32 import uuid
33 from datetime import datetime
33 from datetime import datetime
34
34
35 try:
35 try:
36 import cPickle
36 import cPickle
37 pickle = cPickle
37 pickle = cPickle
38 except:
38 except:
39 cPickle = None
39 cPickle = None
40 import pickle
40 import pickle
41
41
42 import zmq
42 import zmq
43 from zmq.utils import jsonapi
43 from zmq.utils import jsonapi
44 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.ioloop import IOLoop
45 from zmq.eventloop.zmqstream import ZMQStream
45 from zmq.eventloop.zmqstream import ZMQStream
46
46
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
50 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # utility functions
53 # utility functions
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 def squash_unicode(obj):
56 def squash_unicode(obj):
57 """coerce unicode back to bytestrings."""
57 """coerce unicode back to bytestrings."""
58 if isinstance(obj,dict):
58 if isinstance(obj,dict):
59 for key in obj.keys():
59 for key in obj.keys():
60 obj[key] = squash_unicode(obj[key])
60 obj[key] = squash_unicode(obj[key])
61 if isinstance(key, unicode):
61 if isinstance(key, unicode):
62 obj[squash_unicode(key)] = obj.pop(key)
62 obj[squash_unicode(key)] = obj.pop(key)
63 elif isinstance(obj, list):
63 elif isinstance(obj, list):
64 for i,v in enumerate(obj):
64 for i,v in enumerate(obj):
65 obj[i] = squash_unicode(v)
65 obj[i] = squash_unicode(v)
66 elif isinstance(obj, unicode):
66 elif isinstance(obj, unicode):
67 obj = obj.encode('utf8')
67 obj = obj.encode('utf8')
68 return obj
68 return obj
69
69
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 # globals and defaults
71 # globals and defaults
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
73 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
74 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s)))
75 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76
76
77 pickle_packer = lambda o: pickle.dumps(o,-1)
77 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_unpacker = pickle.loads
78 pickle_unpacker = pickle.loads
79
79
80 default_packer = json_packer
80 default_packer = json_packer
81 default_unpacker = json_unpacker
81 default_unpacker = json_unpacker
82
82
83
83
84 DELIM="<IDS|MSG>"
84 DELIM="<IDS|MSG>"
85
85
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Classes
87 # Classes
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89
89
90 class SessionFactory(LoggingConfigurable):
90 class SessionFactory(LoggingConfigurable):
91 """The Base class for configurables that have a Session, Context, logger,
91 """The Base class for configurables that have a Session, Context, logger,
92 and IOLoop.
92 and IOLoop.
93 """
93 """
94
94
95 logname = Unicode('')
95 logname = Unicode('')
96 def _logname_changed(self, name, old, new):
96 def _logname_changed(self, name, old, new):
97 self.log = logging.getLogger(new)
97 self.log = logging.getLogger(new)
98
98
99 # not configurable:
99 # not configurable:
100 context = Instance('zmq.Context')
100 context = Instance('zmq.Context')
101 def _context_default(self):
101 def _context_default(self):
102 return zmq.Context.instance()
102 return zmq.Context.instance()
103
103
104 session = Instance('IPython.zmq.session.Session')
104 session = Instance('IPython.zmq.session.Session')
105
105
106 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
106 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 def _loop_default(self):
107 def _loop_default(self):
108 return IOLoop.instance()
108 return IOLoop.instance()
109
109
110 def __init__(self, **kwargs):
110 def __init__(self, **kwargs):
111 super(SessionFactory, self).__init__(**kwargs)
111 super(SessionFactory, self).__init__(**kwargs)
112
112
113 if self.session is None:
113 if self.session is None:
114 # construct the session
114 # construct the session
115 self.session = Session(**kwargs)
115 self.session = Session(**kwargs)
116
116
117
117
118 class Message(object):
118 class Message(object):
119 """A simple message object that maps dict keys to attributes.
119 """A simple message object that maps dict keys to attributes.
120
120
121 A Message can be created from a dict and a dict from a Message instance
121 A Message can be created from a dict and a dict from a Message instance
122 simply by calling dict(msg_obj)."""
122 simply by calling dict(msg_obj)."""
123
123
124 def __init__(self, msg_dict):
124 def __init__(self, msg_dict):
125 dct = self.__dict__
125 dct = self.__dict__
126 for k, v in dict(msg_dict).iteritems():
126 for k, v in dict(msg_dict).iteritems():
127 if isinstance(v, dict):
127 if isinstance(v, dict):
128 v = Message(v)
128 v = Message(v)
129 dct[k] = v
129 dct[k] = v
130
130
131 # Having this iterator lets dict(msg_obj) work out of the box.
131 # Having this iterator lets dict(msg_obj) work out of the box.
132 def __iter__(self):
132 def __iter__(self):
133 return iter(self.__dict__.iteritems())
133 return iter(self.__dict__.iteritems())
134
134
135 def __repr__(self):
135 def __repr__(self):
136 return repr(self.__dict__)
136 return repr(self.__dict__)
137
137
138 def __str__(self):
138 def __str__(self):
139 return pprint.pformat(self.__dict__)
139 return pprint.pformat(self.__dict__)
140
140
141 def __contains__(self, k):
141 def __contains__(self, k):
142 return k in self.__dict__
142 return k in self.__dict__
143
143
144 def __getitem__(self, k):
144 def __getitem__(self, k):
145 return self.__dict__[k]
145 return self.__dict__[k]
146
146
147
147
148 def msg_header(msg_id, msg_type, username, session):
148 def msg_header(msg_id, msg_type, username, session):
149 date = datetime.now()
149 date = datetime.now()
150 return locals()
150 return locals()
151
151
152 def extract_header(msg_or_header):
152 def extract_header(msg_or_header):
153 """Given a message or header, return the header."""
153 """Given a message or header, return the header."""
154 if not msg_or_header:
154 if not msg_or_header:
155 return {}
155 return {}
156 try:
156 try:
157 # See if msg_or_header is the entire message.
157 # See if msg_or_header is the entire message.
158 h = msg_or_header['header']
158 h = msg_or_header['header']
159 except KeyError:
159 except KeyError:
160 try:
160 try:
161 # See if msg_or_header is just the header
161 # See if msg_or_header is just the header
162 h = msg_or_header['msg_id']
162 h = msg_or_header['msg_id']
163 except KeyError:
163 except KeyError:
164 raise
164 raise
165 else:
165 else:
166 h = msg_or_header
166 h = msg_or_header
167 if not isinstance(h, dict):
167 if not isinstance(h, dict):
168 h = dict(h)
168 h = dict(h)
169 return h
169 return h
170
170
171 class Session(Configurable):
171 class Session(Configurable):
172 """Object for handling serialization and sending of messages.
172 """Object for handling serialization and sending of messages.
173
173
174 The Session object handles building messages and sending them
174 The Session object handles building messages and sending them
175 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
175 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 other over the network via Session objects, and only need to work with the
176 other over the network via Session objects, and only need to work with the
177 dict-based IPython message spec. The Session will handle
177 dict-based IPython message spec. The Session will handle
178 serialization/deserialization, security, and metadata.
178 serialization/deserialization, security, and metadata.
179
179
180 Sessions support configurable serialiization via packer/unpacker traits,
180 Sessions support configurable serialiization via packer/unpacker traits,
181 and signing with HMAC digests via the key/keyfile traits.
181 and signing with HMAC digests via the key/keyfile traits.
182
182
183 Parameters
183 Parameters
184 ----------
184 ----------
185
185
186 debug : bool
186 debug : bool
187 whether to trigger extra debugging statements
187 whether to trigger extra debugging statements
188 packer/unpacker : str : 'json', 'pickle' or import_string
188 packer/unpacker : str : 'json', 'pickle' or import_string
189 importstrings for methods to serialize message parts. If just
189 importstrings for methods to serialize message parts. If just
190 'json' or 'pickle', predefined JSON and pickle packers will be used.
190 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 Otherwise, the entire importstring must be used.
191 Otherwise, the entire importstring must be used.
192
192
193 The functions must accept at least valid JSON input, and output *bytes*.
193 The functions must accept at least valid JSON input, and output *bytes*.
194
194
195 For example, to use msgpack:
195 For example, to use msgpack:
196 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
196 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 pack/unpack : callables
197 pack/unpack : callables
198 You can also set the pack/unpack callables for serialization directly.
198 You can also set the pack/unpack callables for serialization directly.
199 session : bytes
199 session : bytes
200 the ID of this Session object. The default is to generate a new UUID.
200 the ID of this Session object. The default is to generate a new UUID.
201 username : unicode
201 username : unicode
202 username added to message headers. The default is to ask the OS.
202 username added to message headers. The default is to ask the OS.
203 key : bytes
203 key : bytes
204 The key used to initialize an HMAC signature. If unset, messages
204 The key used to initialize an HMAC signature. If unset, messages
205 will not be signed or checked.
205 will not be signed or checked.
206 keyfile : filepath
206 keyfile : filepath
207 The file containing a key. If this is set, `key` will be initialized
207 The file containing a key. If this is set, `key` will be initialized
208 to the contents of the file.
208 to the contents of the file.
209
209
210 """
210 """
211
211
212 debug=Bool(False, config=True, help="""Debug output in the Session""")
212 debug=Bool(False, config=True, help="""Debug output in the Session""")
213
213
214 packer = Unicode('json',config=True,
214 packer = Unicode('json',config=True,
215 help="""The name of the packer for serializing messages.
215 help="""The name of the packer for serializing messages.
216 Should be one of 'json', 'pickle', or an import name
216 Should be one of 'json', 'pickle', or an import name
217 for a custom callable serializer.""")
217 for a custom callable serializer.""")
218 def _packer_changed(self, name, old, new):
218 def _packer_changed(self, name, old, new):
219 if new.lower() == 'json':
219 if new.lower() == 'json':
220 self.pack = json_packer
220 self.pack = json_packer
221 self.unpack = json_unpacker
221 self.unpack = json_unpacker
222 elif new.lower() == 'pickle':
222 elif new.lower() == 'pickle':
223 self.pack = pickle_packer
223 self.pack = pickle_packer
224 self.unpack = pickle_unpacker
224 self.unpack = pickle_unpacker
225 else:
225 else:
226 self.pack = import_item(str(new))
226 self.pack = import_item(str(new))
227
227
228 unpacker = Unicode('json', config=True,
228 unpacker = Unicode('json', config=True,
229 help="""The name of the unpacker for unserializing messages.
229 help="""The name of the unpacker for unserializing messages.
230 Only used with custom functions for `packer`.""")
230 Only used with custom functions for `packer`.""")
231 def _unpacker_changed(self, name, old, new):
231 def _unpacker_changed(self, name, old, new):
232 if new.lower() == 'json':
232 if new.lower() == 'json':
233 self.pack = json_packer
233 self.pack = json_packer
234 self.unpack = json_unpacker
234 self.unpack = json_unpacker
235 elif new.lower() == 'pickle':
235 elif new.lower() == 'pickle':
236 self.pack = pickle_packer
236 self.pack = pickle_packer
237 self.unpack = pickle_unpacker
237 self.unpack = pickle_unpacker
238 else:
238 else:
239 self.unpack = import_item(str(new))
239 self.unpack = import_item(str(new))
240
240
241 session = CStr('', config=True,
241 session = CStr('', config=True,
242 help="""The UUID identifying this session.""")
242 help="""The UUID identifying this session.""")
243 def _session_default(self):
243 def _session_default(self):
244 return bytes(uuid.uuid4())
244 return bytes(uuid.uuid4())
245
245
246 username = Unicode(os.environ.get('USER','username'), config=True,
246 username = Unicode(os.environ.get('USER','username'), config=True,
247 help="""Username for the Session. Default is your system username.""")
247 help="""Username for the Session. Default is your system username.""")
248
248
249 # message signature related traits:
249 # message signature related traits:
250 key = CStr('', config=True,
250 key = CStr('', config=True,
251 help="""execution key, for extra authentication.""")
251 help="""execution key, for extra authentication.""")
252 def _key_changed(self, name, old, new):
252 def _key_changed(self, name, old, new):
253 if new:
253 if new:
254 self.auth = hmac.HMAC(new)
254 self.auth = hmac.HMAC(new)
255 else:
255 else:
256 self.auth = None
256 self.auth = None
257 auth = Instance(hmac.HMAC)
257 auth = Instance(hmac.HMAC)
258 digest_history = Set()
258 digest_history = Set()
259
259
260 keyfile = Unicode('', config=True,
260 keyfile = Unicode('', config=True,
261 help="""path to file containing execution key.""")
261 help="""path to file containing execution key.""")
262 def _keyfile_changed(self, name, old, new):
262 def _keyfile_changed(self, name, old, new):
263 with open(new, 'rb') as f:
263 with open(new, 'rb') as f:
264 self.key = f.read().strip()
264 self.key = f.read().strip()
265
265
266 pack = Any(default_packer) # the actual packer function
266 pack = Any(default_packer) # the actual packer function
267 def _pack_changed(self, name, old, new):
267 def _pack_changed(self, name, old, new):
268 if not callable(new):
268 if not callable(new):
269 raise TypeError("packer must be callable, not %s"%type(new))
269 raise TypeError("packer must be callable, not %s"%type(new))
270
270
271 unpack = Any(default_unpacker) # the actual packer function
271 unpack = Any(default_unpacker) # the actual packer function
272 def _unpack_changed(self, name, old, new):
272 def _unpack_changed(self, name, old, new):
273 # unpacker is not checked - it is assumed to be
273 # unpacker is not checked - it is assumed to be
274 if not callable(new):
274 if not callable(new):
275 raise TypeError("unpacker must be callable, not %s"%type(new))
275 raise TypeError("unpacker must be callable, not %s"%type(new))
276
276
277 def __init__(self, **kwargs):
277 def __init__(self, **kwargs):
278 """create a Session object
278 """create a Session object
279
279
280 Parameters
280 Parameters
281 ----------
281 ----------
282
282
283 debug : bool
283 debug : bool
284 whether to trigger extra debugging statements
284 whether to trigger extra debugging statements
285 packer/unpacker : str : 'json', 'pickle' or import_string
285 packer/unpacker : str : 'json', 'pickle' or import_string
286 importstrings for methods to serialize message parts. If just
286 importstrings for methods to serialize message parts. If just
287 'json' or 'pickle', predefined JSON and pickle packers will be used.
287 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 Otherwise, the entire importstring must be used.
288 Otherwise, the entire importstring must be used.
289
289
290 The functions must accept at least valid JSON input, and output
290 The functions must accept at least valid JSON input, and output
291 *bytes*.
291 *bytes*.
292
292
293 For example, to use msgpack:
293 For example, to use msgpack:
294 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
294 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 pack/unpack : callables
295 pack/unpack : callables
296 You can also set the pack/unpack callables for serialization
296 You can also set the pack/unpack callables for serialization
297 directly.
297 directly.
298 session : bytes
298 session : bytes
299 the ID of this Session object. The default is to generate a new
299 the ID of this Session object. The default is to generate a new
300 UUID.
300 UUID.
301 username : unicode
301 username : unicode
302 username added to message headers. The default is to ask the OS.
302 username added to message headers. The default is to ask the OS.
303 key : bytes
303 key : bytes
304 The key used to initialize an HMAC signature. If unset, messages
304 The key used to initialize an HMAC signature. If unset, messages
305 will not be signed or checked.
305 will not be signed or checked.
306 keyfile : filepath
306 keyfile : filepath
307 The file containing a key. If this is set, `key` will be
307 The file containing a key. If this is set, `key` will be
308 initialized to the contents of the file.
308 initialized to the contents of the file.
309 """
309 """
310 super(Session, self).__init__(**kwargs)
310 super(Session, self).__init__(**kwargs)
311 self._check_packers()
311 self._check_packers()
312 self.none = self.pack({})
312 self.none = self.pack({})
313
313
314 @property
314 @property
315 def msg_id(self):
315 def msg_id(self):
316 """always return new uuid"""
316 """always return new uuid"""
317 return str(uuid.uuid4())
317 return str(uuid.uuid4())
318
318
319 def _check_packers(self):
319 def _check_packers(self):
320 """check packers for binary data and datetime support."""
320 """check packers for binary data and datetime support."""
321 pack = self.pack
321 pack = self.pack
322 unpack = self.unpack
322 unpack = self.unpack
323
323
324 # check simple serialization
324 # check simple serialization
325 msg = dict(a=[1,'hi'])
325 msg = dict(a=[1,'hi'])
326 try:
326 try:
327 packed = pack(msg)
327 packed = pack(msg)
328 except Exception:
328 except Exception:
329 raise ValueError("packer could not serialize a simple message")
329 raise ValueError("packer could not serialize a simple message")
330
330
331 # ensure packed message is bytes
331 # ensure packed message is bytes
332 if not isinstance(packed, bytes):
332 if not isinstance(packed, bytes):
333 raise ValueError("message packed to %r, but bytes are required"%type(packed))
333 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334
334
335 # check that unpack is pack's inverse
335 # check that unpack is pack's inverse
336 try:
336 try:
337 unpacked = unpack(packed)
337 unpacked = unpack(packed)
338 except Exception:
338 except Exception:
339 raise ValueError("unpacker could not handle the packer's output")
339 raise ValueError("unpacker could not handle the packer's output")
340
340
341 # check datetime support
341 # check datetime support
342 msg = dict(t=datetime.now())
342 msg = dict(t=datetime.now())
343 try:
343 try:
344 unpacked = unpack(pack(msg))
344 unpacked = unpack(pack(msg))
345 except Exception:
345 except Exception:
346 self.pack = lambda o: pack(squash_dates(o))
346 self.pack = lambda o: pack(squash_dates(o))
347 self.unpack = lambda s: extract_dates(unpack(s))
347 self.unpack = lambda s: extract_dates(unpack(s))
348
348
349 def msg_header(self, msg_type):
349 def msg_header(self, msg_type):
350 return msg_header(self.msg_id, msg_type, self.username, self.session)
350 return msg_header(self.msg_id, msg_type, self.username, self.session)
351
351
352 def msg(self, msg_type, content=None, parent=None, subheader=None):
352 def msg(self, msg_type, content=None, parent=None, subheader=None):
353 msg = {}
353 msg = {}
354 msg['header'] = self.msg_header(msg_type)
354 msg['header'] = self.msg_header(msg_type)
355 msg['msg_id'] = msg['header']['msg_id']
355 msg['msg_id'] = msg['header']['msg_id']
356 msg['parent_header'] = {} if parent is None else extract_header(parent)
356 msg['parent_header'] = {} if parent is None else extract_header(parent)
357 msg['msg_type'] = msg_type
357 msg['msg_type'] = msg_type
358 msg['content'] = {} if content is None else content
358 msg['content'] = {} if content is None else content
359 sub = {} if subheader is None else subheader
359 sub = {} if subheader is None else subheader
360 msg['header'].update(sub)
360 msg['header'].update(sub)
361 return msg
361 return msg
362
362
363 def sign(self, msg):
363 def sign(self, msg):
364 """Sign a message with HMAC digest. If no auth, return b''."""
364 """Sign a message with HMAC digest. If no auth, return b''."""
365 if self.auth is None:
365 if self.auth is None:
366 return b''
366 return b''
367 h = self.auth.copy()
367 h = self.auth.copy()
368 for m in msg:
368 for m in msg:
369 h.update(m)
369 h.update(m)
370 return h.hexdigest()
370 return h.hexdigest()
371
371
372 def serialize(self, msg, ident=None):
372 def serialize(self, msg, ident=None):
373 """Serialize the message components to bytes.
373 """Serialize the message components to bytes.
374
374
375 Returns
375 Returns
376 -------
376 -------
377
377
378 list of bytes objects
378 list of bytes objects
379
379
380 """
380 """
381 content = msg.get('content', {})
381 content = msg.get('content', {})
382 if content is None:
382 if content is None:
383 content = self.none
383 content = self.none
384 elif isinstance(content, dict):
384 elif isinstance(content, dict):
385 content = self.pack(content)
385 content = self.pack(content)
386 elif isinstance(content, bytes):
386 elif isinstance(content, bytes):
387 # content is already packed, as in a relayed message
387 # content is already packed, as in a relayed message
388 pass
388 pass
389 elif isinstance(content, unicode):
389 elif isinstance(content, unicode):
390 # should be bytes, but JSON often spits out unicode
390 # should be bytes, but JSON often spits out unicode
391 content = content.encode('utf8')
391 content = content.encode('utf8')
392 else:
392 else:
393 raise TypeError("Content incorrect type: %s"%type(content))
393 raise TypeError("Content incorrect type: %s"%type(content))
394
394
395 real_message = [self.pack(msg['header']),
395 real_message = [self.pack(msg['header']),
396 self.pack(msg['parent_header']),
396 self.pack(msg['parent_header']),
397 content
397 content
398 ]
398 ]
399
399
400 to_send = []
400 to_send = []
401
401
402 if isinstance(ident, list):
402 if isinstance(ident, list):
403 # accept list of idents
403 # accept list of idents
404 to_send.extend(ident)
404 to_send.extend(ident)
405 elif ident is not None:
405 elif ident is not None:
406 to_send.append(ident)
406 to_send.append(ident)
407 to_send.append(DELIM)
407 to_send.append(DELIM)
408
408
409 signature = self.sign(real_message)
409 signature = self.sign(real_message)
410 to_send.append(signature)
410 to_send.append(signature)
411
411
412 to_send.extend(real_message)
412 to_send.extend(real_message)
413
413
414 return to_send
414 return to_send
415
415
416 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
416 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
417 buffers=None, subheader=None, track=False):
417 buffers=None, subheader=None, track=False):
418 """Build and send a message via stream or socket.
418 """Build and send a message via stream or socket.
419
419
420 Parameters
420 Parameters
421 ----------
421 ----------
422
422
423 stream : zmq.Socket or ZMQStream
423 stream : zmq.Socket or ZMQStream
424 the socket-like object used to send the data
424 the socket-like object used to send the data
425 msg_or_type : str or Message/dict
425 msg_or_type : str or Message/dict
426 Normally, msg_or_type will be a msg_type unless a message is being
426 Normally, msg_or_type will be a msg_type unless a message is being
427 sent more than once.
427 sent more than once.
428
428
429 content : dict or None
429 content : dict or None
430 the content of the message (ignored if msg_or_type is a message)
430 the content of the message (ignored if msg_or_type is a message)
431 parent : Message or dict or None
431 parent : Message or dict or None
432 the parent or parent header describing the parent of this message
432 the parent or parent header describing the parent of this message
433 ident : bytes or list of bytes
433 ident : bytes or list of bytes
434 the zmq.IDENTITY routing path
434 the zmq.IDENTITY routing path
435 subheader : dict or None
435 subheader : dict or None
436 extra header keys for this message's header
436 extra header keys for this message's header
437 buffers : list or None
437 buffers : list or None
438 the already-serialized buffers to be appended to the message
438 the already-serialized buffers to be appended to the message
439 track : bool
439 track : bool
440 whether to track. Only for use with Sockets,
440 whether to track. Only for use with Sockets,
441 because ZMQStream objects cannot track messages.
441 because ZMQStream objects cannot track messages.
442
442
443 Returns
443 Returns
444 -------
444 -------
445 msg : message dict
445 msg : message dict
446 the constructed message
446 the constructed message
447 (msg,tracker) : (message dict, MessageTracker)
447 (msg,tracker) : (message dict, MessageTracker)
448 if track=True, then a 2-tuple will be returned,
448 if track=True, then a 2-tuple will be returned,
449 the first element being the constructed
449 the first element being the constructed
450 message, and the second being the MessageTracker
450 message, and the second being the MessageTracker
451
451
452 """
452 """
453
453
454 if not isinstance(stream, (zmq.Socket, ZMQStream)):
454 if not isinstance(stream, (zmq.Socket, ZMQStream)):
455 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
455 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
456 elif track and isinstance(stream, ZMQStream):
456 elif track and isinstance(stream, ZMQStream):
457 raise TypeError("ZMQStream cannot track messages")
457 raise TypeError("ZMQStream cannot track messages")
458
458
459 if isinstance(msg_or_type, (Message, dict)):
459 if isinstance(msg_or_type, (Message, dict)):
460 # we got a Message, not a msg_type
460 # we got a Message, not a msg_type
461 # don't build a new Message
461 # don't build a new Message
462 msg = msg_or_type
462 msg = msg_or_type
463 else:
463 else:
464 msg = self.msg(msg_or_type, content, parent, subheader)
464 msg = self.msg(msg_or_type, content, parent, subheader)
465
465
466 buffers = [] if buffers is None else buffers
466 buffers = [] if buffers is None else buffers
467 to_send = self.serialize(msg, ident)
467 to_send = self.serialize(msg, ident)
468 flag = 0
468 flag = 0
469 if buffers:
469 if buffers:
470 flag = zmq.SNDMORE
470 flag = zmq.SNDMORE
471 _track = False
471 _track = False
472 else:
472 else:
473 _track=track
473 _track=track
474 if track:
474 if track:
475 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
475 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
476 else:
476 else:
477 tracker = stream.send_multipart(to_send, flag, copy=False)
477 tracker = stream.send_multipart(to_send, flag, copy=False)
478 for b in buffers[:-1]:
478 for b in buffers[:-1]:
479 stream.send(b, flag, copy=False)
479 stream.send(b, flag, copy=False)
480 if buffers:
480 if buffers:
481 if track:
481 if track:
482 tracker = stream.send(buffers[-1], copy=False, track=track)
482 tracker = stream.send(buffers[-1], copy=False, track=track)
483 else:
483 else:
484 tracker = stream.send(buffers[-1], copy=False)
484 tracker = stream.send(buffers[-1], copy=False)
485
485
486 # omsg = Message(msg)
486 # omsg = Message(msg)
487 if self.debug:
487 if self.debug:
488 pprint.pprint(msg)
488 pprint.pprint(msg)
489 pprint.pprint(to_send)
489 pprint.pprint(to_send)
490 pprint.pprint(buffers)
490 pprint.pprint(buffers)
491
491
492 msg['tracker'] = tracker
492 msg['tracker'] = tracker
493
493
494 return msg
494 return msg
495
495
496 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
496 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
497 """Send a raw message via ident path.
497 """Send a raw message via ident path.
498
498
499 Parameters
499 Parameters
500 ----------
500 ----------
501 msg : list of sendable buffers"""
501 msg : list of sendable buffers"""
502 to_send = []
502 to_send = []
503 if isinstance(ident, bytes):
503 if isinstance(ident, bytes):
504 ident = [ident]
504 ident = [ident]
505 if ident is not None:
505 if ident is not None:
506 to_send.extend(ident)
506 to_send.extend(ident)
507
507
508 to_send.append(DELIM)
508 to_send.append(DELIM)
509 to_send.append(self.sign(msg))
509 to_send.append(self.sign(msg))
510 to_send.extend(msg)
510 to_send.extend(msg)
511 stream.send_multipart(msg, flags, copy=copy)
511 stream.send_multipart(msg, flags, copy=copy)
512
512
513 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
513 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
514 """receives and unpacks a message
514 """receives and unpacks a message
515 returns [idents], msg"""
515 returns [idents], msg"""
516 if isinstance(socket, ZMQStream):
516 if isinstance(socket, ZMQStream):
517 socket = socket.socket
517 socket = socket.socket
518 try:
518 try:
519 msg = socket.recv_multipart(mode)
519 msg = socket.recv_multipart(mode)
520 except zmq.ZMQError as e:
520 except zmq.ZMQError as e:
521 if e.errno == zmq.EAGAIN:
521 if e.errno == zmq.EAGAIN:
522 # We can convert EAGAIN to None as we know in this case
522 # We can convert EAGAIN to None as we know in this case
523 # recv_multipart won't return None.
523 # recv_multipart won't return None.
524 return None,None
524 return None,None
525 else:
525 else:
526 raise
526 raise
527 # split multipart message into identity list and message dict
527 # split multipart message into identity list and message dict
528 # invalid large messages can cause very expensive string comparisons
528 # invalid large messages can cause very expensive string comparisons
529 idents, msg = self.feed_identities(msg, copy)
529 idents, msg = self.feed_identities(msg, copy)
530 try:
530 try:
531 return idents, self.unpack_message(msg, content=content, copy=copy)
531 return idents, self.unpack_message(msg, content=content, copy=copy)
532 except Exception as e:
532 except Exception as e:
533 print (idents, msg)
533 print (idents, msg)
534 # TODO: handle it
534 # TODO: handle it
535 raise e
535 raise e
536
536
537 def feed_identities(self, msg, copy=True):
537 def feed_identities(self, msg, copy=True):
538 """feed until DELIM is reached, then return the prefix as idents and
538 """feed until DELIM is reached, then return the prefix as idents and
539 remainder as msg. This is easily broken by setting an IDENT to DELIM,
539 remainder as msg. This is easily broken by setting an IDENT to DELIM,
540 but that would be silly.
540 but that would be silly.
541
541
542 Parameters
542 Parameters
543 ----------
543 ----------
544 msg : a list of Message or bytes objects
544 msg : a list of Message or bytes objects
545 the message to be split
545 the message to be split
546 copy : bool
546 copy : bool
547 flag determining whether the arguments are bytes or Messages
547 flag determining whether the arguments are bytes or Messages
548
548
549 Returns
549 Returns
550 -------
550 -------
551 (idents,msg) : two lists
551 (idents,msg) : two lists
552 idents will always be a list of bytes - the indentity prefix
552 idents will always be a list of bytes - the indentity prefix
553 msg will be a list of bytes or Messages, unchanged from input
553 msg will be a list of bytes or Messages, unchanged from input
554 msg should be unpackable via self.unpack_message at this point.
554 msg should be unpackable via self.unpack_message at this point.
555 """
555 """
556 if copy:
556 if copy:
557 idx = msg.index(DELIM)
557 idx = msg.index(DELIM)
558 return msg[:idx], msg[idx+1:]
558 return msg[:idx], msg[idx+1:]
559 else:
559 else:
560 failed = True
560 failed = True
561 for idx,m in enumerate(msg):
561 for idx,m in enumerate(msg):
562 if m.bytes == DELIM:
562 if m.bytes == DELIM:
563 failed = False
563 failed = False
564 break
564 break
565 if failed:
565 if failed:
566 raise ValueError("DELIM not in msg")
566 raise ValueError("DELIM not in msg")
567 idents, msg = msg[:idx], msg[idx+1:]
567 idents, msg = msg[:idx], msg[idx+1:]
568 return [m.bytes for m in idents], msg
568 return [m.bytes for m in idents], msg
569
569
570 def unpack_message(self, msg, content=True, copy=True):
570 def unpack_message(self, msg, content=True, copy=True):
571 """Return a message object from the format
571 """Return a message object from the format
572 sent by self.send.
572 sent by self.send.
573
573
574 Parameters:
574 Parameters:
575 -----------
575 -----------
576
576
577 content : bool (True)
577 content : bool (True)
578 whether to unpack the content dict (True),
578 whether to unpack the content dict (True),
579 or leave it serialized (False)
579 or leave it serialized (False)
580
580
581 copy : bool (True)
581 copy : bool (True)
582 whether to return the bytes (True),
582 whether to return the bytes (True),
583 or the non-copying Message object in each place (False)
583 or the non-copying Message object in each place (False)
584
584
585 """
585 """
586 minlen = 4
586 minlen = 4
587 message = {}
587 message = {}
588 if not copy:
588 if not copy:
589 for i in range(minlen):
589 for i in range(minlen):
590 msg[i] = msg[i].bytes
590 msg[i] = msg[i].bytes
591 if self.auth is not None:
591 if self.auth is not None:
592 signature = msg[0]
592 signature = msg[0]
593 if signature in self.digest_history:
593 if signature in self.digest_history:
594 raise ValueError("Duplicate Signature: %r"%signature)
594 raise ValueError("Duplicate Signature: %r"%signature)
595 self.digest_history.add(signature)
595 self.digest_history.add(signature)
596 check = self.sign(msg[1:4])
596 check = self.sign(msg[1:4])
597 if not signature == check:
597 if not signature == check:
598 raise ValueError("Invalid Signature: %r"%signature)
598 raise ValueError("Invalid Signature: %r"%signature)
599 if not len(msg) >= minlen:
599 if not len(msg) >= minlen:
600 raise TypeError("malformed message, must have at least %i elements"%minlen)
600 raise TypeError("malformed message, must have at least %i elements"%minlen)
601 message['header'] = self.unpack(msg[1])
601 message['header'] = self.unpack(msg[1])
602 message['msg_type'] = message['header']['msg_type']
602 message['msg_type'] = message['header']['msg_type']
603 message['parent_header'] = self.unpack(msg[2])
603 message['parent_header'] = self.unpack(msg[2])
604 if content:
604 if content:
605 message['content'] = self.unpack(msg[3])
605 message['content'] = self.unpack(msg[3])
606 else:
606 else:
607 message['content'] = msg[3]
607 message['content'] = msg[3]
608
608
609 message['buffers'] = msg[4:]
609 message['buffers'] = msg[4:]
610 return message
610 return message
611
611
612 def test_msg2obj():
612 def test_msg2obj():
613 am = dict(x=1)
613 am = dict(x=1)
614 ao = Message(am)
614 ao = Message(am)
615 assert ao.x == am['x']
615 assert ao.x == am['x']
616
616
617 am['y'] = dict(z=1)
617 am['y'] = dict(z=1)
618 ao = Message(am)
618 ao = Message(am)
619 assert ao.y.z == am['y']['z']
619 assert ao.y.z == am['y']['z']
620
620
621 k1, k2 = 'y', 'z'
621 k1, k2 = 'y', 'z'
622 assert ao[k1][k2] == am[k1][k2]
622 assert ao[k1][k2] == am[k1][k2]
623
623
624 am2 = dict(ao)
624 am2 = dict(ao)
625 assert am['x'] == am2['x']
625 assert am['x'] == am2['x']
626 assert am['y']['z'] == am2['y']['z']
626 assert am['y']['z'] == am2['y']['z']
627
627
General Comments 0
You need to be logged in to leave comments. Login now