##// END OF EJS Templates
don't ensure_ascii in JSON messages in Session
MinRK -
Show More
@@ -1,850 +1,837 b''
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9
10 Authors:
11
12 * Min RK
13 * Brian Granger
14 * Fernando Perez
15 """
9 """
16 #-----------------------------------------------------------------------------
17 # Copyright (C) 2010-2011 The IPython Development Team
18 #
19 # Distributed under the terms of the BSD License. The full license is in
20 # the file COPYING, distributed as part of this software.
21 #-----------------------------------------------------------------------------
22
10
23 #-----------------------------------------------------------------------------
11 # Copyright (c) IPython Development Team.
24 # Imports
12 # Distributed under the terms of the Modified BSD License.
25 #-----------------------------------------------------------------------------
26
13
27 import hashlib
14 import hashlib
28 import hmac
15 import hmac
29 import logging
16 import logging
30 import os
17 import os
31 import pprint
18 import pprint
32 import random
19 import random
33 import uuid
20 import uuid
34 from datetime import datetime
21 from datetime import datetime
35
22
36 try:
23 try:
37 import cPickle
24 import cPickle
38 pickle = cPickle
25 pickle = cPickle
39 except:
26 except:
40 cPickle = None
27 cPickle = None
41 import pickle
28 import pickle
42
29
43 import zmq
30 import zmq
44 from zmq.utils import jsonapi
31 from zmq.utils import jsonapi
45 from zmq.eventloop.ioloop import IOLoop
32 from zmq.eventloop.ioloop import IOLoop
46 from zmq.eventloop.zmqstream import ZMQStream
33 from zmq.eventloop.zmqstream import ZMQStream
47
34
48 from IPython.config.configurable import Configurable, LoggingConfigurable
35 from IPython.config.configurable import Configurable, LoggingConfigurable
49 from IPython.utils import io
36 from IPython.utils import io
50 from IPython.utils.importstring import import_item
37 from IPython.utils.importstring import import_item
51 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
38 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
52 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
39 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
53 iteritems)
40 iteritems)
54 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
41 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
55 DottedObjectName, CUnicode, Dict, Integer,
42 DottedObjectName, CUnicode, Dict, Integer,
56 TraitError,
43 TraitError,
57 )
44 )
58 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
45 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
59
46
60 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
61 # utility functions
48 # utility functions
62 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
63
50
64 def squash_unicode(obj):
51 def squash_unicode(obj):
65 """coerce unicode back to bytestrings."""
52 """coerce unicode back to bytestrings."""
66 if isinstance(obj,dict):
53 if isinstance(obj,dict):
67 for key in obj.keys():
54 for key in obj.keys():
68 obj[key] = squash_unicode(obj[key])
55 obj[key] = squash_unicode(obj[key])
69 if isinstance(key, unicode_type):
56 if isinstance(key, unicode_type):
70 obj[squash_unicode(key)] = obj.pop(key)
57 obj[squash_unicode(key)] = obj.pop(key)
71 elif isinstance(obj, list):
58 elif isinstance(obj, list):
72 for i,v in enumerate(obj):
59 for i,v in enumerate(obj):
73 obj[i] = squash_unicode(v)
60 obj[i] = squash_unicode(v)
74 elif isinstance(obj, unicode_type):
61 elif isinstance(obj, unicode_type):
75 obj = obj.encode('utf8')
62 obj = obj.encode('utf8')
76 return obj
63 return obj
77
64
78 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
79 # globals and defaults
66 # globals and defaults
80 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
81
68
82 # ISO8601-ify datetime objects
69 # ISO8601-ify datetime objects
83 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
70 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default, ensure_ascii=False)
84 json_unpacker = lambda s: jsonapi.loads(s)
71 json_unpacker = lambda s: jsonapi.loads(s)
85
72
86 pickle_packer = lambda o: pickle.dumps(squash_dates(o),-1)
73 pickle_packer = lambda o: pickle.dumps(squash_dates(o),-1)
87 pickle_unpacker = pickle.loads
74 pickle_unpacker = pickle.loads
88
75
89 default_packer = json_packer
76 default_packer = json_packer
90 default_unpacker = json_unpacker
77 default_unpacker = json_unpacker
91
78
92 DELIM = b"<IDS|MSG>"
79 DELIM = b"<IDS|MSG>"
93 # singleton dummy tracker, which will always report as done
80 # singleton dummy tracker, which will always report as done
94 DONE = zmq.MessageTracker()
81 DONE = zmq.MessageTracker()
95
82
96 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
97 # Mixin tools for apps that use Sessions
84 # Mixin tools for apps that use Sessions
98 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
99
86
100 session_aliases = dict(
87 session_aliases = dict(
101 ident = 'Session.session',
88 ident = 'Session.session',
102 user = 'Session.username',
89 user = 'Session.username',
103 keyfile = 'Session.keyfile',
90 keyfile = 'Session.keyfile',
104 )
91 )
105
92
106 session_flags = {
93 session_flags = {
107 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
94 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
108 'keyfile' : '' }},
95 'keyfile' : '' }},
109 """Use HMAC digests for authentication of messages.
96 """Use HMAC digests for authentication of messages.
110 Setting this flag will generate a new UUID to use as the HMAC key.
97 Setting this flag will generate a new UUID to use as the HMAC key.
111 """),
98 """),
112 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
99 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
113 """Don't authenticate messages."""),
100 """Don't authenticate messages."""),
114 }
101 }
115
102
116 def default_secure(cfg):
103 def default_secure(cfg):
117 """Set the default behavior for a config environment to be secure.
104 """Set the default behavior for a config environment to be secure.
118
105
119 If Session.key/keyfile have not been set, set Session.key to
106 If Session.key/keyfile have not been set, set Session.key to
120 a new random UUID.
107 a new random UUID.
121 """
108 """
122
109
123 if 'Session' in cfg:
110 if 'Session' in cfg:
124 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
111 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
125 return
112 return
126 # key/keyfile not specified, generate new UUID:
113 # key/keyfile not specified, generate new UUID:
127 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
114 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
128
115
129
116
130 #-----------------------------------------------------------------------------
117 #-----------------------------------------------------------------------------
131 # Classes
118 # Classes
132 #-----------------------------------------------------------------------------
119 #-----------------------------------------------------------------------------
133
120
134 class SessionFactory(LoggingConfigurable):
121 class SessionFactory(LoggingConfigurable):
135 """The Base class for configurables that have a Session, Context, logger,
122 """The Base class for configurables that have a Session, Context, logger,
136 and IOLoop.
123 and IOLoop.
137 """
124 """
138
125
139 logname = Unicode('')
126 logname = Unicode('')
140 def _logname_changed(self, name, old, new):
127 def _logname_changed(self, name, old, new):
141 self.log = logging.getLogger(new)
128 self.log = logging.getLogger(new)
142
129
143 # not configurable:
130 # not configurable:
144 context = Instance('zmq.Context')
131 context = Instance('zmq.Context')
145 def _context_default(self):
132 def _context_default(self):
146 return zmq.Context.instance()
133 return zmq.Context.instance()
147
134
148 session = Instance('IPython.kernel.zmq.session.Session')
135 session = Instance('IPython.kernel.zmq.session.Session')
149
136
150 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
137 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
151 def _loop_default(self):
138 def _loop_default(self):
152 return IOLoop.instance()
139 return IOLoop.instance()
153
140
154 def __init__(self, **kwargs):
141 def __init__(self, **kwargs):
155 super(SessionFactory, self).__init__(**kwargs)
142 super(SessionFactory, self).__init__(**kwargs)
156
143
157 if self.session is None:
144 if self.session is None:
158 # construct the session
145 # construct the session
159 self.session = Session(**kwargs)
146 self.session = Session(**kwargs)
160
147
161
148
162 class Message(object):
149 class Message(object):
163 """A simple message object that maps dict keys to attributes.
150 """A simple message object that maps dict keys to attributes.
164
151
165 A Message can be created from a dict and a dict from a Message instance
152 A Message can be created from a dict and a dict from a Message instance
166 simply by calling dict(msg_obj)."""
153 simply by calling dict(msg_obj)."""
167
154
168 def __init__(self, msg_dict):
155 def __init__(self, msg_dict):
169 dct = self.__dict__
156 dct = self.__dict__
170 for k, v in iteritems(dict(msg_dict)):
157 for k, v in iteritems(dict(msg_dict)):
171 if isinstance(v, dict):
158 if isinstance(v, dict):
172 v = Message(v)
159 v = Message(v)
173 dct[k] = v
160 dct[k] = v
174
161
175 # Having this iterator lets dict(msg_obj) work out of the box.
162 # Having this iterator lets dict(msg_obj) work out of the box.
176 def __iter__(self):
163 def __iter__(self):
177 return iter(iteritems(self.__dict__))
164 return iter(iteritems(self.__dict__))
178
165
179 def __repr__(self):
166 def __repr__(self):
180 return repr(self.__dict__)
167 return repr(self.__dict__)
181
168
182 def __str__(self):
169 def __str__(self):
183 return pprint.pformat(self.__dict__)
170 return pprint.pformat(self.__dict__)
184
171
185 def __contains__(self, k):
172 def __contains__(self, k):
186 return k in self.__dict__
173 return k in self.__dict__
187
174
188 def __getitem__(self, k):
175 def __getitem__(self, k):
189 return self.__dict__[k]
176 return self.__dict__[k]
190
177
191
178
192 def msg_header(msg_id, msg_type, username, session):
179 def msg_header(msg_id, msg_type, username, session):
193 date = datetime.now()
180 date = datetime.now()
194 return locals()
181 return locals()
195
182
196 def extract_header(msg_or_header):
183 def extract_header(msg_or_header):
197 """Given a message or header, return the header."""
184 """Given a message or header, return the header."""
198 if not msg_or_header:
185 if not msg_or_header:
199 return {}
186 return {}
200 try:
187 try:
201 # See if msg_or_header is the entire message.
188 # See if msg_or_header is the entire message.
202 h = msg_or_header['header']
189 h = msg_or_header['header']
203 except KeyError:
190 except KeyError:
204 try:
191 try:
205 # See if msg_or_header is just the header
192 # See if msg_or_header is just the header
206 h = msg_or_header['msg_id']
193 h = msg_or_header['msg_id']
207 except KeyError:
194 except KeyError:
208 raise
195 raise
209 else:
196 else:
210 h = msg_or_header
197 h = msg_or_header
211 if not isinstance(h, dict):
198 if not isinstance(h, dict):
212 h = dict(h)
199 h = dict(h)
213 return h
200 return h
214
201
215 class Session(Configurable):
202 class Session(Configurable):
216 """Object for handling serialization and sending of messages.
203 """Object for handling serialization and sending of messages.
217
204
218 The Session object handles building messages and sending them
205 The Session object handles building messages and sending them
219 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
206 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
220 other over the network via Session objects, and only need to work with the
207 other over the network via Session objects, and only need to work with the
221 dict-based IPython message spec. The Session will handle
208 dict-based IPython message spec. The Session will handle
222 serialization/deserialization, security, and metadata.
209 serialization/deserialization, security, and metadata.
223
210
224 Sessions support configurable serialiization via packer/unpacker traits,
211 Sessions support configurable serialiization via packer/unpacker traits,
225 and signing with HMAC digests via the key/keyfile traits.
212 and signing with HMAC digests via the key/keyfile traits.
226
213
227 Parameters
214 Parameters
228 ----------
215 ----------
229
216
230 debug : bool
217 debug : bool
231 whether to trigger extra debugging statements
218 whether to trigger extra debugging statements
232 packer/unpacker : str : 'json', 'pickle' or import_string
219 packer/unpacker : str : 'json', 'pickle' or import_string
233 importstrings for methods to serialize message parts. If just
220 importstrings for methods to serialize message parts. If just
234 'json' or 'pickle', predefined JSON and pickle packers will be used.
221 'json' or 'pickle', predefined JSON and pickle packers will be used.
235 Otherwise, the entire importstring must be used.
222 Otherwise, the entire importstring must be used.
236
223
237 The functions must accept at least valid JSON input, and output *bytes*.
224 The functions must accept at least valid JSON input, and output *bytes*.
238
225
239 For example, to use msgpack:
226 For example, to use msgpack:
240 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
227 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
241 pack/unpack : callables
228 pack/unpack : callables
242 You can also set the pack/unpack callables for serialization directly.
229 You can also set the pack/unpack callables for serialization directly.
243 session : bytes
230 session : bytes
244 the ID of this Session object. The default is to generate a new UUID.
231 the ID of this Session object. The default is to generate a new UUID.
245 username : unicode
232 username : unicode
246 username added to message headers. The default is to ask the OS.
233 username added to message headers. The default is to ask the OS.
247 key : bytes
234 key : bytes
248 The key used to initialize an HMAC signature. If unset, messages
235 The key used to initialize an HMAC signature. If unset, messages
249 will not be signed or checked.
236 will not be signed or checked.
250 keyfile : filepath
237 keyfile : filepath
251 The file containing a key. If this is set, `key` will be initialized
238 The file containing a key. If this is set, `key` will be initialized
252 to the contents of the file.
239 to the contents of the file.
253
240
254 """
241 """
255
242
256 debug=Bool(False, config=True, help="""Debug output in the Session""")
243 debug=Bool(False, config=True, help="""Debug output in the Session""")
257
244
258 packer = DottedObjectName('json',config=True,
245 packer = DottedObjectName('json',config=True,
259 help="""The name of the packer for serializing messages.
246 help="""The name of the packer for serializing messages.
260 Should be one of 'json', 'pickle', or an import name
247 Should be one of 'json', 'pickle', or an import name
261 for a custom callable serializer.""")
248 for a custom callable serializer.""")
262 def _packer_changed(self, name, old, new):
249 def _packer_changed(self, name, old, new):
263 if new.lower() == 'json':
250 if new.lower() == 'json':
264 self.pack = json_packer
251 self.pack = json_packer
265 self.unpack = json_unpacker
252 self.unpack = json_unpacker
266 self.unpacker = new
253 self.unpacker = new
267 elif new.lower() == 'pickle':
254 elif new.lower() == 'pickle':
268 self.pack = pickle_packer
255 self.pack = pickle_packer
269 self.unpack = pickle_unpacker
256 self.unpack = pickle_unpacker
270 self.unpacker = new
257 self.unpacker = new
271 else:
258 else:
272 self.pack = import_item(str(new))
259 self.pack = import_item(str(new))
273
260
274 unpacker = DottedObjectName('json', config=True,
261 unpacker = DottedObjectName('json', config=True,
275 help="""The name of the unpacker for unserializing messages.
262 help="""The name of the unpacker for unserializing messages.
276 Only used with custom functions for `packer`.""")
263 Only used with custom functions for `packer`.""")
277 def _unpacker_changed(self, name, old, new):
264 def _unpacker_changed(self, name, old, new):
278 if new.lower() == 'json':
265 if new.lower() == 'json':
279 self.pack = json_packer
266 self.pack = json_packer
280 self.unpack = json_unpacker
267 self.unpack = json_unpacker
281 self.packer = new
268 self.packer = new
282 elif new.lower() == 'pickle':
269 elif new.lower() == 'pickle':
283 self.pack = pickle_packer
270 self.pack = pickle_packer
284 self.unpack = pickle_unpacker
271 self.unpack = pickle_unpacker
285 self.packer = new
272 self.packer = new
286 else:
273 else:
287 self.unpack = import_item(str(new))
274 self.unpack = import_item(str(new))
288
275
289 session = CUnicode(u'', config=True,
276 session = CUnicode(u'', config=True,
290 help="""The UUID identifying this session.""")
277 help="""The UUID identifying this session.""")
291 def _session_default(self):
278 def _session_default(self):
292 u = unicode_type(uuid.uuid4())
279 u = unicode_type(uuid.uuid4())
293 self.bsession = u.encode('ascii')
280 self.bsession = u.encode('ascii')
294 return u
281 return u
295
282
296 def _session_changed(self, name, old, new):
283 def _session_changed(self, name, old, new):
297 self.bsession = self.session.encode('ascii')
284 self.bsession = self.session.encode('ascii')
298
285
299 # bsession is the session as bytes
286 # bsession is the session as bytes
300 bsession = CBytes(b'')
287 bsession = CBytes(b'')
301
288
302 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
289 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
303 help="""Username for the Session. Default is your system username.""",
290 help="""Username for the Session. Default is your system username.""",
304 config=True)
291 config=True)
305
292
306 metadata = Dict({}, config=True,
293 metadata = Dict({}, config=True,
307 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
294 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
308
295
309 # message signature related traits:
296 # message signature related traits:
310
297
311 key = CBytes(b'', config=True,
298 key = CBytes(b'', config=True,
312 help="""execution key, for extra authentication.""")
299 help="""execution key, for extra authentication.""")
313 def _key_changed(self, name, old, new):
300 def _key_changed(self, name, old, new):
314 if new:
301 if new:
315 self.auth = hmac.HMAC(new, digestmod=self.digest_mod)
302 self.auth = hmac.HMAC(new, digestmod=self.digest_mod)
316 else:
303 else:
317 self.auth = None
304 self.auth = None
318
305
319 signature_scheme = Unicode('hmac-sha256', config=True,
306 signature_scheme = Unicode('hmac-sha256', config=True,
320 help="""The digest scheme used to construct the message signatures.
307 help="""The digest scheme used to construct the message signatures.
321 Must have the form 'hmac-HASH'.""")
308 Must have the form 'hmac-HASH'.""")
322 def _signature_scheme_changed(self, name, old, new):
309 def _signature_scheme_changed(self, name, old, new):
323 if not new.startswith('hmac-'):
310 if not new.startswith('hmac-'):
324 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
311 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
325 hash_name = new.split('-', 1)[1]
312 hash_name = new.split('-', 1)[1]
326 try:
313 try:
327 self.digest_mod = getattr(hashlib, hash_name)
314 self.digest_mod = getattr(hashlib, hash_name)
328 except AttributeError:
315 except AttributeError:
329 raise TraitError("hashlib has no such attribute: %s" % hash_name)
316 raise TraitError("hashlib has no such attribute: %s" % hash_name)
330
317
331 digest_mod = Any()
318 digest_mod = Any()
332 def _digest_mod_default(self):
319 def _digest_mod_default(self):
333 return hashlib.sha256
320 return hashlib.sha256
334
321
335 auth = Instance(hmac.HMAC)
322 auth = Instance(hmac.HMAC)
336
323
337 digest_history = Set()
324 digest_history = Set()
338 digest_history_size = Integer(2**16, config=True,
325 digest_history_size = Integer(2**16, config=True,
339 help="""The maximum number of digests to remember.
326 help="""The maximum number of digests to remember.
340
327
341 The digest history will be culled when it exceeds this value.
328 The digest history will be culled when it exceeds this value.
342 """
329 """
343 )
330 )
344
331
345 keyfile = Unicode('', config=True,
332 keyfile = Unicode('', config=True,
346 help="""path to file containing execution key.""")
333 help="""path to file containing execution key.""")
347 def _keyfile_changed(self, name, old, new):
334 def _keyfile_changed(self, name, old, new):
348 with open(new, 'rb') as f:
335 with open(new, 'rb') as f:
349 self.key = f.read().strip()
336 self.key = f.read().strip()
350
337
351 # for protecting against sends from forks
338 # for protecting against sends from forks
352 pid = Integer()
339 pid = Integer()
353
340
354 # serialization traits:
341 # serialization traits:
355
342
356 pack = Any(default_packer) # the actual packer function
343 pack = Any(default_packer) # the actual packer function
357 def _pack_changed(self, name, old, new):
344 def _pack_changed(self, name, old, new):
358 if not callable(new):
345 if not callable(new):
359 raise TypeError("packer must be callable, not %s"%type(new))
346 raise TypeError("packer must be callable, not %s"%type(new))
360
347
361 unpack = Any(default_unpacker) # the actual packer function
348 unpack = Any(default_unpacker) # the actual packer function
362 def _unpack_changed(self, name, old, new):
349 def _unpack_changed(self, name, old, new):
363 # unpacker is not checked - it is assumed to be
350 # unpacker is not checked - it is assumed to be
364 if not callable(new):
351 if not callable(new):
365 raise TypeError("unpacker must be callable, not %s"%type(new))
352 raise TypeError("unpacker must be callable, not %s"%type(new))
366
353
367 # thresholds:
354 # thresholds:
368 copy_threshold = Integer(2**16, config=True,
355 copy_threshold = Integer(2**16, config=True,
369 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
356 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
370 buffer_threshold = Integer(MAX_BYTES, config=True,
357 buffer_threshold = Integer(MAX_BYTES, config=True,
371 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
358 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
372 item_threshold = Integer(MAX_ITEMS, config=True,
359 item_threshold = Integer(MAX_ITEMS, config=True,
373 help="""The maximum number of items for a container to be introspected for custom serialization.
360 help="""The maximum number of items for a container to be introspected for custom serialization.
374 Containers larger than this are pickled outright.
361 Containers larger than this are pickled outright.
375 """
362 """
376 )
363 )
377
364
378
365
379 def __init__(self, **kwargs):
366 def __init__(self, **kwargs):
380 """create a Session object
367 """create a Session object
381
368
382 Parameters
369 Parameters
383 ----------
370 ----------
384
371
385 debug : bool
372 debug : bool
386 whether to trigger extra debugging statements
373 whether to trigger extra debugging statements
387 packer/unpacker : str : 'json', 'pickle' or import_string
374 packer/unpacker : str : 'json', 'pickle' or import_string
388 importstrings for methods to serialize message parts. If just
375 importstrings for methods to serialize message parts. If just
389 'json' or 'pickle', predefined JSON and pickle packers will be used.
376 'json' or 'pickle', predefined JSON and pickle packers will be used.
390 Otherwise, the entire importstring must be used.
377 Otherwise, the entire importstring must be used.
391
378
392 The functions must accept at least valid JSON input, and output
379 The functions must accept at least valid JSON input, and output
393 *bytes*.
380 *bytes*.
394
381
395 For example, to use msgpack:
382 For example, to use msgpack:
396 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
383 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
397 pack/unpack : callables
384 pack/unpack : callables
398 You can also set the pack/unpack callables for serialization
385 You can also set the pack/unpack callables for serialization
399 directly.
386 directly.
400 session : unicode (must be ascii)
387 session : unicode (must be ascii)
401 the ID of this Session object. The default is to generate a new
388 the ID of this Session object. The default is to generate a new
402 UUID.
389 UUID.
403 bsession : bytes
390 bsession : bytes
404 The session as bytes
391 The session as bytes
405 username : unicode
392 username : unicode
406 username added to message headers. The default is to ask the OS.
393 username added to message headers. The default is to ask the OS.
407 key : bytes
394 key : bytes
408 The key used to initialize an HMAC signature. If unset, messages
395 The key used to initialize an HMAC signature. If unset, messages
409 will not be signed or checked.
396 will not be signed or checked.
410 signature_scheme : str
397 signature_scheme : str
411 The message digest scheme. Currently must be of the form 'hmac-HASH',
398 The message digest scheme. Currently must be of the form 'hmac-HASH',
412 where 'HASH' is a hashing function available in Python's hashlib.
399 where 'HASH' is a hashing function available in Python's hashlib.
413 The default is 'hmac-sha256'.
400 The default is 'hmac-sha256'.
414 This is ignored if 'key' is empty.
401 This is ignored if 'key' is empty.
415 keyfile : filepath
402 keyfile : filepath
416 The file containing a key. If this is set, `key` will be
403 The file containing a key. If this is set, `key` will be
417 initialized to the contents of the file.
404 initialized to the contents of the file.
418 """
405 """
419 super(Session, self).__init__(**kwargs)
406 super(Session, self).__init__(**kwargs)
420 self._check_packers()
407 self._check_packers()
421 self.none = self.pack({})
408 self.none = self.pack({})
422 # ensure self._session_default() if necessary, so bsession is defined:
409 # ensure self._session_default() if necessary, so bsession is defined:
423 self.session
410 self.session
424 self.pid = os.getpid()
411 self.pid = os.getpid()
425
412
426 @property
413 @property
427 def msg_id(self):
414 def msg_id(self):
428 """always return new uuid"""
415 """always return new uuid"""
429 return str(uuid.uuid4())
416 return str(uuid.uuid4())
430
417
431 def _check_packers(self):
418 def _check_packers(self):
432 """check packers for datetime support."""
419 """check packers for datetime support."""
433 pack = self.pack
420 pack = self.pack
434 unpack = self.unpack
421 unpack = self.unpack
435
422
436 # check simple serialization
423 # check simple serialization
437 msg = dict(a=[1,'hi'])
424 msg = dict(a=[1,'hi'])
438 try:
425 try:
439 packed = pack(msg)
426 packed = pack(msg)
440 except Exception as e:
427 except Exception as e:
441 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
428 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
442 if self.packer == 'json':
429 if self.packer == 'json':
443 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
430 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
444 else:
431 else:
445 jsonmsg = ""
432 jsonmsg = ""
446 raise ValueError(
433 raise ValueError(
447 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
434 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
448 )
435 )
449
436
450 # ensure packed message is bytes
437 # ensure packed message is bytes
451 if not isinstance(packed, bytes):
438 if not isinstance(packed, bytes):
452 raise ValueError("message packed to %r, but bytes are required"%type(packed))
439 raise ValueError("message packed to %r, but bytes are required"%type(packed))
453
440
454 # check that unpack is pack's inverse
441 # check that unpack is pack's inverse
455 try:
442 try:
456 unpacked = unpack(packed)
443 unpacked = unpack(packed)
457 assert unpacked == msg
444 assert unpacked == msg
458 except Exception as e:
445 except Exception as e:
459 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
446 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
460 if self.packer == 'json':
447 if self.packer == 'json':
461 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
448 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
462 else:
449 else:
463 jsonmsg = ""
450 jsonmsg = ""
464 raise ValueError(
451 raise ValueError(
465 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
452 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
466 )
453 )
467
454
468 # check datetime support
455 # check datetime support
469 msg = dict(t=datetime.now())
456 msg = dict(t=datetime.now())
470 try:
457 try:
471 unpacked = unpack(pack(msg))
458 unpacked = unpack(pack(msg))
472 if isinstance(unpacked['t'], datetime):
459 if isinstance(unpacked['t'], datetime):
473 raise ValueError("Shouldn't deserialize to datetime")
460 raise ValueError("Shouldn't deserialize to datetime")
474 except Exception:
461 except Exception:
475 self.pack = lambda o: pack(squash_dates(o))
462 self.pack = lambda o: pack(squash_dates(o))
476 self.unpack = lambda s: unpack(s)
463 self.unpack = lambda s: unpack(s)
477
464
478 def msg_header(self, msg_type):
465 def msg_header(self, msg_type):
479 return msg_header(self.msg_id, msg_type, self.username, self.session)
466 return msg_header(self.msg_id, msg_type, self.username, self.session)
480
467
481 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
468 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
482 """Return the nested message dict.
469 """Return the nested message dict.
483
470
484 This format is different from what is sent over the wire. The
471 This format is different from what is sent over the wire. The
485 serialize/unserialize methods converts this nested message dict to the wire
472 serialize/unserialize methods converts this nested message dict to the wire
486 format, which is a list of message parts.
473 format, which is a list of message parts.
487 """
474 """
488 msg = {}
475 msg = {}
489 header = self.msg_header(msg_type) if header is None else header
476 header = self.msg_header(msg_type) if header is None else header
490 msg['header'] = header
477 msg['header'] = header
491 msg['msg_id'] = header['msg_id']
478 msg['msg_id'] = header['msg_id']
492 msg['msg_type'] = header['msg_type']
479 msg['msg_type'] = header['msg_type']
493 msg['parent_header'] = {} if parent is None else extract_header(parent)
480 msg['parent_header'] = {} if parent is None else extract_header(parent)
494 msg['content'] = {} if content is None else content
481 msg['content'] = {} if content is None else content
495 msg['metadata'] = self.metadata.copy()
482 msg['metadata'] = self.metadata.copy()
496 if metadata is not None:
483 if metadata is not None:
497 msg['metadata'].update(metadata)
484 msg['metadata'].update(metadata)
498 return msg
485 return msg
499
486
500 def sign(self, msg_list):
487 def sign(self, msg_list):
501 """Sign a message with HMAC digest. If no auth, return b''.
488 """Sign a message with HMAC digest. If no auth, return b''.
502
489
503 Parameters
490 Parameters
504 ----------
491 ----------
505 msg_list : list
492 msg_list : list
506 The [p_header,p_parent,p_content] part of the message list.
493 The [p_header,p_parent,p_content] part of the message list.
507 """
494 """
508 if self.auth is None:
495 if self.auth is None:
509 return b''
496 return b''
510 h = self.auth.copy()
497 h = self.auth.copy()
511 for m in msg_list:
498 for m in msg_list:
512 h.update(m)
499 h.update(m)
513 return str_to_bytes(h.hexdigest())
500 return str_to_bytes(h.hexdigest())
514
501
515 def serialize(self, msg, ident=None):
502 def serialize(self, msg, ident=None):
516 """Serialize the message components to bytes.
503 """Serialize the message components to bytes.
517
504
518 This is roughly the inverse of unserialize. The serialize/unserialize
505 This is roughly the inverse of unserialize. The serialize/unserialize
519 methods work with full message lists, whereas pack/unpack work with
506 methods work with full message lists, whereas pack/unpack work with
520 the individual message parts in the message list.
507 the individual message parts in the message list.
521
508
522 Parameters
509 Parameters
523 ----------
510 ----------
524 msg : dict or Message
511 msg : dict or Message
525 The nexted message dict as returned by the self.msg method.
512 The nexted message dict as returned by the self.msg method.
526
513
527 Returns
514 Returns
528 -------
515 -------
529 msg_list : list
516 msg_list : list
530 The list of bytes objects to be sent with the format::
517 The list of bytes objects to be sent with the format::
531
518
532 [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
519 [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
533 p_metadata, p_content, buffer1, buffer2, ...]
520 p_metadata, p_content, buffer1, buffer2, ...]
534
521
535 In this list, the ``p_*`` entities are the packed or serialized
522 In this list, the ``p_*`` entities are the packed or serialized
536 versions, so if JSON is used, these are utf8 encoded JSON strings.
523 versions, so if JSON is used, these are utf8 encoded JSON strings.
537 """
524 """
538 content = msg.get('content', {})
525 content = msg.get('content', {})
539 if content is None:
526 if content is None:
540 content = self.none
527 content = self.none
541 elif isinstance(content, dict):
528 elif isinstance(content, dict):
542 content = self.pack(content)
529 content = self.pack(content)
543 elif isinstance(content, bytes):
530 elif isinstance(content, bytes):
544 # content is already packed, as in a relayed message
531 # content is already packed, as in a relayed message
545 pass
532 pass
546 elif isinstance(content, unicode_type):
533 elif isinstance(content, unicode_type):
547 # should be bytes, but JSON often spits out unicode
534 # should be bytes, but JSON often spits out unicode
548 content = content.encode('utf8')
535 content = content.encode('utf8')
549 else:
536 else:
550 raise TypeError("Content incorrect type: %s"%type(content))
537 raise TypeError("Content incorrect type: %s"%type(content))
551
538
552 real_message = [self.pack(msg['header']),
539 real_message = [self.pack(msg['header']),
553 self.pack(msg['parent_header']),
540 self.pack(msg['parent_header']),
554 self.pack(msg['metadata']),
541 self.pack(msg['metadata']),
555 content,
542 content,
556 ]
543 ]
557
544
558 to_send = []
545 to_send = []
559
546
560 if isinstance(ident, list):
547 if isinstance(ident, list):
561 # accept list of idents
548 # accept list of idents
562 to_send.extend(ident)
549 to_send.extend(ident)
563 elif ident is not None:
550 elif ident is not None:
564 to_send.append(ident)
551 to_send.append(ident)
565 to_send.append(DELIM)
552 to_send.append(DELIM)
566
553
567 signature = self.sign(real_message)
554 signature = self.sign(real_message)
568 to_send.append(signature)
555 to_send.append(signature)
569
556
570 to_send.extend(real_message)
557 to_send.extend(real_message)
571
558
572 return to_send
559 return to_send
573
560
574 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
561 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
575 buffers=None, track=False, header=None, metadata=None):
562 buffers=None, track=False, header=None, metadata=None):
576 """Build and send a message via stream or socket.
563 """Build and send a message via stream or socket.
577
564
578 The message format used by this function internally is as follows:
565 The message format used by this function internally is as follows:
579
566
580 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
567 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
581 buffer1,buffer2,...]
568 buffer1,buffer2,...]
582
569
583 The serialize/unserialize methods convert the nested message dict into this
570 The serialize/unserialize methods convert the nested message dict into this
584 format.
571 format.
585
572
586 Parameters
573 Parameters
587 ----------
574 ----------
588
575
589 stream : zmq.Socket or ZMQStream
576 stream : zmq.Socket or ZMQStream
590 The socket-like object used to send the data.
577 The socket-like object used to send the data.
591 msg_or_type : str or Message/dict
578 msg_or_type : str or Message/dict
592 Normally, msg_or_type will be a msg_type unless a message is being
579 Normally, msg_or_type will be a msg_type unless a message is being
593 sent more than once. If a header is supplied, this can be set to
580 sent more than once. If a header is supplied, this can be set to
594 None and the msg_type will be pulled from the header.
581 None and the msg_type will be pulled from the header.
595
582
596 content : dict or None
583 content : dict or None
597 The content of the message (ignored if msg_or_type is a message).
584 The content of the message (ignored if msg_or_type is a message).
598 header : dict or None
585 header : dict or None
599 The header dict for the message (ignored if msg_to_type is a message).
586 The header dict for the message (ignored if msg_to_type is a message).
600 parent : Message or dict or None
587 parent : Message or dict or None
601 The parent or parent header describing the parent of this message
588 The parent or parent header describing the parent of this message
602 (ignored if msg_or_type is a message).
589 (ignored if msg_or_type is a message).
603 ident : bytes or list of bytes
590 ident : bytes or list of bytes
604 The zmq.IDENTITY routing path.
591 The zmq.IDENTITY routing path.
605 metadata : dict or None
592 metadata : dict or None
606 The metadata describing the message
593 The metadata describing the message
607 buffers : list or None
594 buffers : list or None
608 The already-serialized buffers to be appended to the message.
595 The already-serialized buffers to be appended to the message.
609 track : bool
596 track : bool
610 Whether to track. Only for use with Sockets, because ZMQStream
597 Whether to track. Only for use with Sockets, because ZMQStream
611 objects cannot track messages.
598 objects cannot track messages.
612
599
613
600
614 Returns
601 Returns
615 -------
602 -------
616 msg : dict
603 msg : dict
617 The constructed message.
604 The constructed message.
618 """
605 """
619 if not isinstance(stream, zmq.Socket):
606 if not isinstance(stream, zmq.Socket):
620 # ZMQStreams and dummy sockets do not support tracking.
607 # ZMQStreams and dummy sockets do not support tracking.
621 track = False
608 track = False
622
609
623 if isinstance(msg_or_type, (Message, dict)):
610 if isinstance(msg_or_type, (Message, dict)):
624 # We got a Message or message dict, not a msg_type so don't
611 # We got a Message or message dict, not a msg_type so don't
625 # build a new Message.
612 # build a new Message.
626 msg = msg_or_type
613 msg = msg_or_type
627 else:
614 else:
628 msg = self.msg(msg_or_type, content=content, parent=parent,
615 msg = self.msg(msg_or_type, content=content, parent=parent,
629 header=header, metadata=metadata)
616 header=header, metadata=metadata)
630 if not os.getpid() == self.pid:
617 if not os.getpid() == self.pid:
631 io.rprint("WARNING: attempted to send message from fork")
618 io.rprint("WARNING: attempted to send message from fork")
632 io.rprint(msg)
619 io.rprint(msg)
633 return
620 return
634 buffers = [] if buffers is None else buffers
621 buffers = [] if buffers is None else buffers
635 to_send = self.serialize(msg, ident)
622 to_send = self.serialize(msg, ident)
636 to_send.extend(buffers)
623 to_send.extend(buffers)
637 longest = max([ len(s) for s in to_send ])
624 longest = max([ len(s) for s in to_send ])
638 copy = (longest < self.copy_threshold)
625 copy = (longest < self.copy_threshold)
639
626
640 if buffers and track and not copy:
627 if buffers and track and not copy:
641 # only really track when we are doing zero-copy buffers
628 # only really track when we are doing zero-copy buffers
642 tracker = stream.send_multipart(to_send, copy=False, track=True)
629 tracker = stream.send_multipart(to_send, copy=False, track=True)
643 else:
630 else:
644 # use dummy tracker, which will be done immediately
631 # use dummy tracker, which will be done immediately
645 tracker = DONE
632 tracker = DONE
646 stream.send_multipart(to_send, copy=copy)
633 stream.send_multipart(to_send, copy=copy)
647
634
648 if self.debug:
635 if self.debug:
649 pprint.pprint(msg)
636 pprint.pprint(msg)
650 pprint.pprint(to_send)
637 pprint.pprint(to_send)
651 pprint.pprint(buffers)
638 pprint.pprint(buffers)
652
639
653 msg['tracker'] = tracker
640 msg['tracker'] = tracker
654
641
655 return msg
642 return msg
656
643
657 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
644 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
658 """Send a raw message via ident path.
645 """Send a raw message via ident path.
659
646
660 This method is used to send a already serialized message.
647 This method is used to send a already serialized message.
661
648
662 Parameters
649 Parameters
663 ----------
650 ----------
664 stream : ZMQStream or Socket
651 stream : ZMQStream or Socket
665 The ZMQ stream or socket to use for sending the message.
652 The ZMQ stream or socket to use for sending the message.
666 msg_list : list
653 msg_list : list
667 The serialized list of messages to send. This only includes the
654 The serialized list of messages to send. This only includes the
668 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
655 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
669 the message.
656 the message.
670 ident : ident or list
657 ident : ident or list
671 A single ident or a list of idents to use in sending.
658 A single ident or a list of idents to use in sending.
672 """
659 """
673 to_send = []
660 to_send = []
674 if isinstance(ident, bytes):
661 if isinstance(ident, bytes):
675 ident = [ident]
662 ident = [ident]
676 if ident is not None:
663 if ident is not None:
677 to_send.extend(ident)
664 to_send.extend(ident)
678
665
679 to_send.append(DELIM)
666 to_send.append(DELIM)
680 to_send.append(self.sign(msg_list))
667 to_send.append(self.sign(msg_list))
681 to_send.extend(msg_list)
668 to_send.extend(msg_list)
682 stream.send_multipart(to_send, flags, copy=copy)
669 stream.send_multipart(to_send, flags, copy=copy)
683
670
684 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
671 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
685 """Receive and unpack a message.
672 """Receive and unpack a message.
686
673
687 Parameters
674 Parameters
688 ----------
675 ----------
689 socket : ZMQStream or Socket
676 socket : ZMQStream or Socket
690 The socket or stream to use in receiving.
677 The socket or stream to use in receiving.
691
678
692 Returns
679 Returns
693 -------
680 -------
694 [idents], msg
681 [idents], msg
695 [idents] is a list of idents and msg is a nested message dict of
682 [idents] is a list of idents and msg is a nested message dict of
696 same format as self.msg returns.
683 same format as self.msg returns.
697 """
684 """
698 if isinstance(socket, ZMQStream):
685 if isinstance(socket, ZMQStream):
699 socket = socket.socket
686 socket = socket.socket
700 try:
687 try:
701 msg_list = socket.recv_multipart(mode, copy=copy)
688 msg_list = socket.recv_multipart(mode, copy=copy)
702 except zmq.ZMQError as e:
689 except zmq.ZMQError as e:
703 if e.errno == zmq.EAGAIN:
690 if e.errno == zmq.EAGAIN:
704 # We can convert EAGAIN to None as we know in this case
691 # We can convert EAGAIN to None as we know in this case
705 # recv_multipart won't return None.
692 # recv_multipart won't return None.
706 return None,None
693 return None,None
707 else:
694 else:
708 raise
695 raise
709 # split multipart message into identity list and message dict
696 # split multipart message into identity list and message dict
710 # invalid large messages can cause very expensive string comparisons
697 # invalid large messages can cause very expensive string comparisons
711 idents, msg_list = self.feed_identities(msg_list, copy)
698 idents, msg_list = self.feed_identities(msg_list, copy)
712 try:
699 try:
713 return idents, self.unserialize(msg_list, content=content, copy=copy)
700 return idents, self.unserialize(msg_list, content=content, copy=copy)
714 except Exception as e:
701 except Exception as e:
715 # TODO: handle it
702 # TODO: handle it
716 raise e
703 raise e
717
704
718 def feed_identities(self, msg_list, copy=True):
705 def feed_identities(self, msg_list, copy=True):
719 """Split the identities from the rest of the message.
706 """Split the identities from the rest of the message.
720
707
721 Feed until DELIM is reached, then return the prefix as idents and
708 Feed until DELIM is reached, then return the prefix as idents and
722 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
709 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
723 but that would be silly.
710 but that would be silly.
724
711
725 Parameters
712 Parameters
726 ----------
713 ----------
727 msg_list : a list of Message or bytes objects
714 msg_list : a list of Message or bytes objects
728 The message to be split.
715 The message to be split.
729 copy : bool
716 copy : bool
730 flag determining whether the arguments are bytes or Messages
717 flag determining whether the arguments are bytes or Messages
731
718
732 Returns
719 Returns
733 -------
720 -------
734 (idents, msg_list) : two lists
721 (idents, msg_list) : two lists
735 idents will always be a list of bytes, each of which is a ZMQ
722 idents will always be a list of bytes, each of which is a ZMQ
736 identity. msg_list will be a list of bytes or zmq.Messages of the
723 identity. msg_list will be a list of bytes or zmq.Messages of the
737 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
724 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
738 should be unpackable/unserializable via self.unserialize at this
725 should be unpackable/unserializable via self.unserialize at this
739 point.
726 point.
740 """
727 """
741 if copy:
728 if copy:
742 idx = msg_list.index(DELIM)
729 idx = msg_list.index(DELIM)
743 return msg_list[:idx], msg_list[idx+1:]
730 return msg_list[:idx], msg_list[idx+1:]
744 else:
731 else:
745 failed = True
732 failed = True
746 for idx,m in enumerate(msg_list):
733 for idx,m in enumerate(msg_list):
747 if m.bytes == DELIM:
734 if m.bytes == DELIM:
748 failed = False
735 failed = False
749 break
736 break
750 if failed:
737 if failed:
751 raise ValueError("DELIM not in msg_list")
738 raise ValueError("DELIM not in msg_list")
752 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
739 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
753 return [m.bytes for m in idents], msg_list
740 return [m.bytes for m in idents], msg_list
754
741
755 def _add_digest(self, signature):
742 def _add_digest(self, signature):
756 """add a digest to history to protect against replay attacks"""
743 """add a digest to history to protect against replay attacks"""
757 if self.digest_history_size == 0:
744 if self.digest_history_size == 0:
758 # no history, never add digests
745 # no history, never add digests
759 return
746 return
760
747
761 self.digest_history.add(signature)
748 self.digest_history.add(signature)
762 if len(self.digest_history) > self.digest_history_size:
749 if len(self.digest_history) > self.digest_history_size:
763 # threshold reached, cull 10%
750 # threshold reached, cull 10%
764 self._cull_digest_history()
751 self._cull_digest_history()
765
752
766 def _cull_digest_history(self):
753 def _cull_digest_history(self):
767 """cull the digest history
754 """cull the digest history
768
755
769 Removes a randomly selected 10% of the digest history
756 Removes a randomly selected 10% of the digest history
770 """
757 """
771 current = len(self.digest_history)
758 current = len(self.digest_history)
772 n_to_cull = max(int(current // 10), current - self.digest_history_size)
759 n_to_cull = max(int(current // 10), current - self.digest_history_size)
773 if n_to_cull >= current:
760 if n_to_cull >= current:
774 self.digest_history = set()
761 self.digest_history = set()
775 return
762 return
776 to_cull = random.sample(self.digest_history, n_to_cull)
763 to_cull = random.sample(self.digest_history, n_to_cull)
777 self.digest_history.difference_update(to_cull)
764 self.digest_history.difference_update(to_cull)
778
765
779 def unserialize(self, msg_list, content=True, copy=True):
766 def unserialize(self, msg_list, content=True, copy=True):
780 """Unserialize a msg_list to a nested message dict.
767 """Unserialize a msg_list to a nested message dict.
781
768
782 This is roughly the inverse of serialize. The serialize/unserialize
769 This is roughly the inverse of serialize. The serialize/unserialize
783 methods work with full message lists, whereas pack/unpack work with
770 methods work with full message lists, whereas pack/unpack work with
784 the individual message parts in the message list.
771 the individual message parts in the message list.
785
772
786 Parameters
773 Parameters
787 ----------
774 ----------
788 msg_list : list of bytes or Message objects
775 msg_list : list of bytes or Message objects
789 The list of message parts of the form [HMAC,p_header,p_parent,
776 The list of message parts of the form [HMAC,p_header,p_parent,
790 p_metadata,p_content,buffer1,buffer2,...].
777 p_metadata,p_content,buffer1,buffer2,...].
791 content : bool (True)
778 content : bool (True)
792 Whether to unpack the content dict (True), or leave it packed
779 Whether to unpack the content dict (True), or leave it packed
793 (False).
780 (False).
794 copy : bool (True)
781 copy : bool (True)
795 Whether to return the bytes (True), or the non-copying Message
782 Whether to return the bytes (True), or the non-copying Message
796 object in each place (False).
783 object in each place (False).
797
784
798 Returns
785 Returns
799 -------
786 -------
800 msg : dict
787 msg : dict
801 The nested message dict with top-level keys [header, parent_header,
788 The nested message dict with top-level keys [header, parent_header,
802 content, buffers].
789 content, buffers].
803 """
790 """
804 minlen = 5
791 minlen = 5
805 message = {}
792 message = {}
806 if not copy:
793 if not copy:
807 for i in range(minlen):
794 for i in range(minlen):
808 msg_list[i] = msg_list[i].bytes
795 msg_list[i] = msg_list[i].bytes
809 if self.auth is not None:
796 if self.auth is not None:
810 signature = msg_list[0]
797 signature = msg_list[0]
811 if not signature:
798 if not signature:
812 raise ValueError("Unsigned Message")
799 raise ValueError("Unsigned Message")
813 if signature in self.digest_history:
800 if signature in self.digest_history:
814 raise ValueError("Duplicate Signature: %r" % signature)
801 raise ValueError("Duplicate Signature: %r" % signature)
815 self._add_digest(signature)
802 self._add_digest(signature)
816 check = self.sign(msg_list[1:5])
803 check = self.sign(msg_list[1:5])
817 if not signature == check:
804 if not signature == check:
818 raise ValueError("Invalid Signature: %r" % signature)
805 raise ValueError("Invalid Signature: %r" % signature)
819 if not len(msg_list) >= minlen:
806 if not len(msg_list) >= minlen:
820 raise TypeError("malformed message, must have at least %i elements"%minlen)
807 raise TypeError("malformed message, must have at least %i elements"%minlen)
821 header = self.unpack(msg_list[1])
808 header = self.unpack(msg_list[1])
822 message['header'] = extract_dates(header)
809 message['header'] = extract_dates(header)
823 message['msg_id'] = header['msg_id']
810 message['msg_id'] = header['msg_id']
824 message['msg_type'] = header['msg_type']
811 message['msg_type'] = header['msg_type']
825 message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
812 message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
826 message['metadata'] = self.unpack(msg_list[3])
813 message['metadata'] = self.unpack(msg_list[3])
827 if content:
814 if content:
828 message['content'] = self.unpack(msg_list[4])
815 message['content'] = self.unpack(msg_list[4])
829 else:
816 else:
830 message['content'] = msg_list[4]
817 message['content'] = msg_list[4]
831
818
832 message['buffers'] = msg_list[5:]
819 message['buffers'] = msg_list[5:]
833 return message
820 return message
834
821
835 def test_msg2obj():
822 def test_msg2obj():
836 am = dict(x=1)
823 am = dict(x=1)
837 ao = Message(am)
824 ao = Message(am)
838 assert ao.x == am['x']
825 assert ao.x == am['x']
839
826
840 am['y'] = dict(z=1)
827 am['y'] = dict(z=1)
841 ao = Message(am)
828 ao = Message(am)
842 assert ao.y.z == am['y']['z']
829 assert ao.y.z == am['y']['z']
843
830
844 k1, k2 = 'y', 'z'
831 k1, k2 = 'y', 'z'
845 assert ao[k1][k2] == am[k1][k2]
832 assert ao[k1][k2] == am[k1][k2]
846
833
847 am2 = dict(ao)
834 am2 = dict(ao)
848 assert am['x'] == am2['x']
835 assert am['x'] == am2['x']
849 assert am['y']['z'] == am2['y']['z']
836 assert am['y']['z'] == am2['y']['z']
850
837
General Comments 0
You need to be logged in to leave comments. Login now