##// END OF EJS Templates
support Python older than 2.7.7
Paul Ivanov -
Show More
@@ -1,857 +1,864 b''
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9 """
9 """
10
10
11 # Copyright (c) IPython Development Team.
11 # Copyright (c) IPython Development Team.
12 # Distributed under the terms of the Modified BSD License.
12 # Distributed under the terms of the Modified BSD License.
13
13
14 import hashlib
14 import hashlib
15 import hmac
15 import hmac
16 import logging
16 import logging
17 import os
17 import os
18 import pprint
18 import pprint
19 import random
19 import random
20 import uuid
20 import uuid
21 from datetime import datetime
21 from datetime import datetime
22
22
23 try:
23 try:
24 import cPickle
24 import cPickle
25 pickle = cPickle
25 pickle = cPickle
26 except:
26 except:
27 cPickle = None
27 cPickle = None
28 import pickle
28 import pickle
29
29
try:
    from hmac import compare_digest
except ImportError:
    # Python < 2.7.7 does not provide hmac.compare_digest.
    def compare_digest(a, b):
        """Return True if ``a`` and ``b`` are equal, False otherwise.

        Fallback used only when ``hmac.compare_digest`` cannot be imported.
        For two bytes-like values of equal length every byte pair is
        examined, so the running time does not depend on the position of
        the first mismatch.  Any other kind of input is compared with a
        plain ``==``.
        """
        bytes_like = (bytes, bytearray)
        if not (isinstance(a, bytes_like) and isinstance(b, bytes_like)):
            return a == b
        left, right = bytearray(a), bytearray(b)
        if len(left) != len(right):
            return False
        mismatch = 0
        # Accumulate differences instead of returning early on the first one.
        for x, y in zip(left, right):
            mismatch |= x ^ y
        return mismatch == 0
36
30 import zmq
37 import zmq
31 from zmq.utils import jsonapi
38 from zmq.utils import jsonapi
32 from zmq.eventloop.ioloop import IOLoop
39 from zmq.eventloop.ioloop import IOLoop
33 from zmq.eventloop.zmqstream import ZMQStream
40 from zmq.eventloop.zmqstream import ZMQStream
34
41
35 from IPython.core.release import kernel_protocol_version, kernel_protocol_version_info
42 from IPython.core.release import kernel_protocol_version, kernel_protocol_version_info
36 from IPython.config.configurable import Configurable, LoggingConfigurable
43 from IPython.config.configurable import Configurable, LoggingConfigurable
37 from IPython.utils import io
44 from IPython.utils import io
38 from IPython.utils.importstring import import_item
45 from IPython.utils.importstring import import_item
39 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
46 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
40 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
47 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
41 iteritems)
48 iteritems)
42 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
49 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
43 DottedObjectName, CUnicode, Dict, Integer,
50 DottedObjectName, CUnicode, Dict, Integer,
44 TraitError,
51 TraitError,
45 )
52 )
46 from IPython.utils.pickleutil import PICKLE_PROTOCOL
53 from IPython.utils.pickleutil import PICKLE_PROTOCOL
47 from IPython.kernel.adapter import adapt
54 from IPython.kernel.adapter import adapt
48 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
55 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
49
56
50 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
51 # utility functions
58 # utility functions
52 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
53
60
def squash_unicode(obj):
    """Coerce unicode back to bytestrings.

    Dicts and lists are processed recursively and modified in place: every
    unicode value, and every unicode dict key, is replaced by its utf8
    encoding.  A bare unicode string is returned utf8-encoded.  Any other
    object is returned unchanged.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys: the loop body re-keys the
        # dict, and changing a dict while iterating over its live key view
        # is not safe.
        for key in list(obj.keys()):
            obj[key] = squash_unicode(obj[key])
            if isinstance(key, unicode_type):
                obj[squash_unicode(key)] = obj.pop(key)
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            obj[i] = squash_unicode(v)
    elif isinstance(obj, unicode_type):
        obj = obj.encode('utf8')
    return obj
67
74
68 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
69 # globals and defaults
76 # globals and defaults
70 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
71
78
def json_packer(obj):
    """Serialize ``obj`` to JSON via zmq's ``jsonapi``.

    * datetime objects are handed to ``date_default`` (ISO8601-ify)
    * unicode is allowed (``ensure_ascii=False``)
    * nan is disallowed, because it's not actually valid JSON
    """
    return jsonapi.dumps(
        obj,
        default=date_default,
        ensure_ascii=False,
        allow_nan=False,
    )


def json_unpacker(s):
    """Deserialize a JSON message part via zmq's ``jsonapi``."""
    return jsonapi.loads(s)


def pickle_packer(o):
    """Pickle ``o`` with ``PICKLE_PROTOCOL`` after passing it through ``squash_dates``."""
    return pickle.dumps(squash_dates(o), PICKLE_PROTOCOL)


pickle_unpacker = pickle.loads

# JSON is the serialization used unless configured otherwise.
default_packer = json_packer
default_unpacker = json_unpacker

