##// END OF EJS Templates
Merge pull request #6114 from ivanov/update-digest-api...
Paul Ivanov -
r17360:71fce017 merge
parent child Browse files
Show More
@@ -1,857 +1,865 b''
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9 """
9 """
10
10
11 # Copyright (c) IPython Development Team.
11 # Copyright (c) IPython Development Team.
12 # Distributed under the terms of the Modified BSD License.
12 # Distributed under the terms of the Modified BSD License.
13
13
14 import hashlib
14 import hashlib
15 import hmac
15 import hmac
16 import logging
16 import logging
17 import os
17 import os
18 import pprint
18 import pprint
19 import random
19 import random
20 import uuid
20 import uuid
21 from datetime import datetime
21 from datetime import datetime
22
22
23 try:
23 try:
24 import cPickle
24 import cPickle
25 pickle = cPickle
25 pickle = cPickle
26 except:
26 except:
27 cPickle = None
27 cPickle = None
28 import pickle
28 import pickle
29
29
try:
    # Prefer the stdlib implementation: it compares in constant time, which
    # limits the surface for timing attacks on message signatures.
    from hmac import compare_digest
except ImportError:
    # Python < 2.7.7 has no hmac.compare_digest. A plain ``a == b`` stops at
    # the first differing byte, so provide a constant-time replacement.
    def compare_digest(a, b):
        """Return True if the byte strings *a* and *b* are equal.

        Every byte pair is examined regardless of where the first mismatch
        occurs, so the running time does not reveal the mismatch position.
        Only the lengths of the inputs can be inferred from timing.

        Both arguments are expected to be byte strings (as HMAC hex digests
        are on the Python versions that reach this fallback).
        """
        if len(a) != len(b):
            return False
        mismatch = 0
        for x, y in zip(bytearray(a), bytearray(b)):
            mismatch |= x ^ y
        return mismatch == 0
37
30 import zmq
38 import zmq
31 from zmq.utils import jsonapi
39 from zmq.utils import jsonapi
32 from zmq.eventloop.ioloop import IOLoop
40 from zmq.eventloop.ioloop import IOLoop
33 from zmq.eventloop.zmqstream import ZMQStream
41 from zmq.eventloop.zmqstream import ZMQStream
34
42
35 from IPython.core.release import kernel_protocol_version, kernel_protocol_version_info
43 from IPython.core.release import kernel_protocol_version
36 from IPython.config.configurable import Configurable, LoggingConfigurable
44 from IPython.config.configurable import Configurable, LoggingConfigurable
37 from IPython.utils import io
45 from IPython.utils import io
38 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
39 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
47 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
40 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
48 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
41 iteritems)
49 iteritems)
42 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
43 DottedObjectName, CUnicode, Dict, Integer,
51 DottedObjectName, CUnicode, Dict, Integer,
44 TraitError,
52 TraitError,
45 )
53 )
46 from IPython.utils.pickleutil import PICKLE_PROTOCOL
54 from IPython.utils.pickleutil import PICKLE_PROTOCOL
47 from IPython.kernel.adapter import adapt
55 from IPython.kernel.adapter import adapt
48 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
56 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
49
57
50 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
51 # utility functions
59 # utility functions
52 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
53
61
def squash_unicode(obj):
    """Coerce unicode back to bytestrings.

    Dicts and lists are processed recursively and modified in place; unicode
    dict keys are re-keyed to their UTF-8 encoded form. A bare unicode string
    is returned UTF-8 encoded. Any other object is returned unchanged.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys: entries are popped and
        # re-inserted below, and mutating a dict while iterating its live
        # key view raises RuntimeError on Python 3.
        for key in list(obj.keys()):
            obj[key] = squash_unicode(obj[key])
            if isinstance(key, unicode_type):
                obj[squash_unicode(key)] = obj.pop(key)
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            obj[i] = squash_unicode(v)
    elif isinstance(obj, unicode_type):
        obj = obj.encode('utf8')
    return obj
67
75
68 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
69 # globals and defaults
77 # globals and defaults
70 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
71
79
def json_packer(obj):
    """Serialize *obj* with ``jsonapi.dumps``.

    * datetime objects are ISO8601-ified through ``date_default``
    * unicode is allowed (``ensure_ascii=False``)
    * nan is disallowed, because it's not actually valid JSON
    """
    return jsonapi.dumps(
        obj,
        default=date_default,
        ensure_ascii=False,
        allow_nan=False,
    )


def json_unpacker(s):
    """Deserialize *s* with ``jsonapi.loads``."""
    return jsonapi.loads(s)


def pickle_packer(o):
    """Pickle *o* with PICKLE_PROTOCOL after passing it through squash_dates."""
    return pickle.dumps(squash_dates(o), PICKLE_PROTOCOL)


pickle_unpacker = pickle.loads

# JSON is the serialization used unless configured otherwise.
default_packer = json_packer
default_unpacker = json_unpacker
85
93
86 DELIM = b"<IDS|MSG>"
94 DELIM = b"<IDS|MSG>"
87 # singleton dummy tracker, which will always report as done
95 # singleton dummy tracker, which will always report as done
88 DONE = zmq.MessageTracker()
96 DONE = zmq.MessageTracker()
89
97
90 #-----------------------------------------------------------------------------
98 #-----------------------------------------------------------------------------
91 # Mixin tools for apps that use Sessions
99 # Mixin tools for apps that use Sessions
92 #-----------------------------------------------------------------------------
100 #-----------------------------------------------------------------------------
93
101
# Short option names mapped to the Session traits they configure.
session_aliases = {
    'ident': 'Session.session',
    'user': 'Session.username',
    'keyfile': 'Session.keyfile',
}
99
107
100 session_flags = {
108 session_flags = {
101 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
109 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
102 'keyfile' : '' }},
110 'keyfile' : '' }},
103 """Use HMAC digests for authentication of messages.
111 """Use HMAC digests for authentication of messages.
104 Setting this flag will generate a new UUID to use as the HMAC key.
112 Setting this flag will generate a new UUID to use as the HMAC key.
105 """),
113 """),
106 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
114 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
107 """Don't authenticate messages."""),
115 """Don't authenticate messages."""),
108 }
116 }
109
117
def default_secure(cfg):
    """Make a config object secure by default.

    When neither ``Session.key`` nor ``Session.keyfile`` is present in *cfg*,
    ``cfg.Session.key`` is set to a freshly generated random UUID. If either
    one is already present, *cfg* is left untouched.
    """
    has_credentials = 'Session' in cfg and (
        'key' in cfg.Session or 'keyfile' in cfg.Session
    )
    if not has_credentials:
        # key/keyfile not specified, generate new UUID:
        cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
