##// END OF EJS Templates
update comment...
Kyle Kelley -
Show More
@@ -1,868 +1,865 b''
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9 """
9 """
10
10
11 # Copyright (c) IPython Development Team.
11 # Copyright (c) IPython Development Team.
12 # Distributed under the terms of the Modified BSD License.
12 # Distributed under the terms of the Modified BSD License.
13
13
14 import hashlib
14 import hashlib
15 import hmac
15 import hmac
16 import logging
16 import logging
17 import os
17 import os
18 import pprint
18 import pprint
19 import random
19 import random
20 import uuid
20 import uuid
21 from datetime import datetime
21 from datetime import datetime
22
22
23 try:
23 try:
24 import cPickle
24 import cPickle
25 pickle = cPickle
25 pickle = cPickle
26 except:
26 except:
27 cPickle = None
27 cPickle = None
28 import pickle
28 import pickle
29
29
30 try:
30 try:
31 # We are using compare_digest to limit the surface of timing attacks
31 from hmac import compare_digest
32 from hmac import compare_digest
32 except ImportError:
33 except ImportError:
33 # Python < 2.7.7
34 # Python < 2.7.7: When digests don't match no feedback is provided,
34 import warnings
35 # limiting the surface of attack
35 warnings.warn("You are using Python older than 2.7.7, please consider "
36 def compare_digest(a,b): return a == b
36 "updating to the latest version as it reduces a possible security"
37 " vulnerability.")
38 def compare_digest(a,b):
39 return a == b
40
37
41 import zmq
38 import zmq
42 from zmq.utils import jsonapi
39 from zmq.utils import jsonapi
43 from zmq.eventloop.ioloop import IOLoop
40 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.zmqstream import ZMQStream
41 from zmq.eventloop.zmqstream import ZMQStream
45
42
46 from IPython.core.release import kernel_protocol_version
43 from IPython.core.release import kernel_protocol_version
47 from IPython.config.configurable import Configurable, LoggingConfigurable
44 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils import io
45 from IPython.utils import io
49 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
50 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
47 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
51 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
48 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
52 iteritems)
49 iteritems)
53 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
54 DottedObjectName, CUnicode, Dict, Integer,
51 DottedObjectName, CUnicode, Dict, Integer,
55 TraitError,
52 TraitError,
56 )
53 )
57 from IPython.utils.pickleutil import PICKLE_PROTOCOL
54 from IPython.utils.pickleutil import PICKLE_PROTOCOL
58 from IPython.kernel.adapter import adapt
55 from IPython.kernel.adapter import adapt
59 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
56 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
60
57
61 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
62 # utility functions
59 # utility functions
63 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
64
61
def squash_unicode(obj):
    """Coerce unicode back to bytestrings.

    Dicts and lists are walked recursively and modified in place; unicode
    strings are encoded as UTF-8. Any other object is returned untouched.

    Parameters
    ----------

    obj : object
        The object to squash. Dict keys that are unicode are re-keyed to
        their UTF-8 encoded form.

    Returns
    -------

    The same dict/list (mutated in place), the encoded bytestring for a
    unicode input, or `obj` itself for anything else.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys: re-keying below pops and
        # inserts entries, which must not happen while iterating the live
        # key view (entries could be revisited, or RuntimeError raised).
        for key in list(obj):
            obj[key] = squash_unicode(obj[key])
            if isinstance(key, unicode_type):
                obj[squash_unicode(key)] = obj.pop(key)
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            obj[i] = squash_unicode(v)
    elif isinstance(obj, unicode_type):
        obj = obj.encode('utf8')
    return obj
78
75
79 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
80 # globals and defaults
77 # globals and defaults
81 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
82
79
def json_packer(obj):
    """Serialize `obj` with zmq's jsonapi.

    - datetime objects are ISO8601-ified via `date_default`
    - unicode is allowed (ensure_ascii=False)
    - nan is disallowed, because it's not actually valid JSON
    """
    return jsonapi.dumps(
        obj,
        default=date_default,
        ensure_ascii=False,
        allow_nan=False,
    )


def json_unpacker(s):
    """Deserialize a JSON message part with zmq's jsonapi."""
    return jsonapi.loads(s)


def pickle_packer(o):
    """Pickle `o` with PICKLE_PROTOCOL after passing it through squash_dates."""
    return pickle.dumps(squash_dates(o), PICKLE_PROTOCOL)


# NOTE(review): unpickling can execute arbitrary code; only use the pickle
# unpacker with peers you trust.
pickle_unpacker = pickle.loads

# JSON is the default wire serialization
default_packer = json_packer
default_unpacker = json_unpacker

# delimiter message part
DELIM = b"<IDS|MSG>"
# singleton dummy tracker, which will always report as done
DONE = zmq.MessageTracker()
100
97
101 #-----------------------------------------------------------------------------
98 #-----------------------------------------------------------------------------
102 # Mixin tools for apps that use Sessions
99 # Mixin tools for apps that use Sessions
103 #-----------------------------------------------------------------------------
100 #-----------------------------------------------------------------------------
104
101
# command-line aliases mapping short names onto Session traits
session_aliases = {
    'ident': 'Session.session',
    'user': 'Session.username',
    'keyfile': 'Session.keyfile',
}