DELIM = b"<IDS|MSG>"
# singleton dummy tracker, which will always report as done
DONE = zmq.MessageTracker()
89
96
90 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
91 # Mixin tools for apps that use Sessions
98 # Mixin tools for apps that use Sessions
92 #-----------------------------------------------------------------------------
99 #-----------------------------------------------------------------------------
93
100
# Command-line aliases mapping short names onto Session traits.
session_aliases = {
    'ident': 'Session.session',
    'user': 'Session.username',
    'keyfile': 'Session.keyfile',
}

# Command-line flags: each entry is (config dict, help text).
# Note that the UUID for 'secure' is generated once, when this module is
# imported.
session_flags = {
    'secure': (
        {'Session': {'key': str_to_bytes(str(uuid.uuid4())),
                     'keyfile': ''}},
        """Use HMAC digests for authentication of messages.
        Setting this flag will generate a new UUID to use as the HMAC key.
        """,
    ),
    'no-secure': (
        {'Session': {'key': b'', 'keyfile': ''}},
        """Don't authenticate messages.""",
    ),
}
109
116
def default_secure(cfg):
    """Make a config environment secure by default.

    When neither ``Session.key`` nor ``Session.keyfile`` is present in
    ``cfg``, ``Session.key`` is set to a freshly generated random UUID.
    An existing key/keyfile setting is left untouched.
    """
    explicitly_configured = 'Session' in cfg and (
        'key' in cfg.Session or 'keyfile' in cfg.Session
    )
    if explicitly_configured:
        return
    # key/keyfile not specified, generate new UUID:
    cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
122
129
123
130
124 #-----------------------------------------------------------------------------
131 #-----------------------------------------------------------------------------
125 # Classes
132 # Classes
126 #-----------------------------------------------------------------------------
133 #-----------------------------------------------------------------------------
127
134
class SessionFactory(LoggingConfigurable):
    """Base class for configurables that carry a Session, a zmq Context,
    a logger and an IOLoop.
    """

    logname = Unicode('')

    def _logname_changed(self, name, old, new):
        """Switch ``self.log`` to the logger registered under the new name."""
        self.log = logging.getLogger(new)

    # not configurable:
    context = Instance('zmq.Context')

    def _context_default(self):
        """Default to the global zmq Context instance."""
        return zmq.Context.instance()

    session = Instance('IPython.kernel.zmq.session.Session')

    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)

    def _loop_default(self):
        """Default to the global IOLoop instance."""
        return IOLoop.instance()

    def __init__(self, **kwargs):
        """Initialize the configurable, building a Session if none was given.

        The same keyword arguments are forwarded to ``Session`` when one
        has to be constructed.
        """
        super(SessionFactory, self).__init__(**kwargs)

        if self.session is not None:
            return
        # construct the session
        self.session = Session(**kwargs)
154
161
155
162
class Message(object):
    """Attribute-style wrapper around a message dict.

    Keys of the dict become attributes of the instance; nested dicts are
    wrapped in Message objects as well.  ``dict(msg_obj)`` turns an
    instance back into a dict.
    """

    def __init__(self, msg_dict):
        """Populate the instance from ``msg_dict`` (anything ``dict()`` accepts)."""
        for key, value in iteritems(dict(msg_dict)):
            # Wrap nested dicts so attribute access works at every level.
            self.__dict__[key] = Message(value) if isinstance(value, dict) else value

    # Having this iterator lets dict(msg_obj) work out of the box.
    def __iter__(self):
        """Iterate over the ``(key, value)`` pairs of the message."""
        return iter(iteritems(self.__dict__))

    def __repr__(self):
        """Return the repr of the underlying attribute dict."""
        return repr(self.__dict__)

    def __str__(self):
        """Return a pretty-printed rendering of the underlying attribute dict."""
        return pprint.pformat(self.__dict__)

    def __contains__(self, k):
        """Report whether ``k`` is a key of the message."""
        return k in self.__dict__

    def __getitem__(self, k):
        """Dict-style access to a message field; raises KeyError if absent."""
        return self.__dict__[k]
184
191
185
192
def msg_header(msg_id, msg_type, username, session):
    """Build a message header dict.

    The result holds the four arguments under their own names, plus
    ``date`` (the current local time) and ``version`` (the kernel protocol
    version).
    """
    return {
        'msg_id': msg_id,
        'msg_type': msg_type,
        'username': username,
        'session': session,
        'date': datetime.now(),
        'version': kernel_protocol_version,
    }
190
197
def extract_header(msg_or_header):
    """Return the header of a message, or the header itself if given one.

    A falsy argument yields an empty dict.  If the argument has a
    ``'header'`` key, that entry is the header; otherwise the argument must
    have a ``'msg_id'`` key and is taken to be the header itself.  The
    result is always a dict (non-dict headers are converted with ``dict``).

    Raises KeyError when the argument has neither ``'header'`` nor
    ``'msg_id'``.
    """
    if not msg_or_header:
        return {}
    try:
        # See if msg_or_header is the entire message.
        header = msg_or_header['header']
    except KeyError:
        # See if msg_or_header is just the header; this lookup raises
        # KeyError when it is not.
        msg_or_header['msg_id']
        header = msg_or_header
    return header if isinstance(header, dict) else dict(header)