122
130
123
131
124 #-----------------------------------------------------------------------------
132 #-----------------------------------------------------------------------------
125 # Classes
133 # Classes
126 #-----------------------------------------------------------------------------
134 #-----------------------------------------------------------------------------
127
135
class SessionFactory(LoggingConfigurable):
    """Base class for configurables that carry a Session, a zmq Context,
    a logger, and an IOLoop.
    """

    logname = Unicode('')

    def _logname_changed(self, name, old, new):
        """Point ``self.log`` at the logger named by the new ``logname``."""
        self.log = logging.getLogger(new)

    # not configurable:
    context = Instance('zmq.Context')

    def _context_default(self):
        """Default to the zmq Context returned by ``zmq.Context.instance()``."""
        return zmq.Context.instance()

    session = Instance('IPython.kernel.zmq.session.Session')

    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)

    def _loop_default(self):
        """Default to the IOLoop returned by ``IOLoop.instance()``."""
        return IOLoop.instance()

    def __init__(self, **kwargs):
        """Initialize the configurable; build a Session if none was given.

        The same keyword arguments are forwarded to ``Session`` when one has
        to be constructed.
        """
        super(SessionFactory, self).__init__(**kwargs)

        if self.session is not None:
            return
        # no session supplied: construct one from the same kwargs
        self.session = Session(**kwargs)
154
162
155
163
class Message(object):
    """Lightweight wrapper exposing the keys of a message dict as attributes.

    Build one from a dict; nested dict values are wrapped in Message as well.
    Calling ``dict(msg_obj)`` turns the top level back into a dict.
    """

    def __init__(self, msg_dict):
        attrs = vars(self)
        for key, value in iteritems(dict(msg_dict)):
            attrs[key] = Message(value) if isinstance(value, dict) else value

    # Yielding (key, value) pairs is what lets dict(msg_obj) work out of the box.
    def __iter__(self):
        return iter(iteritems(vars(self)))

    def __repr__(self):
        return repr(vars(self))

    def __str__(self):
        return pprint.pformat(vars(self))

    def __contains__(self, k):
        return k in vars(self)

    def __getitem__(self, k):
        return vars(self)[k]
184
192
185
193
def msg_header(msg_id, msg_type, username, session):
    """Return a header dict for a message.

    The dict holds the four arguments under their own names, plus ``date``
    (the current local time) and ``version`` (the kernel protocol version).
    """
    return {
        'msg_id': msg_id,
        'msg_type': msg_type,
        'username': username,
        'session': session,
        'date': datetime.now(),
        'version': kernel_protocol_version,
    }
190
198
def extract_header(msg_or_header):
    """Return the header of *msg_or_header* as a dict.

    A falsy argument yields an empty dict. If the argument has a 'header'
    key, that value is the header; otherwise the argument itself is taken to
    be the header, provided it has a 'msg_id' key.

    Raises KeyError when neither 'header' nor 'msg_id' is present.
    """
    if not msg_or_header:
        return {}
    try:
        # Is this the entire message?
        header = msg_or_header['header']
    except KeyError:
        # Otherwise it must be a bare header; this lookup raises KeyError
        # when 'msg_id' is missing.
        msg_or_header['msg_id']
        header = msg_or_header
    return header if isinstance(header, dict) else dict(header)