# command-line flags: name -> (config dict, help text)
# NOTE: the UUID for 'secure' is generated once, when this statement runs.
session_flags = {
    'secure': (
        {'Session': {'key': str_to_bytes(str(uuid.uuid4())),
                     'keyfile': ''}},
        "Use HMAC digests for authentication of messages.\n"
        "Setting this flag will generate a new UUID to use as the HMAC key.\n",
    ),
    'no-secure': (
        {'Session': {'key': b'', 'keyfile': ''}},
        "Don't authenticate messages.",
    ),
}
120
117
def default_secure(cfg):
    """Make a config environment secure by default.

    When neither Session.key nor Session.keyfile is present in `cfg`,
    Session.key is set to a freshly generated random UUID. If either one
    is already present, `cfg` is left alone.
    """
    already_configured = 'Session' in cfg and (
        'key' in cfg.Session or 'keyfile' in cfg.Session
    )
    if already_configured:
        return

    # key/keyfile not specified, generate new UUID:
    cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
133
130
134
131
135 #-----------------------------------------------------------------------------
132 #-----------------------------------------------------------------------------
136 # Classes
133 # Classes
137 #-----------------------------------------------------------------------------
134 #-----------------------------------------------------------------------------
138
135
class SessionFactory(LoggingConfigurable):
    """Base class for configurables that carry a Session, a zmq Context,
    a logger, and an IOLoop.
    """

    # name of the logger; see _logname_changed
    logname = Unicode('')

    # not configurable:
    context = Instance('zmq.Context')
    session = Instance('IPython.kernel.zmq.session.Session')
    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)

    def __init__(self, **kwargs):
        """Initialize the configurable, building a Session from the same
        keyword arguments when none was supplied.
        """
        super(SessionFactory, self).__init__(**kwargs)

        # construct the session
        if self.session is None:
            self.session = Session(**kwargs)

    def _logname_changed(self, name, old, new):
        # swap in the logger registered under the new name
        self.log = logging.getLogger(new)

    def _context_default(self):
        # default to the process-wide zmq Context
        return zmq.Context.instance()

    def _loop_default(self):
        # default to the global IOLoop
        return IOLoop.instance()
165
162
166
163
class Message(object):
    """Attribute-style wrapper around a message dict.

    Every key of the source dict becomes an attribute; values that are
    themselves dicts are wrapped in nested Message objects. Calling
    dict(msg_obj) turns a Message back into a dict.
    """

    def __init__(self, msg_dict):
        attrs = self.__dict__
        for key, value in dict(msg_dict).items():
            attrs[key] = Message(value) if isinstance(value, dict) else value

    # Yielding (key, value) pairs is what lets dict(msg_obj) work.
    def __iter__(self):
        return iter(self.__dict__.items())

    def __repr__(self):
        return repr(self.__dict__)

    def __str__(self):
        return pprint.pformat(self.__dict__)

    def __contains__(self, k):
        return k in self.__dict__

    def __getitem__(self, k):
        return self.__dict__[k]
195
192
196
193
def msg_header(msg_id, msg_type, username, session):
    """Build a message header dict.

    The header holds the four arguments under their own names, plus
    'date' (the current local time) and 'version' (the kernel protocol
    version).
    """
    return {
        'msg_id': msg_id,
        'msg_type': msg_type,
        'username': username,
        'session': session,
        'date': datetime.now(),
        'version': kernel_protocol_version,
    }
201
198
def extract_header(msg_or_header):
    """Return the header of a message, or the header itself if given one.

    A falsy argument yields an empty dict. An argument with a 'header' key
    is treated as a full message; otherwise it must have a 'msg_id' key and
    is treated as the header. KeyError is raised when neither key exists.
    The result is always a dict (non-dict headers are converted).
    """
    if not msg_or_header:
        return {}

    try:
        # Is this the entire message?
        header = msg_or_header['header']
    except KeyError:
        # Otherwise it must be a bare header; this lookup raises KeyError
        # if it is not.
        msg_or_header['msg_id']
        header = msg_or_header

    return header if isinstance(header, dict) else dict(header)