209
216
210 class Session(Configurable):
217 class Session(Configurable):
211 """Object for handling serialization and sending of messages.
218 """Object for handling serialization and sending of messages.
212
219
213 The Session object handles building messages and sending them
220 The Session object handles building messages and sending them
214 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
221 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
215 other over the network via Session objects, and only need to work with the
222 other over the network via Session objects, and only need to work with the
216 dict-based IPython message spec. The Session will handle
223 dict-based IPython message spec. The Session will handle
217 serialization/deserialization, security, and metadata.
224 serialization/deserialization, security, and metadata.
218
225
219 Sessions support configurable serialization via packer/unpacker traits,
226 Sessions support configurable serialization via packer/unpacker traits,
220 and signing with HMAC digests via the key/keyfile traits.
227 and signing with HMAC digests via the key/keyfile traits.
221
228
222 Parameters
229 Parameters
223 ----------
230 ----------
224
231
225 debug : bool
232 debug : bool
226 whether to trigger extra debugging statements
233 whether to trigger extra debugging statements
227 packer/unpacker : str : 'json', 'pickle' or import_string
234 packer/unpacker : str : 'json', 'pickle' or import_string
228 importstrings for methods to serialize message parts. If just
235 importstrings for methods to serialize message parts. If just
229 'json' or 'pickle', predefined JSON and pickle packers will be used.
236 'json' or 'pickle', predefined JSON and pickle packers will be used.
230 Otherwise, the entire importstring must be used.
237 Otherwise, the entire importstring must be used.
231
238
232 The functions must accept at least valid JSON input, and output *bytes*.
239 The functions must accept at least valid JSON input, and output *bytes*.
233
240
234 For example, to use msgpack:
241 For example, to use msgpack:
235 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
242 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
236 pack/unpack : callables
243 pack/unpack : callables
237 You can also set the pack/unpack callables for serialization directly.
244 You can also set the pack/unpack callables for serialization directly.
238 session : bytes
245 session : bytes
239 the ID of this Session object. The default is to generate a new UUID.
246 the ID of this Session object. The default is to generate a new UUID.
240 username : unicode
247 username : unicode
241 username added to message headers. The default is to ask the OS.
248 username added to message headers. The default is to ask the OS.
242 key : bytes
249 key : bytes
243 The key used to initialize an HMAC signature. If unset, messages
250 The key used to initialize an HMAC signature. If unset, messages
244 will not be signed or checked.
251 will not be signed or checked.
245 keyfile : filepath
252 keyfile : filepath
246 The file containing a key. If this is set, `key` will be initialized
253 The file containing a key. If this is set, `key` will be initialized
247 to the contents of the file.
254 to the contents of the file.
248
255
249 """
256 """
250
257
251 debug=Bool(False, config=True, help="""Debug output in the Session""")
258 debug=Bool(False, config=True, help="""Debug output in the Session""")
252
259
253 packer = DottedObjectName('json',config=True,
260 packer = DottedObjectName('json',config=True,
254 help="""The name of the packer for serializing messages.
261 help="""The name of the packer for serializing messages.
255 Should be one of 'json', 'pickle', or an import name
262 Should be one of 'json', 'pickle', or an import name
256 for a custom callable serializer.""")
263 for a custom callable serializer.""")
def _packer_changed(self, name, old, new):
    """React to a change of the ``packer`` trait.

    ``'json'`` and ``'pickle'`` (case-insensitive) select the predefined
    packer/unpacker pair and set ``unpacker`` to the same name.  Any other
    value is treated as an import string for the pack callable only.
    """
    predefined = {
        'json': (json_packer, json_unpacker),
        'pickle': (pickle_packer, pickle_unpacker),
    }.get(new.lower())
    if predefined is None:
        self.pack = import_item(str(new))
        return
    self.pack, self.unpack = predefined
    self.unpacker = new
268
275
269 unpacker = DottedObjectName('json', config=True,
276 unpacker = DottedObjectName('json', config=True,
270 help="""The name of the unpacker for unserializing messages.
277 help="""The name of the unpacker for unserializing messages.
271 Only used with custom functions for `packer`.""")
278 Only used with custom functions for `packer`.""")
def _unpacker_changed(self, name, old, new):
    """React to a change of the ``unpacker`` trait.

    ``'json'`` and ``'pickle'`` (case-insensitive) select the predefined
    packer/unpacker pair and set ``packer`` to the same name.  Any other
    value is treated as an import string for the unpack callable only.
    """
    predefined = {
        'json': (json_packer, json_unpacker),
        'pickle': (pickle_packer, pickle_unpacker),
    }.get(new.lower())
    if predefined is None:
        self.unpack = import_item(str(new))
        return
    self.pack, self.unpack = predefined
    self.packer = new
283
290
284 session = CUnicode(u'', config=True,
291 session = CUnicode(u'', config=True,
285 help="""The UUID identifying this session.""")
292 help="""The UUID identifying this session.""")
def _session_default(self):
    """Generate a new UUID as the session id.

    Also stores the ascii-encoded id in ``bsession`` before returning the
    unicode form.
    """
    session_id = unicode_type(uuid.uuid4())
    self.bsession = session_id.encode('ascii')
    return session_id
290
297
291 def _session_changed(self, name, old, new):
298 def _session_changed(self, name, old, new):
292 self.bsession = self.session.encode('ascii')
299 self.bsession = self.session.encode('ascii')
293
300
294 # bsession is the session as bytes
301 # bsession is the session as bytes
295 bsession = CBytes(b'')
302 bsession = CBytes(b'')
296
303
297 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
304 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
298 help="""Username for the Session. Default is your system username.""",
305 help="""Username for the Session. Default is your system username.""",
299 config=True)
306 config=True)
300
307
301 metadata = Dict({}, config=True,
308 metadata = Dict({}, config=True,
302 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
309 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
303
310
304 # if 0, no adapting to do.
311 # if 0, no adapting to do.
305 adapt_version = Integer(0)
312 adapt_version = Integer(0)
306
313
307 # message signature related traits:
314 # message signature related traits:
308
315
309 key = CBytes(b'', config=True,
316 key = CBytes(b'', config=True,
310 help="""execution key, for extra authentication.""")
317 help="""execution key, for extra authentication.""")
311 def _key_changed(self):
318 def _key_changed(self):
312 self._new_auth()
319 self._new_auth()
313
320
314 signature_scheme = Unicode('hmac-sha256', config=True,
321 signature_scheme = Unicode('hmac-sha256', config=True,
315 help="""The digest scheme used to construct the message signatures.
322 help="""The digest scheme used to construct the message signatures.
316 Must have the form 'hmac-HASH'.""")
323 Must have the form 'hmac-HASH'.""")
317 def _signature_scheme_changed(self, name, old, new):
324 def _signature_scheme_changed(self, name, old, new):
318 if not new.startswith('hmac-'):
325 if not new.startswith('hmac-'):
319 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
326 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
320 hash_name = new.split('-', 1)[1]
327 hash_name = new.split('-', 1)[1]
321 try:
328 try:
322 self.digest_mod = getattr(hashlib, hash_name)
329 self.digest_mod = getattr(hashlib, hash_name)
323 except AttributeError:
330 except AttributeError:
324 raise TraitError("hashlib has no such attribute: %s" % hash_name)
331 raise TraitError("hashlib has no such attribute: %s" % hash_name)
325 self._new_auth()
332 self._new_auth()
326
333
327 digest_mod = Any()
334 digest_mod = Any()
328 def _digest_mod_default(self):
335 def _digest_mod_default(self):
329 return hashlib.sha256
336 return hashlib.sha256
330
337
331 auth = Instance(hmac.HMAC)
338 auth = Instance(hmac.HMAC)
332
339
333 def _new_auth(self):
340 def _new_auth(self):
334 if self.key:
341 if self.key:
335 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
342 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
336 else:
343 else:
337 self.auth = None
344 self.auth = None
338
345
339 digest_history = Set()
346 digest_history = Set()
340 digest_history_size = Integer(2**16, config=True,
347 digest_history_size = Integer(2**16, config=True,
341 help="""The maximum number of digests to remember.
348 help="""The maximum number of digests to remember.
342
349
343 The digest history will be culled when it exceeds this value.
350 The digest history will be culled when it exceeds this value.
344 """
351 """
345 )
352 )
346
353
347 keyfile = Unicode('', config=True,
354 keyfile = Unicode('', config=True,
348 help="""path to file containing execution key.""")
355 help="""path to file containing execution key.""")
349 def _keyfile_changed(self, name, old, new):
356 def _keyfile_changed(self, name, old, new):
350 with open(new, 'rb') as f:
357 with open(new, 'rb') as f:
351 self.key = f.read().strip()
358 self.key = f.read().strip()
352
359
353 # for protecting against sends from forks
360 # for protecting against sends from forks
354 pid = Integer()
361 pid = Integer()
355
362
356 # serialization traits:
363 # serialization traits:
357
364
358 pack = Any(default_packer) # the actual packer function
365 pack = Any(default_packer) # the actual packer function
359 def _pack_changed(self, name, old, new):
366 def _pack_changed(self, name, old, new):
360 if not callable(new):
367 if not callable(new):
361 raise TypeError("packer must be callable, not %s"%type(new))
368 raise TypeError("packer must be callable, not %s"%type(new))
362
369
363 unpack = Any(default_unpacker) # the actual packer function
370 unpack = Any(default_unpacker) # the actual packer function
364 def _unpack_changed(self, name, old, new):
371 def _unpack_changed(self, name, old, new):
365 # unpacker is not checked - it is assumed to be
372 # unpacker is not checked - it is assumed to be
366 if not callable(new):
373 if not callable(new):
367 raise TypeError("unpacker must be callable, not %s"%type(new))
374 raise TypeError("unpacker must be callable, not %s"%type(new))
368
375
369 # thresholds:
376 # thresholds:
370 copy_threshold = Integer(2**16, config=True,
377 copy_threshold = Integer(2**16, config=True,
371 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
378 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
372 buffer_threshold = Integer(MAX_BYTES, config=True,
379 buffer_threshold = Integer(MAX_BYTES, config=True,
373 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
380 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
374 item_threshold = Integer(MAX_ITEMS, config=True,
381 item_threshold = Integer(MAX_ITEMS, config=True,
375 help="""The maximum number of items for a container to be introspected for custom serialization.
382 help="""The maximum number of items for a container to be introspected for custom serialization.
376 Containers larger than this are pickled outright.
383 Containers larger than this are pickled outright.
377 """
384 """
378 )
385 )
379
386
380
387
def __init__(self, **kwargs):
    """Create a Session object.

    All keyword arguments are passed on to the Configurable constructor.

    Parameters
    ----------

    debug : bool
        whether to trigger extra debugging statements
    packer/unpacker : str : 'json', 'pickle' or import_string
        Import strings for the callables that serialize message parts.
        The names 'json' and 'pickle' select the predefined JSON and
        pickle packers; anything else must be a full import string.

        The functions must accept at least valid JSON input, and output
        *bytes*.

        For example, to use msgpack:
        packer = 'msgpack.packb', unpacker='msgpack.unpackb'
    pack/unpack : callables
        The serialization callables can also be set directly.
    session : unicode (must be ascii)
        The ID of this Session object.  A new UUID is generated by
        default.
    bsession : bytes
        The session as bytes
    username : unicode
        Username added to message headers.  Defaults to the ``USER``
        environment variable.
    key : bytes
        The key used to initialize an HMAC signature.  If unset, messages
        will not be signed or checked.
    signature_scheme : str
        The message digest scheme.  Currently must be of the form
        'hmac-HASH', where 'HASH' is a hashing function available in
        Python's hashlib.  The default is 'hmac-sha256'.
        This is ignored if 'key' is empty.
    keyfile : filepath
        The file containing a key.  If this is set, `key` will be
        initialized to the contents of the file.
    """
    super(Session, self).__init__(**kwargs)
    self._check_packers()
    # Packed form of an empty dict, kept on the instance as ``none``.
    empty_packed = self.pack({})
    self.none = empty_packed
    # ensure self._session_default() if necessary, so bsession is defined:
    self.session
    # Remember the pid of the process that created this Session.
    self.pid = os.getpid()
427
434
@property
def msg_id(self):
    """A new UUID4 string; every access generates a fresh value."""
    fresh = uuid.uuid4()
    return str(fresh)
432
439
433 def _check_packers(self):
440 def _check_packers(self):
434 """check packers for datetime support."""
441 """check packers for datetime support."""
435 pack = self.pack
442 pack = self.pack
436 unpack = self.unpack
443 unpack = self.unpack
437
444
438 # check simple serialization
445 # check simple serialization
439 msg = dict(a=[1,'hi'])
446 msg = dict(a=[1,'hi'])
440 try:
447 try:
441 packed = pack(msg)
448 packed = pack(msg)
442 except Exception as e:
449 except Exception as e:
443 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
450 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
444 if self.packer == 'json':
451 if self.packer == 'json':
445 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
452 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
446 else:
453 else:
447 jsonmsg = ""
454 jsonmsg = ""
448 raise ValueError(
455 raise ValueError(
449 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
456 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
450 )
457 )
451
458
452 # ensure packed message is bytes
459 # ensure packed message is bytes
453 if not isinstance(packed, bytes):
460 if not isinstance(packed, bytes):
454 raise ValueError("message packed to %r, but bytes are required"%type(packed))
461 raise ValueError("message packed to %r, but bytes are required"%type(packed))
455
462
456 # check that unpack is pack's inverse
463 # check that unpack is pack's inverse
457 try:
464 try:
458 unpacked = unpack(packed)
465 unpacked = unpack(packed)
459 assert unpacked == msg
466 assert unpacked == msg
460 except Exception as e:
467 except Exception as e:
461 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
468 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
462 if self.packer == 'json':
469 if self.packer == 'json':
463 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
470 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
464 else:
471 else:
465 jsonmsg = ""
472 jsonmsg = ""
466 raise ValueError(
473 raise ValueError(
467 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
474 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
468 )
475 )
469
476
470 # check datetime support
477 # check datetime support
471 msg = dict(t=datetime.now())
478 msg = dict(t=datetime.now())
472 try:
479 try:
473 unpacked = unpack(pack(msg))
480 unpacked = unpack(pack(msg))
474 if isinstance(unpacked['t'], datetime):
481 if isinstance(unpacked['t'], datetime):
475 raise ValueError("Shouldn't deserialize to datetime")
482 raise ValueError("Shouldn't deserialize to datetime")
476 except Exception:
483 except Exception:
477 self.pack = lambda o: pack(squash_dates(o))
484 self.pack = lambda o: pack(squash_dates(o))
478 self.unpack = lambda s: unpack(s)
485 self.unpack = lambda s: unpack(s)
479
486
def msg_header(self, msg_type):
    """Return the header for a new message of type ``msg_type``.

    Delegates to the module-level ``msg_header`` helper, passing this
    object's ``msg_id``, ``username`` and ``session`` along with
    ``msg_type``.
    """
    return msg_header(self.msg_id, msg_type, self.username, self.session)
482
489
def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
    """Return the nested message dict.

    This format is different from what is sent over the wire. The
    serialize/unserialize methods convert this nested message dict to the
    wire format, which is a list of message parts.

    Parameters
    ----------
    msg_type : str
        Message type; only used to build a header when ``header`` is None.
    content : dict or None
        Message content; an empty dict is used when None.
    parent : Message or dict or None
        Parent message; its header (via ``extract_header``) becomes
        ``parent_header``. An empty dict is used when None.
    header : dict or None
        Pre-built header. When None, ``self.msg_header(msg_type)`` is used.
    metadata : dict or None
        Extra metadata, merged on top of a copy of ``self.metadata``.

    Returns
    -------
    msg : dict
        Dict with keys ``header``, ``msg_id``, ``msg_type``,
        ``parent_header``, ``content`` and ``metadata``.
    """
    if header is None:
        header = self.msg_header(msg_type)

    msg = {}
    msg['header'] = header
    msg['msg_id'] = header['msg_id']
    msg['msg_type'] = header['msg_type']
    msg['parent_header'] = {} if parent is None else extract_header(parent)
    msg['content'] = {} if content is None else content
    # copy so that per-message metadata never leaks into self.metadata
    msg['metadata'] = self.metadata.copy()
    if metadata is not None:
        msg['metadata'].update(metadata)
    return msg
501
508
def sign(self, msg_list):
    """Sign a message with HMAC digest. If no auth, return b''.

    Parameters
    ----------
    msg_list : list
        The [p_header, p_parent, p_metadata, p_content] part of the
        message list.

    Returns
    -------
    signature : bytes
        The hex digest as bytes, or ``b''`` when ``self.auth`` is None.
    """
    if self.auth is None:
        return b''
    # work on a copy so the keyed base object in self.auth is reusable
    h = self.auth.copy()
    for m in msg_list:
        h.update(m)
    return str_to_bytes(h.hexdigest())
516
523
def serialize(self, msg, ident=None):
    """Serialize the message components to bytes.

    This is roughly the inverse of unserialize. The serialize/unserialize
    methods work with full message lists, whereas pack/unpack work with
    the individual message parts in the message list.

    Parameters
    ----------
    msg : dict or Message
        The nested message dict as returned by the self.msg method.
    ident : bytes or list of bytes or None
        Routing identity (or identities) placed before the delimiter.

    Returns
    -------
    msg_list : list
        The list of bytes objects to be sent with the format::

            [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
             p_metadata, p_content, buffer1, buffer2, ...]

        In this list, the ``p_*`` entities are the packed or serialized
        versions, so if JSON is used, these are utf8 encoded JSON strings.

    Raises
    ------
    TypeError
        If ``msg['content']`` is not None, a dict, bytes or unicode.
    """
    content = msg.get('content', {})
    if content is None:
        content = self.none
    elif isinstance(content, dict):
        content = self.pack(content)
    elif isinstance(content, bytes):
        # content is already packed, as in a relayed message
        pass
    elif isinstance(content, unicode_type):
        # should be bytes, but JSON often spits out unicode
        content = content.encode('utf8')
    else:
        raise TypeError("Content incorrect type: %s"%type(content))

    real_message = [
        self.pack(msg['header']),
        self.pack(msg['parent_header']),
        self.pack(msg['metadata']),
        content,
    ]

    to_send = []
    if isinstance(ident, list):
        # accept list of idents
        to_send.extend(ident)
    elif ident is not None:
        to_send.append(ident)
    to_send.append(DELIM)

    # the signature covers exactly the four packed parts
    to_send.append(self.sign(real_message))
    to_send.extend(real_message)

    return to_send
575
582
def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
         buffers=None, track=False, header=None, metadata=None):
    """Build and send a message via stream or socket.

    The message format used by this function internally is as follows:

    [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content,
     buffer1,buffer2,...]

    The serialize/unserialize methods convert the nested message dict into this
    format.

    Parameters
    ----------

    stream : zmq.Socket or ZMQStream
        The socket-like object used to send the data.
    msg_or_type : str or Message/dict
        Normally, msg_or_type will be a msg_type unless a message is being
        sent more than once. If a header is supplied, this can be set to
        None and the msg_type will be pulled from the header.

    content : dict or None
        The content of the message (ignored if msg_or_type is a message).
    header : dict or None
        The header dict for the message (ignored if msg_or_type is a message).
    parent : Message or dict or None
        The parent or parent header describing the parent of this message
        (ignored if msg_or_type is a message).
    ident : bytes or list of bytes
        The zmq.IDENTITY routing path.
    metadata : dict or None
        The metadata describing the message
    buffers : list or None
        The already-serialized buffers to be appended to the message.
    track : bool
        Whether to track.  Only for use with Sockets, because ZMQStream
        objects cannot track messages.


    Returns
    -------
    msg : dict or None
        The constructed message, with the send tracker stored under the
        ``'tracker'`` key. None is returned (and nothing is sent) when
        called from a process whose pid differs from ``self.pid``.
    """
    if not isinstance(stream, zmq.Socket):
        # ZMQStreams and dummy sockets do not support tracking.
        track = False

    if isinstance(msg_or_type, (Message, dict)):
        # We got a Message or message dict, not a msg_type so don't
        # build a new Message.
        msg = msg_or_type
    else:
        msg = self.msg(msg_or_type, content=content, parent=parent,
                       header=header, metadata=metadata)

    if os.getpid() != self.pid:
        # refuse to send from a forked child; warn and bail out
        io.rprint("WARNING: attempted to send message from fork")
        io.rprint(msg)
        return None

    buffers = [] if buffers is None else buffers
    if self.adapt_version:
        msg = adapt(msg, self.adapt_version)
    to_send = self.serialize(msg, ident)
    to_send.extend(buffers)

    # copy small messages; go zero-copy once any part reaches the threshold
    longest = max(len(s) for s in to_send)
    copy = (longest < self.copy_threshold)

    if buffers and track and not copy:
        # only really track when we are doing zero-copy buffers
        tracker = stream.send_multipart(to_send, copy=False, track=True)
    else:
        # use dummy tracker, which will be done immediately
        tracker = DONE
        stream.send_multipart(to_send, copy=copy)

    if self.debug:
        pprint.pprint(msg)
        pprint.pprint(to_send)
        pprint.pprint(buffers)

    msg['tracker'] = tracker

    return msg
660
667
def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
    """Send a raw message via ident path.

    This method is used to send an already-serialized message.

    Parameters
    ----------
    stream : ZMQStream or Socket
        The ZMQ stream or socket to use for sending the message.
    msg_list : list
        The serialized list of messages to send. This only includes the
        [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
        the message.
    flags : int
        Passed through to ``stream.send_multipart``.
    copy : bool
        Passed through to ``stream.send_multipart``.
    ident : ident or list
        A single ident or a list of idents to use in sending.
    """
    to_send = []
    if isinstance(ident, bytes):
        ident = [ident]
    if ident is not None:
        to_send.extend(ident)

    to_send.append(DELIM)
    # Sign only the four packed parts: unserialize verifies msg_list[1:5],
    # so including trailing buffers would make verification fail.
    to_send.append(self.sign(msg_list[0:4]))
    to_send.extend(msg_list)
    stream.send_multipart(to_send, flags, copy=copy)
687
694
def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
    """Receive and unpack a message.

    Parameters
    ----------
    socket : ZMQStream or Socket
        The socket or stream to use in receiving.
    mode : int
        Flags passed to ``recv_multipart`` (non-blocking by default).
    content : bool
        Forwarded to ``unserialize``: whether to unpack the content.
    copy : bool
        Forwarded to ``recv_multipart``, ``feed_identities`` and
        ``unserialize``.

    Returns
    -------
    [idents], msg
        [idents] is a list of idents and msg is a nested message dict of
        same format as self.msg returns. ``(None, None)`` is returned
        when the receive fails with EAGAIN (nothing to read).
    """
    if isinstance(socket, ZMQStream):
        socket = socket.socket
    try:
        msg_list = socket.recv_multipart(mode, copy=copy)
    except zmq.ZMQError as e:
        if e.errno == zmq.EAGAIN:
            # We can convert EAGAIN to None as we know in this case
            # recv_multipart won't return None.
            return None, None
        raise
    # split multipart message into identity list and message dict
    # invalid large messages can cause very expensive string comparisons
    idents, msg_list = self.feed_identities(msg_list, copy)
    # Errors from unserialize propagate unchanged to the caller.
    return idents, self.unserialize(msg_list, content=content, copy=copy)
721
728
def feed_identities(self, msg_list, copy=True):
    """Split the identities from the rest of the message.

    Feed until DELIM is reached, then return the prefix as idents and
    remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
    but that would be silly.

    Parameters
    ----------
    msg_list : a list of Message or bytes objects
        The message to be split.
    copy : bool
        flag determining whether the arguments are bytes or Messages

    Returns
    -------
    (idents, msg_list) : two lists
        idents will always be a list of bytes, each of which is a ZMQ
        identity. msg_list will be a list of bytes or zmq.Messages of the
        form [HMAC,p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...]
        and should be unpackable/unserializable via self.unserialize at this
        point.

    Raises
    ------
    ValueError
        If DELIM does not occur in ``msg_list``.
    """
    if copy:
        # plain bytes: list.index raises ValueError itself if DELIM is absent
        idx = msg_list.index(DELIM)
        return msg_list[:idx], msg_list[idx+1:]

    # non-copying frames: compare on the .bytes of each part
    for idx, m in enumerate(msg_list):
        if m.bytes == DELIM:
            break
    else:
        raise ValueError("DELIM not in msg_list")
    idents, msg_list = msg_list[:idx], msg_list[idx+1:]
    return [m.bytes for m in idents], msg_list
758
765
759 def _add_digest(self, signature):
766 def _add_digest(self, signature):
760 """add a digest to history to protect against replay attacks"""
767 """add a digest to history to protect against replay attacks"""
761 if self.digest_history_size == 0:
768 if self.digest_history_size == 0:
762 # no history, never add digests
769 # no history, never add digests
763 return
770 return
764
771
765 self.digest_history.add(signature)
772 self.digest_history.add(signature)
766 if len(self.digest_history) > self.digest_history_size:
773 if len(self.digest_history) > self.digest_history_size:
767 # threshold reached, cull 10%
774 # threshold reached, cull 10%
768 self._cull_digest_history()
775 self._cull_digest_history()
769
776
770 def _cull_digest_history(self):
777 def _cull_digest_history(self):
771 """cull the digest history
778 """cull the digest history
772
779
773 Removes a randomly selected 10% of the digest history
780 Removes a randomly selected 10% of the digest history
774 """
781 """
775 current = len(self.digest_history)
782 current = len(self.digest_history)
776 n_to_cull = max(int(current // 10), current - self.digest_history_size)
783 n_to_cull = max(int(current // 10), current - self.digest_history_size)
777 if n_to_cull >= current:
784 if n_to_cull >= current:
778 self.digest_history = set()
785 self.digest_history = set()
779 return
786 return
780 to_cull = random.sample(self.digest_history, n_to_cull)
787 to_cull = random.sample(self.digest_history, n_to_cull)
781 self.digest_history.difference_update(to_cull)
788 self.digest_history.difference_update(to_cull)
782
789
def unserialize(self, msg_list, content=True, copy=True):
    """Unserialize a msg_list to a nested message dict.

    This is roughly the inverse of serialize. The serialize/unserialize
    methods work with full message lists, whereas pack/unpack work with
    the individual message parts in the message list.

    Parameters
    ----------
    msg_list : list of bytes or Message objects
        The list of message parts of the form [HMAC,p_header,p_parent,
        p_metadata,p_content,buffer1,buffer2,...].
    content : bool (True)
        Whether to unpack the content dict (True), or leave it packed
        (False).
    copy : bool (True)
        Whether to return the bytes (True), or the non-copying Message
        object in each place (False).

    Returns
    -------
    msg : dict
        The nested message dict with top-level keys [header, msg_id,
        msg_type, parent_header, metadata, content, buffers], passed
        through ``adapt`` before being returned.

    Raises
    ------
    TypeError
        If ``msg_list`` has fewer than five parts.
    ValueError
        If signing is enabled and the message is unsigned, carries a
        signature that was already seen, or carries an invalid signature.
    """
    minlen = 5
    # Validate the length first: everything below indexes msg_list[0:5].
    if len(msg_list) < minlen:
        raise TypeError("malformed message, must have at least %i elements"%minlen)

    message = {}
    if not copy:
        # replace the non-copying frames of the fixed parts with their bytes
        for i in range(minlen):
            msg_list[i] = msg_list[i].bytes

    if self.auth is not None:
        signature = msg_list[0]
        if not signature:
            raise ValueError("Unsigned Message")
        if signature in self.digest_history:
            raise ValueError("Duplicate Signature: %r" % signature)
        check = self.sign(msg_list[1:5])
        if not compare_digest(signature, check):
            raise ValueError("Invalid Signature: %r" % signature)
        # Only remember signatures that verified, so forged messages
        # cannot fill up the replay history.
        self._add_digest(signature)

    header = self.unpack(msg_list[1])
    message['header'] = extract_dates(header)
    message['msg_id'] = header['msg_id']
    message['msg_type'] = header['msg_type']
    message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
    message['metadata'] = self.unpack(msg_list[3])
    if content:
        message['content'] = self.unpack(msg_list[4])
    else:
        message['content'] = msg_list[4]

    message['buffers'] = msg_list[5:]
    # adapt to the current version
    return adapt(message)
841
848
def test_msg2obj():
    """Check that Message gives attribute, item and dict() access to a msg dict."""
    am = dict(x=1)
    ao = Message(am)
    assert ao.x == am['x']

    # nested dicts are reachable by chained attribute access
    am['y'] = dict(z=1)
    ao = Message(am)
    assert ao.y.z == am['y']['z']

    # ...and by chained item access
    k1, k2 = 'y', 'z'
    assert ao[k1][k2] == am[k1][k2]

    # converting back to a dict preserves the values
    am2 = dict(ao)
    assert am['x'] == am2['x']
    assert am['y']['z'] == am2['y']['z']
857
864
General Comments 0
You need to be logged in to leave comments. Login now