209
217
210 class Session(Configurable):
218 class Session(Configurable):
211 """Object for handling serialization and sending of messages.
219 """Object for handling serialization and sending of messages.
212
220
213 The Session object handles building messages and sending them
221 The Session object handles building messages and sending them
214 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
222 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
215 other over the network via Session objects, and only need to work with the
223 other over the network via Session objects, and only need to work with the
216 dict-based IPython message spec. The Session will handle
224 dict-based IPython message spec. The Session will handle
217 serialization/deserialization, security, and metadata.
225 serialization/deserialization, security, and metadata.
218
226
219 Sessions support configurable serialiization via packer/unpacker traits,
227 Sessions support configurable serialization via packer/unpacker traits,
220 and signing with HMAC digests via the key/keyfile traits.
228 and signing with HMAC digests via the key/keyfile traits.
221
229
222 Parameters
230 Parameters
223 ----------
231 ----------
224
232
225 debug : bool
233 debug : bool
226 whether to trigger extra debugging statements
234 whether to trigger extra debugging statements
227 packer/unpacker : str : 'json', 'pickle' or import_string
235 packer/unpacker : str : 'json', 'pickle' or import_string
228 importstrings for methods to serialize message parts. If just
236 importstrings for methods to serialize message parts. If just
229 'json' or 'pickle', predefined JSON and pickle packers will be used.
237 'json' or 'pickle', predefined JSON and pickle packers will be used.
230 Otherwise, the entire importstring must be used.
238 Otherwise, the entire importstring must be used.
231
239
232 The functions must accept at least valid JSON input, and output *bytes*.
240 The functions must accept at least valid JSON input, and output *bytes*.
233
241
234 For example, to use msgpack:
242 For example, to use msgpack:
235 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
243 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
236 pack/unpack : callables
244 pack/unpack : callables
237 You can also set the pack/unpack callables for serialization directly.
245 You can also set the pack/unpack callables for serialization directly.
238 session : bytes
246 session : bytes
239 the ID of this Session object. The default is to generate a new UUID.
247 the ID of this Session object. The default is to generate a new UUID.
240 username : unicode
248 username : unicode
241 username added to message headers. The default is to ask the OS.
249 username added to message headers. The default is to ask the OS.
242 key : bytes
250 key : bytes
243 The key used to initialize an HMAC signature. If unset, messages
251 The key used to initialize an HMAC signature. If unset, messages
244 will not be signed or checked.
252 will not be signed or checked.
245 keyfile : filepath
253 keyfile : filepath
246 The file containing a key. If this is set, `key` will be initialized
254 The file containing a key. If this is set, `key` will be initialized
247 to the contents of the file.
255 to the contents of the file.
248
256
249 """
257 """
250
258
251 debug=Bool(False, config=True, help="""Debug output in the Session""")
259 debug=Bool(False, config=True, help="""Debug output in the Session""")
252
260
253 packer = DottedObjectName('json',config=True,
261 packer = DottedObjectName('json',config=True,
254 help="""The name of the packer for serializing messages.
262 help="""The name of the packer for serializing messages.
255 Should be one of 'json', 'pickle', or an import name
263 Should be one of 'json', 'pickle', or an import name
256 for a custom callable serializer.""")
264 for a custom callable serializer.""")
def _packer_changed(self, name, old, new):
    """React to a new ``packer`` name.

    'json' and 'pickle' (case-insensitive) select the predefined
    packer/unpacker pair and set ``unpacker`` to the same name. Any other
    value is treated as an import string for the pack callable only.
    """
    predefined = {
        'json': (json_packer, json_unpacker),
        'pickle': (pickle_packer, pickle_unpacker),
    }
    pair = predefined.get(new.lower())
    if pair is None:
        self.pack = import_item(str(new))
        return
    self.pack = pair[0]
    self.unpack = pair[1]
    self.unpacker = new
268
276
269 unpacker = DottedObjectName('json', config=True,
277 unpacker = DottedObjectName('json', config=True,
270 help="""The name of the unpacker for unserializing messages.
278 help="""The name of the unpacker for unserializing messages.
271 Only used with custom functions for `packer`.""")
279 Only used with custom functions for `packer`.""")
def _unpacker_changed(self, name, old, new):
    """React to a new ``unpacker`` name.

    'json' and 'pickle' (case-insensitive) select the predefined
    packer/unpacker pair and set ``packer`` to the same name. Any other
    value is treated as an import string for the unpack callable only.
    """
    predefined = {
        'json': (json_packer, json_unpacker),
        'pickle': (pickle_packer, pickle_unpacker),
    }
    pair = predefined.get(new.lower())
    if pair is None:
        self.unpack = import_item(str(new))
        return
    self.pack = pair[0]
    self.unpack = pair[1]
    self.packer = new
283
291
284 session = CUnicode(u'', config=True,
292 session = CUnicode(u'', config=True,
285 help="""The UUID identifying this session.""")
293 help="""The UUID identifying this session.""")
def _session_default(self):
    """Generate a new UUID as the session id.

    The ASCII-encoded form is stored in ``bsession`` as a side effect; the
    unicode form is returned.
    """
    session_id = unicode_type(uuid.uuid4())
    self.bsession = session_id.encode('ascii')
    return session_id
290
298
291 def _session_changed(self, name, old, new):
299 def _session_changed(self, name, old, new):
292 self.bsession = self.session.encode('ascii')
300 self.bsession = self.session.encode('ascii')
293
301
294 # bsession is the session as bytes
302 # bsession is the session as bytes
295 bsession = CBytes(b'')
303 bsession = CBytes(b'')
296
304
297 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
305 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
298 help="""Username for the Session. Default is your system username.""",
306 help="""Username for the Session. Default is your system username.""",
299 config=True)
307 config=True)
300
308
301 metadata = Dict({}, config=True,
309 metadata = Dict({}, config=True,
302 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
310 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
303
311
304 # if 0, no adapting to do.
312 # if 0, no adapting to do.
305 adapt_version = Integer(0)
313 adapt_version = Integer(0)
306
314
307 # message signature related traits:
315 # message signature related traits:
308
316
309 key = CBytes(b'', config=True,
317 key = CBytes(b'', config=True,
310 help="""execution key, for extra authentication.""")
318 help="""execution key, for extra authentication.""")
311 def _key_changed(self):
319 def _key_changed(self):
312 self._new_auth()
320 self._new_auth()
313
321
314 signature_scheme = Unicode('hmac-sha256', config=True,
322 signature_scheme = Unicode('hmac-sha256', config=True,
315 help="""The digest scheme used to construct the message signatures.
323 help="""The digest scheme used to construct the message signatures.
316 Must have the form 'hmac-HASH'.""")
324 Must have the form 'hmac-HASH'.""")
317 def _signature_scheme_changed(self, name, old, new):
325 def _signature_scheme_changed(self, name, old, new):
318 if not new.startswith('hmac-'):
326 if not new.startswith('hmac-'):
319 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
327 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
320 hash_name = new.split('-', 1)[1]
328 hash_name = new.split('-', 1)[1]
321 try:
329 try:
322 self.digest_mod = getattr(hashlib, hash_name)
330 self.digest_mod = getattr(hashlib, hash_name)
323 except AttributeError:
331 except AttributeError:
324 raise TraitError("hashlib has no such attribute: %s" % hash_name)
332 raise TraitError("hashlib has no such attribute: %s" % hash_name)
325 self._new_auth()
333 self._new_auth()
326
334
327 digest_mod = Any()
335 digest_mod = Any()
328 def _digest_mod_default(self):
336 def _digest_mod_default(self):
329 return hashlib.sha256
337 return hashlib.sha256
330
338
331 auth = Instance(hmac.HMAC)
339 auth = Instance(hmac.HMAC)
332
340
333 def _new_auth(self):
341 def _new_auth(self):
334 if self.key:
342 if self.key:
335 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
343 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
336 else:
344 else:
337 self.auth = None
345 self.auth = None
338
346
339 digest_history = Set()
347 digest_history = Set()
340 digest_history_size = Integer(2**16, config=True,
348 digest_history_size = Integer(2**16, config=True,
341 help="""The maximum number of digests to remember.
349 help="""The maximum number of digests to remember.
342
350
343 The digest history will be culled when it exceeds this value.
351 The digest history will be culled when it exceeds this value.
344 """
352 """
345 )
353 )
346
354
347 keyfile = Unicode('', config=True,
355 keyfile = Unicode('', config=True,
348 help="""path to file containing execution key.""")
356 help="""path to file containing execution key.""")
349 def _keyfile_changed(self, name, old, new):
357 def _keyfile_changed(self, name, old, new):
350 with open(new, 'rb') as f:
358 with open(new, 'rb') as f:
351 self.key = f.read().strip()
359 self.key = f.read().strip()
352
360
353 # for protecting against sends from forks
361 # for protecting against sends from forks
354 pid = Integer()
362 pid = Integer()
355
363
356 # serialization traits:
364 # serialization traits:
357
365
358 pack = Any(default_packer) # the actual packer function
366 pack = Any(default_packer) # the actual packer function
359 def _pack_changed(self, name, old, new):
367 def _pack_changed(self, name, old, new):
360 if not callable(new):
368 if not callable(new):
361 raise TypeError("packer must be callable, not %s"%type(new))
369 raise TypeError("packer must be callable, not %s"%type(new))
362
370
363 unpack = Any(default_unpacker) # the actual packer function
371 unpack = Any(default_unpacker) # the actual packer function
364 def _unpack_changed(self, name, old, new):
372 def _unpack_changed(self, name, old, new):
365 # unpacker is not checked - it is assumed to be
373 # unpacker is not checked - it is assumed to be
366 if not callable(new):
374 if not callable(new):
367 raise TypeError("unpacker must be callable, not %s"%type(new))
375 raise TypeError("unpacker must be callable, not %s"%type(new))
368
376
369 # thresholds:
377 # thresholds:
370 copy_threshold = Integer(2**16, config=True,
378 copy_threshold = Integer(2**16, config=True,
371 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
379 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
372 buffer_threshold = Integer(MAX_BYTES, config=True,
380 buffer_threshold = Integer(MAX_BYTES, config=True,
373 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
381 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
374 item_threshold = Integer(MAX_ITEMS, config=True,
382 item_threshold = Integer(MAX_ITEMS, config=True,
375 help="""The maximum number of items for a container to be introspected for custom serialization.
383 help="""The maximum number of items for a container to be introspected for custom serialization.
376 Containers larger than this are pickled outright.
384 Containers larger than this are pickled outright.
377 """
385 """
378 )
386 )
379
387
380
388
def __init__(self, **kwargs):
    """Create a Session object.

    Parameters
    ----------

    debug : bool
        Whether to trigger extra debugging statements.
    packer/unpacker : str : 'json', 'pickle' or import_string
        Import strings for the functions that serialize message parts.
        The bare names 'json' and 'pickle' select the predefined JSON and
        pickle packers; anything else must be a full import string.

        The functions must accept at least valid JSON input, and output
        *bytes*.

        For example, to use msgpack:
        packer = 'msgpack.packb', unpacker='msgpack.unpackb'
    pack/unpack : callables
        The serialization callables may also be set directly.
    session : unicode (must be ascii)
        The ID of this Session object. A new UUID is generated by default.
    bsession : bytes
        The session as bytes.
    username : unicode
        Username added to message headers. The default is to ask the OS.
    key : bytes
        The key used to initialize an HMAC signature. If unset, messages
        will not be signed or checked.
    signature_scheme : str
        The message digest scheme. Currently must be of the form
        'hmac-HASH', where 'HASH' is a hashing function available in
        Python's hashlib. The default is 'hmac-sha256'.
        This is ignored if 'key' is empty.
    keyfile : filepath
        The file containing a key. If this is set, `key` will be
        initialized to the contents of the file.
    """
    super(Session, self).__init__(**kwargs)
    self._check_packers()
    # packed form of an empty dict
    self.none = self.pack({})
    # Access the trait so that _session_default() runs if needed, which is
    # what defines bsession:
    self.session
    self.pid = os.getpid()
427
435
@property
def msg_id(self):
    """A freshly generated UUID string; every access yields a new one."""
    fresh = uuid.uuid4()
    return str(fresh)
432
440
433 def _check_packers(self):
441 def _check_packers(self):
434 """check packers for datetime support."""
442 """check packers for datetime support."""
435 pack = self.pack
443 pack = self.pack
436 unpack = self.unpack
444 unpack = self.unpack
437
445
438 # check simple serialization
446 # check simple serialization
439 msg = dict(a=[1,'hi'])
447 msg = dict(a=[1,'hi'])
440 try:
448 try:
441 packed = pack(msg)
449 packed = pack(msg)
442 except Exception as e:
450 except Exception as e:
443 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
451 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
444 if self.packer == 'json':
452 if self.packer == 'json':
445 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
453 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
446 else:
454 else:
447 jsonmsg = ""
455 jsonmsg = ""
448 raise ValueError(
456 raise ValueError(
449 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
457 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
450 )
458 )
451
459
452 # ensure packed message is bytes
460 # ensure packed message is bytes
453 if not isinstance(packed, bytes):
461 if not isinstance(packed, bytes):
454 raise ValueError("message packed to %r, but bytes are required"%type(packed))
462 raise ValueError("message packed to %r, but bytes are required"%type(packed))
455
463
456 # check that unpack is pack's inverse
464 # check that unpack is pack's inverse
457 try:
465 try:
458 unpacked = unpack(packed)
466 unpacked = unpack(packed)
459 assert unpacked == msg
467 assert unpacked == msg
460 except Exception as e:
468 except Exception as e:
461 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
469 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
462 if self.packer == 'json':
470 if self.packer == 'json':
463 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
471 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
464 else:
472 else:
465 jsonmsg = ""
473 jsonmsg = ""
466 raise ValueError(
474 raise ValueError(
467 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
475 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
468 )
476 )
469
477
470 # check datetime support
478 # check datetime support
471 msg = dict(t=datetime.now())
479 msg = dict(t=datetime.now())
472 try:
480 try:
473 unpacked = unpack(pack(msg))
481 unpacked = unpack(pack(msg))
474 if isinstance(unpacked['t'], datetime):
482 if isinstance(unpacked['t'], datetime):
475 raise ValueError("Shouldn't deserialize to datetime")
483 raise ValueError("Shouldn't deserialize to datetime")
476 except Exception:
484 except Exception:
477 self.pack = lambda o: pack(squash_dates(o))
485 self.pack = lambda o: pack(squash_dates(o))
478 self.unpack = lambda s: unpack(s)
486 self.unpack = lambda s: unpack(s)
479
487
480 def msg_header(self, msg_type):
488 def msg_header(self, msg_type):
481 return msg_header(self.msg_id, msg_type, self.username, self.session)
489 return msg_header(self.msg_id, msg_type, self.username, self.session)
482
490
483 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
491 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
484 """Return the nested message dict.
492 """Return the nested message dict.
485
493
486 This format is different from what is sent over the wire. The
494 This format is different from what is sent over the wire. The
487 serialize/unserialize methods converts this nested message dict to the wire
495 serialize/unserialize methods converts this nested message dict to the wire
488 format, which is a list of message parts.
496 format, which is a list of message parts.
489 """
497 """
490 msg = {}
498 msg = {}
491 header = self.msg_header(msg_type) if header is None else header
499 header = self.msg_header(msg_type) if header is None else header
492 msg['header'] = header
500 msg['header'] = header
493 msg['msg_id'] = header['msg_id']
501 msg['msg_id'] = header['msg_id']
494 msg['msg_type'] = header['msg_type']
502 msg['msg_type'] = header['msg_type']
495 msg['parent_header'] = {} if parent is None else extract_header(parent)
503 msg['parent_header'] = {} if parent is None else extract_header(parent)
496 msg['content'] = {} if content is None else content
504 msg['content'] = {} if content is None else content
497 msg['metadata'] = self.metadata.copy()
505 msg['metadata'] = self.metadata.copy()
498 if metadata is not None:
506 if metadata is not None:
499 msg['metadata'].update(metadata)
507 msg['metadata'].update(metadata)
500 return msg
508 return msg
501
509
502 def sign(self, msg_list):
510 def sign(self, msg_list):
503 """Sign a message with HMAC digest. If no auth, return b''.
511 """Sign a message with HMAC digest. If no auth, return b''.
504
512
505 Parameters
513 Parameters
506 ----------
514 ----------
507 msg_list : list
515 msg_list : list
508 The [p_header,p_parent,p_content] part of the message list.
516 The [p_header,p_parent,p_content] part of the message list.
509 """
517 """
510 if self.auth is None:
518 if self.auth is None:
511 return b''
519 return b''
512 h = self.auth.copy()
520 h = self.auth.copy()
513 for m in msg_list:
521 for m in msg_list:
514 h.update(m)
522 h.update(m)
515 return str_to_bytes(h.hexdigest())
523 return str_to_bytes(h.hexdigest())
516
524
517 def serialize(self, msg, ident=None):
525 def serialize(self, msg, ident=None):
518 """Serialize the message components to bytes.
526 """Serialize the message components to bytes.
519
527
520 This is roughly the inverse of unserialize. The serialize/unserialize
528 This is roughly the inverse of unserialize. The serialize/unserialize
521 methods work with full message lists, whereas pack/unpack work with
529 methods work with full message lists, whereas pack/unpack work with
522 the individual message parts in the message list.
530 the individual message parts in the message list.
523
531
524 Parameters
532 Parameters
525 ----------
533 ----------
526 msg : dict or Message
534 msg : dict or Message
527 The nexted message dict as returned by the self.msg method.
535 The next message dict as returned by the self.msg method.
528
536
529 Returns
537 Returns
530 -------
538 -------
531 msg_list : list
539 msg_list : list
532 The list of bytes objects to be sent with the format::
540 The list of bytes objects to be sent with the format::
533
541
534 [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
542 [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
535 p_metadata, p_content, buffer1, buffer2, ...]
543 p_metadata, p_content, buffer1, buffer2, ...]
536
544
537 In this list, the ``p_*`` entities are the packed or serialized
545 In this list, the ``p_*`` entities are the packed or serialized
538 versions, so if JSON is used, these are utf8 encoded JSON strings.
546 versions, so if JSON is used, these are utf8 encoded JSON strings.
539 """
547 """
540 content = msg.get('content', {})
548 content = msg.get('content', {})
541 if content is None:
549 if content is None:
542 content = self.none
550 content = self.none
543 elif isinstance(content, dict):
551 elif isinstance(content, dict):
544 content = self.pack(content)
552 content = self.pack(content)
545 elif isinstance(content, bytes):
553 elif isinstance(content, bytes):
546 # content is already packed, as in a relayed message
554 # content is already packed, as in a relayed message
547 pass
555 pass
548 elif isinstance(content, unicode_type):
556 elif isinstance(content, unicode_type):
549 # should be bytes, but JSON often spits out unicode
557 # should be bytes, but JSON often spits out unicode
550 content = content.encode('utf8')
558 content = content.encode('utf8')
551 else:
559 else:
552 raise TypeError("Content incorrect type: %s"%type(content))
560 raise TypeError("Content incorrect type: %s"%type(content))
553
561
554 real_message = [self.pack(msg['header']),
562 real_message = [self.pack(msg['header']),
555 self.pack(msg['parent_header']),
563 self.pack(msg['parent_header']),
556 self.pack(msg['metadata']),
564 self.pack(msg['metadata']),
557 content,
565 content,
558 ]
566 ]
559
567
560 to_send = []
568 to_send = []
561
569
562 if isinstance(ident, list):
570 if isinstance(ident, list):
563 # accept list of idents
571 # accept list of idents
564 to_send.extend(ident)
572 to_send.extend(ident)
565 elif ident is not None:
573 elif ident is not None:
566 to_send.append(ident)
574 to_send.append(ident)
567 to_send.append(DELIM)
575 to_send.append(DELIM)
568
576
569 signature = self.sign(real_message)
577 signature = self.sign(real_message)
570 to_send.append(signature)
578 to_send.append(signature)
571
579
572 to_send.extend(real_message)
580 to_send.extend(real_message)
573
581
574 return to_send
582 return to_send
575
583
576 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
584 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
577 buffers=None, track=False, header=None, metadata=None):
585 buffers=None, track=False, header=None, metadata=None):
578 """Build and send a message via stream or socket.
586 """Build and send a message via stream or socket.
579
587
580 The message format used by this function internally is as follows:
588 The message format used by this function internally is as follows:
581
589
582 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
590 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
583 buffer1,buffer2,...]
591 buffer1,buffer2,...]
584
592
585 The serialize/unserialize methods convert the nested message dict into this
593 The serialize/unserialize methods convert the nested message dict into this
586 format.
594 format.
587
595
588 Parameters
596 Parameters
589 ----------
597 ----------
590
598
591 stream : zmq.Socket or ZMQStream
599 stream : zmq.Socket or ZMQStream
592 The socket-like object used to send the data.
600 The socket-like object used to send the data.
593 msg_or_type : str or Message/dict
601 msg_or_type : str or Message/dict
594 Normally, msg_or_type will be a msg_type unless a message is being
602 Normally, msg_or_type will be a msg_type unless a message is being
595 sent more than once. If a header is supplied, this can be set to
603 sent more than once. If a header is supplied, this can be set to
596 None and the msg_type will be pulled from the header.
604 None and the msg_type will be pulled from the header.
597
605
598 content : dict or None
606 content : dict or None
599 The content of the message (ignored if msg_or_type is a message).
607 The content of the message (ignored if msg_or_type is a message).
600 header : dict or None
608 header : dict or None
601 The header dict for the message (ignored if msg_to_type is a message).
609 The header dict for the message (ignored if msg_to_type is a message).
602 parent : Message or dict or None
610 parent : Message or dict or None
603 The parent or parent header describing the parent of this message
611 The parent or parent header describing the parent of this message
604 (ignored if msg_or_type is a message).
612 (ignored if msg_or_type is a message).
605 ident : bytes or list of bytes
613 ident : bytes or list of bytes
606 The zmq.IDENTITY routing path.
614 The zmq.IDENTITY routing path.
607 metadata : dict or None
615 metadata : dict or None
608 The metadata describing the message
616 The metadata describing the message
609 buffers : list or None
617 buffers : list or None
610 The already-serialized buffers to be appended to the message.
618 The already-serialized buffers to be appended to the message.
611 track : bool
619 track : bool
612 Whether to track. Only for use with Sockets, because ZMQStream
620 Whether to track. Only for use with Sockets, because ZMQStream
613 objects cannot track messages.
621 objects cannot track messages.
614
622
615
623
616 Returns
624 Returns
617 -------
625 -------
618 msg : dict
626 msg : dict
619 The constructed message.
627 The constructed message.
620 """
628 """
621 if not isinstance(stream, zmq.Socket):
629 if not isinstance(stream, zmq.Socket):
622 # ZMQStreams and dummy sockets do not support tracking.
630 # ZMQStreams and dummy sockets do not support tracking.
623 track = False
631 track = False
624
632
625 if isinstance(msg_or_type, (Message, dict)):
633 if isinstance(msg_or_type, (Message, dict)):
626 # We got a Message or message dict, not a msg_type so don't
634 # We got a Message or message dict, not a msg_type so don't
627 # build a new Message.
635 # build a new Message.
628 msg = msg_or_type
636 msg = msg_or_type
629 else:
637 else:
630 msg = self.msg(msg_or_type, content=content, parent=parent,
638 msg = self.msg(msg_or_type, content=content, parent=parent,
631 header=header, metadata=metadata)
639 header=header, metadata=metadata)
632 if not os.getpid() == self.pid:
640 if not os.getpid() == self.pid:
633 io.rprint("WARNING: attempted to send message from fork")
641 io.rprint("WARNING: attempted to send message from fork")
634 io.rprint(msg)
642 io.rprint(msg)
635 return
643 return
636 buffers = [] if buffers is None else buffers
644 buffers = [] if buffers is None else buffers
637 if self.adapt_version:
645 if self.adapt_version:
638 msg = adapt(msg, self.adapt_version)
646 msg = adapt(msg, self.adapt_version)
639 to_send = self.serialize(msg, ident)
647 to_send = self.serialize(msg, ident)
640 to_send.extend(buffers)
648 to_send.extend(buffers)
641 longest = max([ len(s) for s in to_send ])
649 longest = max([ len(s) for s in to_send ])
642 copy = (longest < self.copy_threshold)
650 copy = (longest < self.copy_threshold)
643
651
644 if buffers and track and not copy:
652 if buffers and track and not copy:
645 # only really track when we are doing zero-copy buffers
653 # only really track when we are doing zero-copy buffers
646 tracker = stream.send_multipart(to_send, copy=False, track=True)
654 tracker = stream.send_multipart(to_send, copy=False, track=True)
647 else:
655 else:
648 # use dummy tracker, which will be done immediately
656 # use dummy tracker, which will be done immediately
649 tracker = DONE
657 tracker = DONE
650 stream.send_multipart(to_send, copy=copy)
658 stream.send_multipart(to_send, copy=copy)
651
659
652 if self.debug:
660 if self.debug:
653 pprint.pprint(msg)
661 pprint.pprint(msg)
654 pprint.pprint(to_send)
662 pprint.pprint(to_send)
655 pprint.pprint(buffers)
663 pprint.pprint(buffers)
656
664
657 msg['tracker'] = tracker
665 msg['tracker'] = tracker
658
666
659 return msg
667 return msg
660
668
661 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
669 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
662 """Send a raw message via ident path.
670 """Send a raw message via ident path.
663
671
664 This method is used to send a already serialized message.
672 This method is used to send a already serialized message.
665
673
666 Parameters
674 Parameters
667 ----------
675 ----------
668 stream : ZMQStream or Socket
676 stream : ZMQStream or Socket
669 The ZMQ stream or socket to use for sending the message.
677 The ZMQ stream or socket to use for sending the message.
670 msg_list : list
678 msg_list : list
671 The serialized list of messages to send. This only includes the
679 The serialized list of messages to send. This only includes the
672 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
680 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
673 the message.
681 the message.
674 ident : ident or list
682 ident : ident or list
675 A single ident or a list of idents to use in sending.
683 A single ident or a list of idents to use in sending.
676 """
684 """
677 to_send = []
685 to_send = []
678 if isinstance(ident, bytes):
686 if isinstance(ident, bytes):
679 ident = [ident]
687 ident = [ident]
680 if ident is not None:
688 if ident is not None:
681 to_send.extend(ident)
689 to_send.extend(ident)
682
690
683 to_send.append(DELIM)
691 to_send.append(DELIM)
684 to_send.append(self.sign(msg_list))
692 to_send.append(self.sign(msg_list))
685 to_send.extend(msg_list)
693 to_send.extend(msg_list)
686 stream.send_multipart(to_send, flags, copy=copy)
694 stream.send_multipart(to_send, flags, copy=copy)
687
695
688 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
696 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
689 """Receive and unpack a message.
697 """Receive and unpack a message.
690
698
691 Parameters
699 Parameters
692 ----------
700 ----------
693 socket : ZMQStream or Socket
701 socket : ZMQStream or Socket
694 The socket or stream to use in receiving.
702 The socket or stream to use in receiving.
695
703
696 Returns
704 Returns
697 -------
705 -------
698 [idents], msg
706 [idents], msg
699 [idents] is a list of idents and msg is a nested message dict of
707 [idents] is a list of idents and msg is a nested message dict of
700 same format as self.msg returns.
708 same format as self.msg returns.
701 """
709 """
702 if isinstance(socket, ZMQStream):
710 if isinstance(socket, ZMQStream):
703 socket = socket.socket
711 socket = socket.socket
704 try:
712 try:
705 msg_list = socket.recv_multipart(mode, copy=copy)
713 msg_list = socket.recv_multipart(mode, copy=copy)
706 except zmq.ZMQError as e:
714 except zmq.ZMQError as e:
707 if e.errno == zmq.EAGAIN:
715 if e.errno == zmq.EAGAIN:
708 # We can convert EAGAIN to None as we know in this case
716 # We can convert EAGAIN to None as we know in this case
709 # recv_multipart won't return None.
717 # recv_multipart won't return None.
710 return None,None
718 return None,None
711 else:
719 else:
712 raise
720 raise
713 # split multipart message into identity list and message dict
721 # split multipart message into identity list and message dict
714 # invalid large messages can cause very expensive string comparisons
722 # invalid large messages can cause very expensive string comparisons
715 idents, msg_list = self.feed_identities(msg_list, copy)
723 idents, msg_list = self.feed_identities(msg_list, copy)
716 try:
724 try:
717 return idents, self.unserialize(msg_list, content=content, copy=copy)
725 return idents, self.unserialize(msg_list, content=content, copy=copy)
718 except Exception as e:
726 except Exception as e:
719 # TODO: handle it
727 # TODO: handle it
720 raise e
728 raise e
721
729
722 def feed_identities(self, msg_list, copy=True):
730 def feed_identities(self, msg_list, copy=True):
723 """Split the identities from the rest of the message.
731 """Split the identities from the rest of the message.
724
732
725 Feed until DELIM is reached, then return the prefix as idents and
733 Feed until DELIM is reached, then return the prefix as idents and
726 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
734 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
727 but that would be silly.
735 but that would be silly.
728
736
729 Parameters
737 Parameters
730 ----------
738 ----------
731 msg_list : a list of Message or bytes objects
739 msg_list : a list of Message or bytes objects
732 The message to be split.
740 The message to be split.
733 copy : bool
741 copy : bool
734 flag determining whether the arguments are bytes or Messages
742 flag determining whether the arguments are bytes or Messages
735
743
736 Returns
744 Returns
737 -------
745 -------
738 (idents, msg_list) : two lists
746 (idents, msg_list) : two lists
739 idents will always be a list of bytes, each of which is a ZMQ
747 idents will always be a list of bytes, each of which is a ZMQ
740 identity. msg_list will be a list of bytes or zmq.Messages of the
748 identity. msg_list will be a list of bytes or zmq.Messages of the
741 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
749 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
742 should be unpackable/unserializable via self.unserialize at this
750 should be unpackable/unserializable via self.unserialize at this
743 point.
751 point.
744 """
752 """
745 if copy:
753 if copy:
746 idx = msg_list.index(DELIM)
754 idx = msg_list.index(DELIM)
747 return msg_list[:idx], msg_list[idx+1:]
755 return msg_list[:idx], msg_list[idx+1:]
748 else:
756 else:
749 failed = True
757 failed = True
750 for idx,m in enumerate(msg_list):
758 for idx,m in enumerate(msg_list):
751 if m.bytes == DELIM:
759 if m.bytes == DELIM:
752 failed = False
760 failed = False
753 break
761 break
754 if failed:
762 if failed:
755 raise ValueError("DELIM not in msg_list")
763 raise ValueError("DELIM not in msg_list")
756 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
764 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
757 return [m.bytes for m in idents], msg_list
765 return [m.bytes for m in idents], msg_list
758
766
759 def _add_digest(self, signature):
767 def _add_digest(self, signature):
760 """add a digest to history to protect against replay attacks"""
768 """add a digest to history to protect against replay attacks"""
761 if self.digest_history_size == 0:
769 if self.digest_history_size == 0:
762 # no history, never add digests
770 # no history, never add digests
763 return
771 return
764
772
765 self.digest_history.add(signature)
773 self.digest_history.add(signature)
766 if len(self.digest_history) > self.digest_history_size:
774 if len(self.digest_history) > self.digest_history_size:
767 # threshold reached, cull 10%
775 # threshold reached, cull 10%
768 self._cull_digest_history()
776 self._cull_digest_history()
769
777
770 def _cull_digest_history(self):
778 def _cull_digest_history(self):
771 """cull the digest history
779 """cull the digest history
772
780
773 Removes a randomly selected 10% of the digest history
781 Removes a randomly selected 10% of the digest history
774 """
782 """
775 current = len(self.digest_history)
783 current = len(self.digest_history)
776 n_to_cull = max(int(current // 10), current - self.digest_history_size)
784 n_to_cull = max(int(current // 10), current - self.digest_history_size)
777 if n_to_cull >= current:
785 if n_to_cull >= current:
778 self.digest_history = set()
786 self.digest_history = set()
779 return
787 return
780 to_cull = random.sample(self.digest_history, n_to_cull)
788 to_cull = random.sample(self.digest_history, n_to_cull)
781 self.digest_history.difference_update(to_cull)
789 self.digest_history.difference_update(to_cull)
782
790
783 def unserialize(self, msg_list, content=True, copy=True):
791 def unserialize(self, msg_list, content=True, copy=True):
784 """Unserialize a msg_list to a nested message dict.
792 """Unserialize a msg_list to a nested message dict.
785
793
786 This is roughly the inverse of serialize. The serialize/unserialize
794 This is roughly the inverse of serialize. The serialize/unserialize
787 methods work with full message lists, whereas pack/unpack work with
795 methods work with full message lists, whereas pack/unpack work with
788 the individual message parts in the message list.
796 the individual message parts in the message list.
789
797
790 Parameters
798 Parameters
791 ----------
799 ----------
792 msg_list : list of bytes or Message objects
800 msg_list : list of bytes or Message objects
793 The list of message parts of the form [HMAC,p_header,p_parent,
801 The list of message parts of the form [HMAC,p_header,p_parent,
794 p_metadata,p_content,buffer1,buffer2,...].
802 p_metadata,p_content,buffer1,buffer2,...].
795 content : bool (True)
803 content : bool (True)
796 Whether to unpack the content dict (True), or leave it packed
804 Whether to unpack the content dict (True), or leave it packed
797 (False).
805 (False).
798 copy : bool (True)
806 copy : bool (True)
799 Whether to return the bytes (True), or the non-copying Message
807 Whether to return the bytes (True), or the non-copying Message
800 object in each place (False).
808 object in each place (False).
801
809
802 Returns
810 Returns
803 -------
811 -------
804 msg : dict
812 msg : dict
805 The nested message dict with top-level keys [header, parent_header,
813 The nested message dict with top-level keys [header, parent_header,
806 content, buffers].
814 content, buffers].
807 """
815 """
808 minlen = 5
816 minlen = 5
809 message = {}
817 message = {}
810 if not copy:
818 if not copy:
811 for i in range(minlen):
819 for i in range(minlen):
812 msg_list[i] = msg_list[i].bytes
820 msg_list[i] = msg_list[i].bytes
813 if self.auth is not None:
821 if self.auth is not None:
814 signature = msg_list[0]
822 signature = msg_list[0]
815 if not signature:
823 if not signature:
816 raise ValueError("Unsigned Message")
824 raise ValueError("Unsigned Message")
817 if signature in self.digest_history:
825 if signature in self.digest_history:
818 raise ValueError("Duplicate Signature: %r" % signature)
826 raise ValueError("Duplicate Signature: %r" % signature)
819 self._add_digest(signature)
827 self._add_digest(signature)
820 check = self.sign(msg_list[1:5])
828 check = self.sign(msg_list[1:5])
821 if not signature == check:
829 if not compare_digest(signature, check):
822 raise ValueError("Invalid Signature: %r" % signature)
830 raise ValueError("Invalid Signature: %r" % signature)
823 if not len(msg_list) >= minlen:
831 if not len(msg_list) >= minlen:
824 raise TypeError("malformed message, must have at least %i elements"%minlen)
832 raise TypeError("malformed message, must have at least %i elements"%minlen)
825 header = self.unpack(msg_list[1])
833 header = self.unpack(msg_list[1])
826 message['header'] = extract_dates(header)
834 message['header'] = extract_dates(header)
827 message['msg_id'] = header['msg_id']
835 message['msg_id'] = header['msg_id']
828 message['msg_type'] = header['msg_type']
836 message['msg_type'] = header['msg_type']
829 message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
837 message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
830 message['metadata'] = self.unpack(msg_list[3])
838 message['metadata'] = self.unpack(msg_list[3])
831 if content:
839 if content:
832 message['content'] = self.unpack(msg_list[4])
840 message['content'] = self.unpack(msg_list[4])
833 else:
841 else:
834 message['content'] = msg_list[4]
842 message['content'] = msg_list[4]
835
843
836 message['buffers'] = msg_list[5:]
844 message['buffers'] = msg_list[5:]
837 # print("received: %s: %s\n %s" % (message['msg_type'], message['header'], message['content']))
845 # print("received: %s: %s\n %s" % (message['msg_type'], message['header'], message['content']))
838 # adapt to the current version
846 # adapt to the current version
839 return adapt(message)
847 return adapt(message)
840 # print("adapted: %s: %s\n %s" % (adapted['msg_type'], adapted['header'], adapted['content']))
848 # print("adapted: %s: %s\n %s" % (adapted['msg_type'], adapted['header'], adapted['content']))
841
849
842 def test_msg2obj():
850 def test_msg2obj():
843 am = dict(x=1)
851 am = dict(x=1)
844 ao = Message(am)
852 ao = Message(am)
845 assert ao.x == am['x']
853 assert ao.x == am['x']
846
854
847 am['y'] = dict(z=1)
855 am['y'] = dict(z=1)
848 ao = Message(am)
856 ao = Message(am)
849 assert ao.y.z == am['y']['z']
857 assert ao.y.z == am['y']['z']
850
858
851 k1, k2 = 'y', 'z'
859 k1, k2 = 'y', 'z'
852 assert ao[k1][k2] == am[k1][k2]
860 assert ao[k1][k2] == am[k1][k2]
853
861
854 am2 = dict(ao)
862 am2 = dict(ao)
855 assert am['x'] == am2['x']
863 assert am['x'] == am2['x']
856 assert am['y']['z'] == am2['y']['z']
864 assert am['y']['z'] == am2['y']['z']
857
865
General Comments 0
You need to be logged in to leave comments. Login now