220
217
221 class Session(Configurable):
218 class Session(Configurable):
222 """Object for handling serialization and sending of messages.
219 """Object for handling serialization and sending of messages.
223
220
224 The Session object handles building messages and sending them
221 The Session object handles building messages and sending them
225 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
222 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
226 other over the network via Session objects, and only need to work with the
223 other over the network via Session objects, and only need to work with the
227 dict-based IPython message spec. The Session will handle
224 dict-based IPython message spec. The Session will handle
228 serialization/deserialization, security, and metadata.
225 serialization/deserialization, security, and metadata.
229
226
230 Sessions support configurable serialization via packer/unpacker traits,
227 Sessions support configurable serialization via packer/unpacker traits,
231 and signing with HMAC digests via the key/keyfile traits.
228 and signing with HMAC digests via the key/keyfile traits.
232
229
233 Parameters
230 Parameters
234 ----------
231 ----------
235
232
236 debug : bool
233 debug : bool
237 whether to trigger extra debugging statements
234 whether to trigger extra debugging statements
238 packer/unpacker : str : 'json', 'pickle' or import_string
235 packer/unpacker : str : 'json', 'pickle' or import_string
239 importstrings for methods to serialize message parts. If just
236 importstrings for methods to serialize message parts. If just
240 'json' or 'pickle', predefined JSON and pickle packers will be used.
237 'json' or 'pickle', predefined JSON and pickle packers will be used.
241 Otherwise, the entire importstring must be used.
238 Otherwise, the entire importstring must be used.
242
239
243 The functions must accept at least valid JSON input, and output *bytes*.
240 The functions must accept at least valid JSON input, and output *bytes*.
244
241
245 For example, to use msgpack:
242 For example, to use msgpack:
246 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
243 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
247 pack/unpack : callables
244 pack/unpack : callables
248 You can also set the pack/unpack callables for serialization directly.
245 You can also set the pack/unpack callables for serialization directly.
249 session : bytes
246 session : bytes
250 the ID of this Session object. The default is to generate a new UUID.
247 the ID of this Session object. The default is to generate a new UUID.
251 username : unicode
248 username : unicode
252 username added to message headers. The default is to ask the OS.
249 username added to message headers. The default is to ask the OS.
253 key : bytes
250 key : bytes
254 The key used to initialize an HMAC signature. If unset, messages
251 The key used to initialize an HMAC signature. If unset, messages
255 will not be signed or checked.
252 will not be signed or checked.
256 keyfile : filepath
253 keyfile : filepath
257 The file containing a key. If this is set, `key` will be initialized
254 The file containing a key. If this is set, `key` will be initialized
258 to the contents of the file.
255 to the contents of the file.
259
256
260 """
257 """
261
258
262 debug=Bool(False, config=True, help="""Debug output in the Session""")
259 debug=Bool(False, config=True, help="""Debug output in the Session""")
263
260
264 packer = DottedObjectName('json',config=True,
261 packer = DottedObjectName('json',config=True,
265 help="""The name of the packer for serializing messages.
262 help="""The name of the packer for serializing messages.
266 Should be one of 'json', 'pickle', or an import name
263 Should be one of 'json', 'pickle', or an import name
267 for a custom callable serializer.""")
264 for a custom callable serializer.""")
268 def _packer_changed(self, name, old, new):
265 def _packer_changed(self, name, old, new):
269 if new.lower() == 'json':
266 if new.lower() == 'json':
270 self.pack = json_packer
267 self.pack = json_packer
271 self.unpack = json_unpacker
268 self.unpack = json_unpacker
272 self.unpacker = new
269 self.unpacker = new
273 elif new.lower() == 'pickle':
270 elif new.lower() == 'pickle':
274 self.pack = pickle_packer
271 self.pack = pickle_packer
275 self.unpack = pickle_unpacker
272 self.unpack = pickle_unpacker
276 self.unpacker = new
273 self.unpacker = new
277 else:
274 else:
278 self.pack = import_item(str(new))
275 self.pack = import_item(str(new))
279
276
280 unpacker = DottedObjectName('json', config=True,
277 unpacker = DottedObjectName('json', config=True,
281 help="""The name of the unpacker for unserializing messages.
278 help="""The name of the unpacker for unserializing messages.
282 Only used with custom functions for `packer`.""")
279 Only used with custom functions for `packer`.""")
283 def _unpacker_changed(self, name, old, new):
280 def _unpacker_changed(self, name, old, new):
284 if new.lower() == 'json':
281 if new.lower() == 'json':
285 self.pack = json_packer
282 self.pack = json_packer
286 self.unpack = json_unpacker
283 self.unpack = json_unpacker
287 self.packer = new
284 self.packer = new
288 elif new.lower() == 'pickle':
285 elif new.lower() == 'pickle':
289 self.pack = pickle_packer
286 self.pack = pickle_packer
290 self.unpack = pickle_unpacker
287 self.unpack = pickle_unpacker
291 self.packer = new
288 self.packer = new
292 else:
289 else:
293 self.unpack = import_item(str(new))
290 self.unpack = import_item(str(new))
294
291
295 session = CUnicode(u'', config=True,
292 session = CUnicode(u'', config=True,
296 help="""The UUID identifying this session.""")
293 help="""The UUID identifying this session.""")
297 def _session_default(self):
294 def _session_default(self):
298 u = unicode_type(uuid.uuid4())
295 u = unicode_type(uuid.uuid4())
299 self.bsession = u.encode('ascii')
296 self.bsession = u.encode('ascii')
300 return u
297 return u
301
298
302 def _session_changed(self, name, old, new):
299 def _session_changed(self, name, old, new):
303 self.bsession = self.session.encode('ascii')
300 self.bsession = self.session.encode('ascii')
304
301
305 # bsession is the session as bytes
302 # bsession is the session as bytes
306 bsession = CBytes(b'')
303 bsession = CBytes(b'')
307
304
308 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
305 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
309 help="""Username for the Session. Default is your system username.""",
306 help="""Username for the Session. Default is your system username.""",
310 config=True)
307 config=True)
311
308
312 metadata = Dict({}, config=True,
309 metadata = Dict({}, config=True,
313 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
310 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
314
311
315 # if 0, no adapting to do.
312 # if 0, no adapting to do.
316 adapt_version = Integer(0)
313 adapt_version = Integer(0)
317
314
318 # message signature related traits:
315 # message signature related traits:
319
316
320 key = CBytes(b'', config=True,
317 key = CBytes(b'', config=True,
321 help="""execution key, for extra authentication.""")
318 help="""execution key, for extra authentication.""")
322 def _key_changed(self):
319 def _key_changed(self):
323 self._new_auth()
320 self._new_auth()
324
321
325 signature_scheme = Unicode('hmac-sha256', config=True,
322 signature_scheme = Unicode('hmac-sha256', config=True,
326 help="""The digest scheme used to construct the message signatures.
323 help="""The digest scheme used to construct the message signatures.
327 Must have the form 'hmac-HASH'.""")
324 Must have the form 'hmac-HASH'.""")
328 def _signature_scheme_changed(self, name, old, new):
325 def _signature_scheme_changed(self, name, old, new):
329 if not new.startswith('hmac-'):
326 if not new.startswith('hmac-'):
330 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
327 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
331 hash_name = new.split('-', 1)[1]
328 hash_name = new.split('-', 1)[1]
332 try:
329 try:
333 self.digest_mod = getattr(hashlib, hash_name)
330 self.digest_mod = getattr(hashlib, hash_name)
334 except AttributeError:
331 except AttributeError:
335 raise TraitError("hashlib has no such attribute: %s" % hash_name)
332 raise TraitError("hashlib has no such attribute: %s" % hash_name)
336 self._new_auth()
333 self._new_auth()
337
334
338 digest_mod = Any()
335 digest_mod = Any()
339 def _digest_mod_default(self):
336 def _digest_mod_default(self):
340 return hashlib.sha256
337 return hashlib.sha256
341
338
342 auth = Instance(hmac.HMAC)
339 auth = Instance(hmac.HMAC)
343
340
344 def _new_auth(self):
341 def _new_auth(self):
345 if self.key:
342 if self.key:
346 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
343 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
347 else:
344 else:
348 self.auth = None
345 self.auth = None
349
346
350 digest_history = Set()
347 digest_history = Set()
351 digest_history_size = Integer(2**16, config=True,
348 digest_history_size = Integer(2**16, config=True,
352 help="""The maximum number of digests to remember.
349 help="""The maximum number of digests to remember.
353
350
354 The digest history will be culled when it exceeds this value.
351 The digest history will be culled when it exceeds this value.
355 """
352 """
356 )
353 )
357
354
358 keyfile = Unicode('', config=True,
355 keyfile = Unicode('', config=True,
359 help="""path to file containing execution key.""")
356 help="""path to file containing execution key.""")
360 def _keyfile_changed(self, name, old, new):
357 def _keyfile_changed(self, name, old, new):
361 with open(new, 'rb') as f:
358 with open(new, 'rb') as f:
362 self.key = f.read().strip()
359 self.key = f.read().strip()
363
360
364 # for protecting against sends from forks
361 # for protecting against sends from forks
365 pid = Integer()
362 pid = Integer()
366
363
367 # serialization traits:
364 # serialization traits:
368
365
369 pack = Any(default_packer) # the actual packer function
366 pack = Any(default_packer) # the actual packer function
370 def _pack_changed(self, name, old, new):
367 def _pack_changed(self, name, old, new):
371 if not callable(new):
368 if not callable(new):
372 raise TypeError("packer must be callable, not %s"%type(new))
369 raise TypeError("packer must be callable, not %s"%type(new))
373
370
374 unpack = Any(default_unpacker) # the actual packer function
371 unpack = Any(default_unpacker) # the actual packer function
375 def _unpack_changed(self, name, old, new):
372 def _unpack_changed(self, name, old, new):
376 # unpacker is not checked - it is assumed to be
373 # unpacker is not checked - it is assumed to be
377 if not callable(new):
374 if not callable(new):
378 raise TypeError("unpacker must be callable, not %s"%type(new))
375 raise TypeError("unpacker must be callable, not %s"%type(new))
379
376
380 # thresholds:
377 # thresholds:
381 copy_threshold = Integer(2**16, config=True,
378 copy_threshold = Integer(2**16, config=True,
382 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
379 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
383 buffer_threshold = Integer(MAX_BYTES, config=True,
380 buffer_threshold = Integer(MAX_BYTES, config=True,
384 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
381 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
385 item_threshold = Integer(MAX_ITEMS, config=True,
382 item_threshold = Integer(MAX_ITEMS, config=True,
386 help="""The maximum number of items for a container to be introspected for custom serialization.
383 help="""The maximum number of items for a container to be introspected for custom serialization.
387 Containers larger than this are pickled outright.
384 Containers larger than this are pickled outright.
388 """
385 """
389 )
386 )
390
387
391
388
392 def __init__(self, **kwargs):
389 def __init__(self, **kwargs):
393 """create a Session object
390 """create a Session object
394
391
395 Parameters
392 Parameters
396 ----------
393 ----------
397
394
398 debug : bool
395 debug : bool
399 whether to trigger extra debugging statements
396 whether to trigger extra debugging statements
400 packer/unpacker : str : 'json', 'pickle' or import_string
397 packer/unpacker : str : 'json', 'pickle' or import_string
401 importstrings for methods to serialize message parts. If just
398 importstrings for methods to serialize message parts. If just
402 'json' or 'pickle', predefined JSON and pickle packers will be used.
399 'json' or 'pickle', predefined JSON and pickle packers will be used.
403 Otherwise, the entire importstring must be used.
400 Otherwise, the entire importstring must be used.
404
401
405 The functions must accept at least valid JSON input, and output
402 The functions must accept at least valid JSON input, and output
406 *bytes*.
403 *bytes*.
407
404
408 For example, to use msgpack:
405 For example, to use msgpack:
409 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
406 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
410 pack/unpack : callables
407 pack/unpack : callables
411 You can also set the pack/unpack callables for serialization
408 You can also set the pack/unpack callables for serialization
412 directly.
409 directly.
413 session : unicode (must be ascii)
410 session : unicode (must be ascii)
414 the ID of this Session object. The default is to generate a new
411 the ID of this Session object. The default is to generate a new
415 UUID.
412 UUID.
416 bsession : bytes
413 bsession : bytes
417 The session as bytes
414 The session as bytes
418 username : unicode
415 username : unicode
419 username added to message headers. The default is to ask the OS.
416 username added to message headers. The default is to ask the OS.
420 key : bytes
417 key : bytes
421 The key used to initialize an HMAC signature. If unset, messages
418 The key used to initialize an HMAC signature. If unset, messages
422 will not be signed or checked.
419 will not be signed or checked.
423 signature_scheme : str
420 signature_scheme : str
424 The message digest scheme. Currently must be of the form 'hmac-HASH',
421 The message digest scheme. Currently must be of the form 'hmac-HASH',
425 where 'HASH' is a hashing function available in Python's hashlib.
422 where 'HASH' is a hashing function available in Python's hashlib.
426 The default is 'hmac-sha256'.
423 The default is 'hmac-sha256'.
427 This is ignored if 'key' is empty.
424 This is ignored if 'key' is empty.
428 keyfile : filepath
425 keyfile : filepath
429 The file containing a key. If this is set, `key` will be
426 The file containing a key. If this is set, `key` will be
430 initialized to the contents of the file.
427 initialized to the contents of the file.
431 """
428 """
432 super(Session, self).__init__(**kwargs)
429 super(Session, self).__init__(**kwargs)
433 self._check_packers()
430 self._check_packers()
434 self.none = self.pack({})
431 self.none = self.pack({})
435 # ensure self._session_default() if necessary, so bsession is defined:
432 # ensure self._session_default() if necessary, so bsession is defined:
436 self.session
433 self.session
437 self.pid = os.getpid()
434 self.pid = os.getpid()
438
435
439 @property
436 @property
440 def msg_id(self):
437 def msg_id(self):
441 """always return new uuid"""
438 """always return new uuid"""
442 return str(uuid.uuid4())
439 return str(uuid.uuid4())
443
440
444 def _check_packers(self):
441 def _check_packers(self):
445 """check packers for datetime support."""
442 """check packers for datetime support."""
446 pack = self.pack
443 pack = self.pack
447 unpack = self.unpack
444 unpack = self.unpack
448
445
449 # check simple serialization
446 # check simple serialization
450 msg = dict(a=[1,'hi'])
447 msg = dict(a=[1,'hi'])
451 try:
448 try:
452 packed = pack(msg)
449 packed = pack(msg)
453 except Exception as e:
450 except Exception as e:
454 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
451 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
455 if self.packer == 'json':
452 if self.packer == 'json':
456 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
453 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
457 else:
454 else:
458 jsonmsg = ""
455 jsonmsg = ""
459 raise ValueError(
456 raise ValueError(
460 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
457 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
461 )
458 )
462
459
463 # ensure packed message is bytes
460 # ensure packed message is bytes
464 if not isinstance(packed, bytes):
461 if not isinstance(packed, bytes):
465 raise ValueError("message packed to %r, but bytes are required"%type(packed))
462 raise ValueError("message packed to %r, but bytes are required"%type(packed))
466
463
467 # check that unpack is pack's inverse
464 # check that unpack is pack's inverse
468 try:
465 try:
469 unpacked = unpack(packed)
466 unpacked = unpack(packed)
470 assert unpacked == msg
467 assert unpacked == msg
471 except Exception as e:
468 except Exception as e:
472 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
469 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
473 if self.packer == 'json':
470 if self.packer == 'json':
474 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
471 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
475 else:
472 else:
476 jsonmsg = ""
473 jsonmsg = ""
477 raise ValueError(
474 raise ValueError(
478 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
475 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
479 )
476 )
480
477
481 # check datetime support
478 # check datetime support
482 msg = dict(t=datetime.now())
479 msg = dict(t=datetime.now())
483 try:
480 try:
484 unpacked = unpack(pack(msg))
481 unpacked = unpack(pack(msg))
485 if isinstance(unpacked['t'], datetime):
482 if isinstance(unpacked['t'], datetime):
486 raise ValueError("Shouldn't deserialize to datetime")
483 raise ValueError("Shouldn't deserialize to datetime")
487 except Exception:
484 except Exception:
488 self.pack = lambda o: pack(squash_dates(o))
485 self.pack = lambda o: pack(squash_dates(o))
489 self.unpack = lambda s: unpack(s)
486 self.unpack = lambda s: unpack(s)
490
487
def msg_header(self, msg_type):
    """Return a header for a new message of type ``msg_type``.

    The header is produced by the module-level ``msg_header`` helper from
    this session's ``msg_id``, ``username`` and ``session`` attributes.
    """
    # Read the attributes in the same order the helper receives them.
    ident, user, sess = self.msg_id, self.username, self.session
    return msg_header(ident, msg_type, user, sess)
493
490
def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
    """Assemble and return the nested message dict.

    This nested form is not what travels over the wire; the
    serialize/unserialize methods translate between it and the wire
    format, which is a list of message parts.

    Parameters
    ----------
    msg_type : str
        Used to build a fresh header when ``header`` is None.
    content : dict or None
        Message content; an empty dict is used when None.
    parent : Message or dict or None
        Source of the parent header (via ``extract_header``); an empty
        dict is used when None.
    header : dict or None
        Pre-built header. When given, ``msg_type`` is not consulted.
    metadata : dict or None
        Extra entries layered on top of a copy of ``self.metadata``.

    Returns
    -------
    msg : dict
        Dict with keys ``header``, ``msg_id``, ``msg_type``,
        ``parent_header``, ``content`` and ``metadata``.
    """
    if header is None:
        header = self.msg_header(msg_type)

    message = {
        'header': header,
        'msg_id': header['msg_id'],
        'msg_type': header['msg_type'],
        'parent_header': extract_header(parent) if parent is not None else {},
        'content': content if content is not None else {},
        # copy so per-message metadata never leaks into the session defaults
        'metadata': self.metadata.copy(),
    }
    if metadata is not None:
        message['metadata'].update(metadata)
    return message
512
509
def sign(self, msg_list):
    """Compute the HMAC signature for a list of packed message parts.

    Parameters
    ----------
    msg_list : list
        The packed message parts to sign, in wire order. Each part is fed
        to the digest in turn.

    Returns
    -------
    bytes
        The hex digest as bytes, or ``b''`` when the session has no auth
        object configured.
    """
    if self.auth is None:
        return b''
    # Work on a copy so the session's keyed digest stays pristine.
    digest = self.auth.copy()
    for part in msg_list:
        digest.update(part)
    return str_to_bytes(digest.hexdigest())
527
524
def serialize(self, msg, ident=None):
    """Turn a nested message dict into the list of bytes sent on the wire.

    This is roughly the inverse of unserialize. serialize/unserialize deal
    with whole message lists, while pack/unpack handle the individual
    parts inside such a list.

    Parameters
    ----------
    msg : dict or Message
        The nested message dict as returned by the self.msg method.
    ident : bytes or list of bytes or None
        Routing identity (or identities) placed ahead of the delimiter.

    Returns
    -------
    msg_list : list
        The list of bytes objects to be sent with the format::

            [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
             p_metadata, p_content]

        The ``p_*`` entries are the packed versions of each part, so with
        the JSON packer they are utf8 encoded JSON strings.

    Raises
    ------
    TypeError
        If the content is not None, a dict, bytes or unicode text.
    """
    payload = msg.get('content', {})
    if payload is None:
        payload = self.none
    elif isinstance(payload, dict):
        payload = self.pack(payload)
    elif isinstance(payload, unicode_type):
        # should be bytes, but JSON often spits out unicode
        payload = payload.encode('utf8')
    elif not isinstance(payload, bytes):
        # bytes pass through untouched (already packed, e.g. a relayed
        # message); anything else is rejected
        raise TypeError("Content incorrect type: %s"%type(payload))

    packed_parts = [
        self.pack(msg[key])
        for key in ('header', 'parent_header', 'metadata')
    ]
    packed_parts.append(payload)

    if isinstance(ident, list):
        # accept list of idents
        frames = list(ident)
    elif ident is not None:
        frames = [ident]
    else:
        frames = []

    frames.append(DELIM)
    frames.append(self.sign(packed_parts))
    frames.extend(packed_parts)
    return frames
586
583
def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
         buffers=None, track=False, header=None, metadata=None):
    """Build a message (if needed), serialize it and send it.

    The nested message dict is converted by ``self.serialize`` into a list
    of parts, ``buffers`` are appended to that list, and the result is
    sent with ``stream.send_multipart``.

    Parameters
    ----------

    stream : zmq.Socket or ZMQStream
        The socket-like object used to send the data.
    msg_or_type : str or Message/dict
        Normally a msg_type; pass an existing Message/dict to send it
        as-is (for example when sending the same message more than once).
        If a header is supplied, this can be None and the msg_type is
        taken from the header.

    content : dict or None
        The content of the message (ignored if msg_or_type is a message).
    header : dict or None
        The header dict for the message (ignored if msg_or_type is a
        message).
    parent : Message or dict or None
        The parent or parent header describing the parent of this message
        (ignored if msg_or_type is a message).
    ident : bytes or list of bytes
        The zmq.IDENTITY routing path.
    metadata : dict or None
        The metadata describing the message
    buffers : list or None
        The already-serialized buffers to be appended to the message.
    track : bool
        Whether to track. Only honoured for zmq.Socket objects; it is
        forced to False for anything else.


    Returns
    -------
    msg : dict
        The message that was sent, with a ``'tracker'`` entry added.
        None is returned (and nothing is sent) when called from a process
        whose pid differs from ``self.pid``.
    """
    if not isinstance(stream, zmq.Socket):
        # ZMQStreams and dummy sockets do not support tracking.
        track = False

    already_built = isinstance(msg_or_type, (Message, dict))
    if already_built:
        # Caller handed us a finished message; do not build a new one.
        msg = msg_or_type
    else:
        msg = self.msg(msg_or_type, content=content, parent=parent,
                       header=header, metadata=metadata)

    if os.getpid() != self.pid:
        io.rprint("WARNING: attempted to send message from fork")
        io.rprint(msg)
        return

    if buffers is None:
        buffers = []
    if self.adapt_version:
        msg = adapt(msg, self.adapt_version)

    to_send = self.serialize(msg, ident)
    to_send.extend(buffers)

    # Copy small messages; go zero-copy once any part reaches the threshold.
    longest = max(len(part) for part in to_send)
    copy = longest < self.copy_threshold

    if buffers and track and not copy:
        # only really track when we are doing zero-copy buffers
        tracker = stream.send_multipart(to_send, copy=False, track=True)
    else:
        # use dummy tracker, which will be done immediately
        tracker = DONE
        stream.send_multipart(to_send, copy=copy)

    if self.debug:
        for item in (msg, to_send, buffers):
            pprint.pprint(item)

    msg['tracker'] = tracker
    return msg
671
668
def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
    """Sign and send an already serialized message via an ident path.

    Parameters
    ----------
    stream : ZMQStream or Socket
        The ZMQ stream or socket to use for sending the message.
    msg_list : list
        The serialized message parts to send. This only includes the
        [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...]
        portion of the message.
    flags : int
        Passed positionally to ``stream.send_multipart``.
    copy : bool
        Passed as ``copy`` to ``stream.send_multipart``.
    ident : ident or list
        A single ident or a list of idents to use in sending.
    """
    if isinstance(ident, bytes):
        # normalise a lone ident to a one-element routing path
        ident = [ident]

    frames = [] if ident is None else list(ident)
    frames += [DELIM, self.sign(msg_list)]
    frames.extend(msg_list)
    stream.send_multipart(frames, flags, copy=copy)
698
695
def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
    """Receive and unpack a message.

    Parameters
    ----------
    socket : ZMQStream or Socket
        The socket or stream to use in receiving. For a ZMQStream the
        underlying ``.socket`` is used.
    mode : int
        Flags passed to ``recv_multipart`` (non-blocking by default).
    content : bool
        Forwarded to ``self.unserialize``.
    copy : bool
        Forwarded to ``recv_multipart``, ``self.feed_identities`` and
        ``self.unserialize``.

    Returns
    -------
    [idents], msg
        [idents] is a list of idents and msg is a nested message dict of
        same format as self.msg returns. ``(None, None)`` is returned when
        the receive fails with EAGAIN (nothing to read).

    Raises
    ------
    zmq.ZMQError
        Any receive error other than EAGAIN.

    Errors raised while splitting or unserializing the message propagate
    unchanged to the caller.
    """
    if isinstance(socket, ZMQStream):
        socket = socket.socket
    try:
        msg_list = socket.recv_multipart(mode, copy=copy)
    except zmq.ZMQError as e:
        if e.errno == zmq.EAGAIN:
            # We can convert EAGAIN to None as we know in this case
            # recv_multipart won't return None.
            return None, None
        raise
    # split multipart message into identity list and message dict
    # invalid large messages can cause very expensive string comparisons
    idents, msg_list = self.feed_identities(msg_list, copy)
    # No try/except here: catching and re-raising with ``raise e`` added
    # nothing and discarded the original traceback on Python 2.
    return idents, self.unserialize(msg_list, content=content, copy=copy)
732
729
def feed_identities(self, msg_list, copy=True):
    """Separate the routing identities from the rest of a message.

    Everything before the first DELIM part is treated as identities and
    everything after it as the message proper. An identity equal to DELIM
    would break this, but that would be silly.

    Parameters
    ----------
    msg_list : a list of Message or bytes objects
        The message to be split.
    copy : bool
        flag determining whether the arguments are bytes or Messages

    Returns
    -------
    (idents, msg_list) : two lists
        idents will always be a list of bytes, each of which is a ZMQ
        identity. msg_list will be a list of bytes or zmq.Messages holding
        the parts that followed DELIM, ready to be handed to
        self.unserialize.

    Raises
    ------
    ValueError
        If DELIM does not occur in msg_list.
    """
    if copy:
        split_at = msg_list.index(DELIM)
        return msg_list[:split_at], msg_list[split_at+1:]

    # Non-copying parts are Message objects: compare their bytes.
    for split_at, frame in enumerate(msg_list):
        if frame.bytes == DELIM:
            break
    else:
        raise ValueError("DELIM not in msg_list")

    raw_idents = msg_list[:split_at]
    remainder = msg_list[split_at+1:]
    return [frame.bytes for frame in raw_idents], remainder
769
766
def _add_digest(self, signature):
    """Record a signature in the digest history (replay protection).

    Nothing is stored when ``digest_history_size`` is 0. When the history
    grows past ``digest_history_size`` it is culled via
    ``_cull_digest_history``.
    """
    if self.digest_history_size != 0:
        history = self.digest_history
        history.add(signature)
        if len(history) > self.digest_history_size:
            # threshold exceeded, let the cull routine trim the history
            self._cull_digest_history()
780
777
def _cull_digest_history(self):
    """Shrink the digest history by discarding randomly chosen entries.

    At least 10% of the current entries are removed, and more when that
    is needed to bring the history back down to ``digest_history_size``.
    If every entry would have to go, the history is replaced by a new
    empty set.
    """
    current = len(self.digest_history)
    n_to_cull = max(current // 10, current - self.digest_history_size)
    if n_to_cull >= current:
        self.digest_history = set()
        return
    # random.sample() requires a sequence: sampling directly from a set is
    # deprecated since Python 3.9 and raises TypeError from 3.11 onwards.
    to_cull = random.sample(list(self.digest_history), n_to_cull)
    self.digest_history.difference_update(to_cull)
793
790
def unserialize(self, msg_list, content=True, copy=True):
    """Unserialize a msg_list to a nested message dict.

    This is roughly the inverse of serialize. The serialize/unserialize
    methods work with full message lists, whereas pack/unpack work with
    the individual message parts in the message list.

    Parameters
    ----------
    msg_list : list of bytes or Message objects
        The list of message parts of the form [HMAC,p_header,p_parent,
        p_metadata,p_content,buffer1,buffer2,...].
    content : bool (True)
        Whether to unpack the content dict (True), or leave it packed
        (False).
    copy : bool (True)
        Whether msg_list holds bytes (True) or non-copying Message
        objects (False). In the latter case the first five parts are
        replaced in place by their bytes.

    Returns
    -------
    msg : dict
        The nested message dict, passed through ``adapt``, built with the
        keys [header, msg_id, msg_type, parent_header, metadata, content,
        buffers].

    Raises
    ------
    TypeError
        If msg_list has fewer than five parts.
    ValueError
        If auth is enabled and the message is unsigned, repeats a
        signature already in the digest history, or carries a signature
        that does not match its parts.
    """
    minlen = 5
    # Validate the length before indexing so a short message produces the
    # intended TypeError rather than an IndexError/AttributeError.
    if len(msg_list) < minlen:
        raise TypeError("malformed message, must have at least %i elements"%minlen)

    if not copy:
        for i in range(minlen):
            msg_list[i] = msg_list[i].bytes

    if self.auth is not None:
        signature = msg_list[0]
        if not signature:
            raise ValueError("Unsigned Message")
        if signature in self.digest_history:
            raise ValueError("Duplicate Signature: %r" % signature)
        check = self.sign(msg_list[1:5])
        if not compare_digest(signature, check):
            raise ValueError("Invalid Signature: %r" % signature)
        # Only remember signatures that verified, so forged messages
        # cannot fill (or evict entries from) the replay history.
        self._add_digest(signature)

    message = {}
    header = self.unpack(msg_list[1])
    message['header'] = extract_dates(header)
    message['msg_id'] = header['msg_id']
    message['msg_type'] = header['msg_type']
    message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
    message['metadata'] = self.unpack(msg_list[3])
    if content:
        message['content'] = self.unpack(msg_list[4])
    else:
        message['content'] = msg_list[4]

    message['buffers'] = msg_list[5:]
    # adapt to the current version
    return adapt(message)
852
849
def test_msg2obj():
    """Check attribute, item and dict-conversion access on Message."""
    source = dict(x=1)
    wrapped = Message(source)
    assert wrapped.x == source['x']

    # nested dicts must be reachable through chained attribute access
    source['y'] = dict(z=1)
    wrapped = Message(source)
    assert wrapped.y.z == source['y']['z']

    # ... and through chained item access
    outer, inner = 'y', 'z'
    assert wrapped[outer][inner] == source[outer][inner]

    # converting back to a plain dict keeps the same values
    roundtrip = dict(wrapped)
    assert source['x'] == roundtrip['x']
    assert source['y']['z'] == roundtrip['y']['z']
868
865
General Comments 0
You need to be logged in to leave comments. Login now