move MAX_ITEMS, MAX_BYTES to session from serialize
Min RK
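This commit makes session.py the owner of the default serialization thresholds and has serialize.py import them back via "from .session import MAX_ITEMS, MAX_BYTES", reversing the previous direction in which session imported them from IPython.kernel.zmq.serialize. A minimal usage sketch, assuming the two sibling modules land under the jupyter_client package this tree is migrating toward (the absolute import path is an assumption, not shown in the diff):

    # Sketch only: package path assumed; the diff itself only shows the relative
    # import "from .session import MAX_ITEMS, MAX_BYTES" inside serialize.py.
    from jupyter_client.session import MAX_ITEMS, MAX_BYTES    # defaults: 64 items, 1024 bytes
    from jupyter_client.serialize import serialize_object, deserialize_object

    # Containers with fewer than MAX_ITEMS entries are canned element by element;
    # buffers larger than MAX_BYTES are pulled out of the pickle stream.
    bufs = serialize_object(list(range(10)), buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS)
    obj, leftover = deserialize_object(bufs)
    assert obj == list(range(10)) and leftover == []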
serialize.py
@@ -1,181 +1,180 @@
1 """serialization utilities for apply messages"""
1 """serialization utilities for apply messages"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 try:
6 try:
7 import cPickle
7 import cPickle
8 pickle = cPickle
8 pickle = cPickle
9 except:
9 except:
10 cPickle = None
10 cPickle = None
11 import pickle
11 import pickle
12
12
13 # IPython imports
13 # IPython imports
14 from IPython.utils.py3compat import PY3, buffer_to_bytes_py2
14 from IPython.utils.py3compat import PY3, buffer_to_bytes_py2
15 from IPython.utils.data import flatten
15 from IPython.utils.data import flatten
16 from IPython.utils.pickleutil import (
16 from IPython.utils.pickleutil import (
17 can, uncan, can_sequence, uncan_sequence, CannedObject,
17 can, uncan, can_sequence, uncan_sequence, CannedObject,
18 istype, sequence_types, PICKLE_PROTOCOL,
18 istype, sequence_types, PICKLE_PROTOCOL,
19 )
19 )
+ 20 from .session import MAX_ITEMS, MAX_BYTES
+ 21
20
22
21 if PY3:
23 if PY3:
22 buffer = memoryview
24 buffer = memoryview
23
25
24 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
25 # Serialization Functions
27 # Serialization Functions
26 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
27
29
- 28 # default values for the thresholds:
- 29 MAX_ITEMS = 64
- 30 MAX_BYTES = 1024
31
30
32 def _extract_buffers(obj, threshold=MAX_BYTES):
31 def _extract_buffers(obj, threshold=MAX_BYTES):
33 """extract buffers larger than a certain threshold"""
32 """extract buffers larger than a certain threshold"""
34 buffers = []
33 buffers = []
35 if isinstance(obj, CannedObject) and obj.buffers:
34 if isinstance(obj, CannedObject) and obj.buffers:
36 for i,buf in enumerate(obj.buffers):
35 for i,buf in enumerate(obj.buffers):
37 if len(buf) > threshold:
36 if len(buf) > threshold:
38 # buffer larger than threshold, prevent pickling
37 # buffer larger than threshold, prevent pickling
39 obj.buffers[i] = None
38 obj.buffers[i] = None
40 buffers.append(buf)
39 buffers.append(buf)
41 elif isinstance(buf, buffer):
40 elif isinstance(buf, buffer):
42 # buffer too small for separate send, coerce to bytes
41 # buffer too small for separate send, coerce to bytes
43 # because pickling buffer objects just results in broken pointers
42 # because pickling buffer objects just results in broken pointers
44 obj.buffers[i] = bytes(buf)
43 obj.buffers[i] = bytes(buf)
45 return buffers
44 return buffers
46
45
47 def _restore_buffers(obj, buffers):
46 def _restore_buffers(obj, buffers):
48 """restore buffers extracted by """
47 """restore buffers extracted by """
49 if isinstance(obj, CannedObject) and obj.buffers:
48 if isinstance(obj, CannedObject) and obj.buffers:
50 for i,buf in enumerate(obj.buffers):
49 for i,buf in enumerate(obj.buffers):
51 if buf is None:
50 if buf is None:
52 obj.buffers[i] = buffers.pop(0)
51 obj.buffers[i] = buffers.pop(0)
53
52
54 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
53 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
55 """Serialize an object into a list of sendable buffers.
54 """Serialize an object into a list of sendable buffers.
56
55
57 Parameters
56 Parameters
58 ----------
57 ----------
59
58
60 obj : object
59 obj : object
61 The object to be serialized
60 The object to be serialized
62 buffer_threshold : int
61 buffer_threshold : int
63 The threshold (in bytes) for pulling out data buffers
62 The threshold (in bytes) for pulling out data buffers
64 to avoid pickling them.
63 to avoid pickling them.
65 item_threshold : int
64 item_threshold : int
66 The maximum number of items over which canning will iterate.
65 The maximum number of items over which canning will iterate.
67 Containers (lists, dicts) larger than this will be pickled without
66 Containers (lists, dicts) larger than this will be pickled without
68 introspection.
67 introspection.
69
68
70 Returns
69 Returns
71 -------
70 -------
72 [bufs] : list of buffers representing the serialized object.
71 [bufs] : list of buffers representing the serialized object.
73 """
72 """
74 buffers = []
73 buffers = []
75 if istype(obj, sequence_types) and len(obj) < item_threshold:
74 if istype(obj, sequence_types) and len(obj) < item_threshold:
76 cobj = can_sequence(obj)
75 cobj = can_sequence(obj)
77 for c in cobj:
76 for c in cobj:
78 buffers.extend(_extract_buffers(c, buffer_threshold))
77 buffers.extend(_extract_buffers(c, buffer_threshold))
79 elif istype(obj, dict) and len(obj) < item_threshold:
78 elif istype(obj, dict) and len(obj) < item_threshold:
80 cobj = {}
79 cobj = {}
81 for k in sorted(obj):
80 for k in sorted(obj):
82 c = can(obj[k])
81 c = can(obj[k])
83 buffers.extend(_extract_buffers(c, buffer_threshold))
82 buffers.extend(_extract_buffers(c, buffer_threshold))
84 cobj[k] = c
83 cobj[k] = c
85 else:
84 else:
86 cobj = can(obj)
85 cobj = can(obj)
87 buffers.extend(_extract_buffers(cobj, buffer_threshold))
86 buffers.extend(_extract_buffers(cobj, buffer_threshold))
88
87
89 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
88 buffers.insert(0, pickle.dumps(cobj, PICKLE_PROTOCOL))
90 return buffers
89 return buffers
91
90
92 def deserialize_object(buffers, g=None):
91 def deserialize_object(buffers, g=None):
93 """reconstruct an object serialized by serialize_object from data buffers.
92 """reconstruct an object serialized by serialize_object from data buffers.
94
93
95 Parameters
94 Parameters
96 ----------
95 ----------
97
96
98 bufs : list of buffers/bytes
97 bufs : list of buffers/bytes
99
98
100 g : globals to be used when uncanning
99 g : globals to be used when uncanning
101
100
102 Returns
101 Returns
103 -------
102 -------
104
103
105 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
104 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
106 """
105 """
107 bufs = list(buffers)
106 bufs = list(buffers)
108 pobj = buffer_to_bytes_py2(bufs.pop(0))
107 pobj = buffer_to_bytes_py2(bufs.pop(0))
109 canned = pickle.loads(pobj)
108 canned = pickle.loads(pobj)
110 if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
109 if istype(canned, sequence_types) and len(canned) < MAX_ITEMS:
111 for c in canned:
110 for c in canned:
112 _restore_buffers(c, bufs)
111 _restore_buffers(c, bufs)
113 newobj = uncan_sequence(canned, g)
112 newobj = uncan_sequence(canned, g)
114 elif istype(canned, dict) and len(canned) < MAX_ITEMS:
113 elif istype(canned, dict) and len(canned) < MAX_ITEMS:
115 newobj = {}
114 newobj = {}
116 for k in sorted(canned):
115 for k in sorted(canned):
117 c = canned[k]
116 c = canned[k]
118 _restore_buffers(c, bufs)
117 _restore_buffers(c, bufs)
119 newobj[k] = uncan(c, g)
118 newobj[k] = uncan(c, g)
120 else:
119 else:
121 _restore_buffers(canned, bufs)
120 _restore_buffers(canned, bufs)
122 newobj = uncan(canned, g)
121 newobj = uncan(canned, g)
123
122
124 return newobj, bufs
123 return newobj, bufs
125
124
126 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
125 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
127 """pack up a function, args, and kwargs to be sent over the wire
126 """pack up a function, args, and kwargs to be sent over the wire
128
127
129 Each element of args/kwargs will be canned for special treatment,
128 Each element of args/kwargs will be canned for special treatment,
130 but inspection will not go any deeper than that.
129 but inspection will not go any deeper than that.
131
130
132 Any object whose data is larger than `threshold` will not have their data copied
131 Any object whose data is larger than `threshold` will not have their data copied
133 (only numpy arrays and bytes/buffers support zero-copy)
132 (only numpy arrays and bytes/buffers support zero-copy)
134
133
135 Message will be a list of bytes/buffers of the format:
134 Message will be a list of bytes/buffers of the format:
136
135
137 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
136 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
138
137
139 With length at least two + len(args) + len(kwargs)
138 With length at least two + len(args) + len(kwargs)
140 """
139 """
141
140
142 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
141 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
143
142
144 kw_keys = sorted(kwargs.keys())
143 kw_keys = sorted(kwargs.keys())
145 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
144 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
146
145
147 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
146 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
148
147
149 msg = [pickle.dumps(can(f), PICKLE_PROTOCOL)]
148 msg = [pickle.dumps(can(f), PICKLE_PROTOCOL)]
150 msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
149 msg.append(pickle.dumps(info, PICKLE_PROTOCOL))
151 msg.extend(arg_bufs)
150 msg.extend(arg_bufs)
152 msg.extend(kwarg_bufs)
151 msg.extend(kwarg_bufs)
153
152
154 return msg
153 return msg
155
154
156 def unpack_apply_message(bufs, g=None, copy=True):
155 def unpack_apply_message(bufs, g=None, copy=True):
157 """unpack f,args,kwargs from buffers packed by pack_apply_message()
156 """unpack f,args,kwargs from buffers packed by pack_apply_message()
158 Returns: original f,args,kwargs"""
157 Returns: original f,args,kwargs"""
159 bufs = list(bufs) # allow us to pop
158 bufs = list(bufs) # allow us to pop
160 assert len(bufs) >= 2, "not enough buffers!"
159 assert len(bufs) >= 2, "not enough buffers!"
161 pf = buffer_to_bytes_py2(bufs.pop(0))
160 pf = buffer_to_bytes_py2(bufs.pop(0))
162 f = uncan(pickle.loads(pf), g)
161 f = uncan(pickle.loads(pf), g)
163 pinfo = buffer_to_bytes_py2(bufs.pop(0))
162 pinfo = buffer_to_bytes_py2(bufs.pop(0))
164 info = pickle.loads(pinfo)
163 info = pickle.loads(pinfo)
165 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
164 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
166
165
167 args = []
166 args = []
168 for i in range(info['nargs']):
167 for i in range(info['nargs']):
169 arg, arg_bufs = deserialize_object(arg_bufs, g)
168 arg, arg_bufs = deserialize_object(arg_bufs, g)
170 args.append(arg)
169 args.append(arg)
171 args = tuple(args)
170 args = tuple(args)
172 assert not arg_bufs, "Shouldn't be any arg bufs left over"
171 assert not arg_bufs, "Shouldn't be any arg bufs left over"
173
172
174 kwargs = {}
173 kwargs = {}
175 for key in info['kw_keys']:
174 for key in info['kw_keys']:
176 kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
175 kwarg, kwarg_bufs = deserialize_object(kwarg_bufs, g)
177 kwargs[key] = kwarg
176 kwargs[key] = kwarg
178 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
177 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
179
178
180 return f,args,kwargs
179 return f,args,kwargs
181
180
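The apply-message helpers above compose into a straightforward round trip; a brief sketch with an illustrative function and the same assumed package path as in the example near the top:

    # Sketch only: assumes jupyter_client.serialize exposes the helpers shown above.
    from jupyter_client.serialize import pack_apply_message, unpack_apply_message

    def add(x, y):
        return x + y

    # msg is [cf, pinfo, <arg_bufs>, <kwarg_bufs>], as described in the docstring above
    msg = pack_apply_message(add, (2, 3), {})
    f, args, kwargs = unpack_apply_message(msg)
    assert f(*args, **kwargs) == 5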
session.py
@@ -1,881 +1,884 @@
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9 """
9 """
10
10
11 # Copyright (c) IPython Development Team.
11 # Copyright (c) IPython Development Team.
12 # Distributed under the terms of the Modified BSD License.
12 # Distributed under the terms of the Modified BSD License.
13
13
14 import hashlib
14 import hashlib
15 import hmac
15 import hmac
16 import logging
16 import logging
17 import os
17 import os
18 import pprint
18 import pprint
19 import random
19 import random
20 import uuid
20 import uuid
21 import warnings
21 import warnings
22 from datetime import datetime
22 from datetime import datetime
23
23
24 try:
24 try:
25 import cPickle
25 import cPickle
26 pickle = cPickle
26 pickle = cPickle
27 except:
27 except:
28 cPickle = None
28 cPickle = None
29 import pickle
29 import pickle
30
30
31 try:
31 try:
32 # We are using compare_digest to limit the surface of timing attacks
32 # We are using compare_digest to limit the surface of timing attacks
33 from hmac import compare_digest
33 from hmac import compare_digest
34 except ImportError:
34 except ImportError:
35 # Python < 2.7.7: When digests don't match no feedback is provided,
35 # Python < 2.7.7: When digests don't match no feedback is provided,
36 # limiting the surface of attack
36 # limiting the surface of attack
37 def compare_digest(a,b): return a == b
37 def compare_digest(a,b): return a == b
38
38
39 import zmq
39 import zmq
40 from zmq.utils import jsonapi
40 from zmq.utils import jsonapi
41 from zmq.eventloop.ioloop import IOLoop
41 from zmq.eventloop.ioloop import IOLoop
42 from zmq.eventloop.zmqstream import ZMQStream
42 from zmq.eventloop.zmqstream import ZMQStream
43
43
44 from IPython.core.release import kernel_protocol_version
44 from IPython.core.release import kernel_protocol_version
45 from IPython.config.configurable import Configurable, LoggingConfigurable
45 from IPython.config.configurable import Configurable, LoggingConfigurable
46 from IPython.utils import io
46 from IPython.utils import io
47 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
48 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
48 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
49 from IPython.utils.py3compat import (str_to_bytes, str_to_unicode, unicode_type,
50 iteritems)
50 iteritems)
51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 DottedObjectName, CUnicode, Dict, Integer,
52 DottedObjectName, CUnicode, Dict, Integer,
53 TraitError,
53 TraitError,
54 )
54 )
55 from IPython.utils.pickleutil import PICKLE_PROTOCOL
55 from IPython.utils.pickleutil import PICKLE_PROTOCOL
- 56 from IPython.kernel.zmq.serialize import MAX_ITEMS, MAX_BYTES
57 from jupyter_client.adapter import adapt
56 from jupyter_client.adapter import adapt
58
57
59 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
60 # utility functions
59 # utility functions
61 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
62
61
63 def squash_unicode(obj):
62 def squash_unicode(obj):
64 """coerce unicode back to bytestrings."""
63 """coerce unicode back to bytestrings."""
65 if isinstance(obj,dict):
64 if isinstance(obj,dict):
66 for key in obj.keys():
65 for key in obj.keys():
67 obj[key] = squash_unicode(obj[key])
66 obj[key] = squash_unicode(obj[key])
68 if isinstance(key, unicode_type):
67 if isinstance(key, unicode_type):
69 obj[squash_unicode(key)] = obj.pop(key)
68 obj[squash_unicode(key)] = obj.pop(key)
70 elif isinstance(obj, list):
69 elif isinstance(obj, list):
71 for i,v in enumerate(obj):
70 for i,v in enumerate(obj):
72 obj[i] = squash_unicode(v)
71 obj[i] = squash_unicode(v)
73 elif isinstance(obj, unicode_type):
72 elif isinstance(obj, unicode_type):
74 obj = obj.encode('utf8')
73 obj = obj.encode('utf8')
75 return obj
74 return obj
76
75
77 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
78 # globals and defaults
77 # globals and defaults
79 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
80
79
+ 80 # default values for the thresholds:
+ 81 MAX_ITEMS = 64
+ 82 MAX_BYTES = 1024
+ 83
81 # ISO8601-ify datetime objects
84 # ISO8601-ify datetime objects
82 # allow unicode
85 # allow unicode
83 # disallow nan, because it's not actually valid JSON
86 # disallow nan, because it's not actually valid JSON
84 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default,
87 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default,
85 ensure_ascii=False, allow_nan=False,
88 ensure_ascii=False, allow_nan=False,
86 )
89 )
87 json_unpacker = lambda s: jsonapi.loads(s)
90 json_unpacker = lambda s: jsonapi.loads(s)
88
91
89 pickle_packer = lambda o: pickle.dumps(squash_dates(o), PICKLE_PROTOCOL)
92 pickle_packer = lambda o: pickle.dumps(squash_dates(o), PICKLE_PROTOCOL)
90 pickle_unpacker = pickle.loads
93 pickle_unpacker = pickle.loads
91
94
92 default_packer = json_packer
95 default_packer = json_packer
93 default_unpacker = json_unpacker
96 default_unpacker = json_unpacker
94
97
95 DELIM = b"<IDS|MSG>"
98 DELIM = b"<IDS|MSG>"
96 # singleton dummy tracker, which will always report as done
99 # singleton dummy tracker, which will always report as done
97 DONE = zmq.MessageTracker()
100 DONE = zmq.MessageTracker()
98
101
99 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
100 # Mixin tools for apps that use Sessions
103 # Mixin tools for apps that use Sessions
101 #-----------------------------------------------------------------------------
104 #-----------------------------------------------------------------------------
102
105
103 session_aliases = dict(
106 session_aliases = dict(
104 ident = 'Session.session',
107 ident = 'Session.session',
105 user = 'Session.username',
108 user = 'Session.username',
106 keyfile = 'Session.keyfile',
109 keyfile = 'Session.keyfile',
107 )
110 )
108
111
109 session_flags = {
112 session_flags = {
110 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
113 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
111 'keyfile' : '' }},
114 'keyfile' : '' }},
112 """Use HMAC digests for authentication of messages.
115 """Use HMAC digests for authentication of messages.
113 Setting this flag will generate a new UUID to use as the HMAC key.
116 Setting this flag will generate a new UUID to use as the HMAC key.
114 """),
117 """),
115 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
118 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
116 """Don't authenticate messages."""),
119 """Don't authenticate messages."""),
117 }
120 }
118
121
119 def default_secure(cfg):
122 def default_secure(cfg):
120 """Set the default behavior for a config environment to be secure.
123 """Set the default behavior for a config environment to be secure.
121
124
122 If Session.key/keyfile have not been set, set Session.key to
125 If Session.key/keyfile have not been set, set Session.key to
123 a new random UUID.
126 a new random UUID.
124 """
127 """
125 warnings.warn("default_secure is deprecated", DeprecationWarning)
128 warnings.warn("default_secure is deprecated", DeprecationWarning)
126 if 'Session' in cfg:
129 if 'Session' in cfg:
127 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
130 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
128 return
131 return
129 # key/keyfile not specified, generate new UUID:
132 # key/keyfile not specified, generate new UUID:
130 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
133 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
131
134
132
135
133 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
134 # Classes
137 # Classes
135 #-----------------------------------------------------------------------------
138 #-----------------------------------------------------------------------------
136
139
137 class SessionFactory(LoggingConfigurable):
140 class SessionFactory(LoggingConfigurable):
138 """The Base class for configurables that have a Session, Context, logger,
141 """The Base class for configurables that have a Session, Context, logger,
139 and IOLoop.
142 and IOLoop.
140 """
143 """
141
144
142 logname = Unicode('')
145 logname = Unicode('')
143 def _logname_changed(self, name, old, new):
146 def _logname_changed(self, name, old, new):
144 self.log = logging.getLogger(new)
147 self.log = logging.getLogger(new)
145
148
146 # not configurable:
149 # not configurable:
147 context = Instance('zmq.Context')
150 context = Instance('zmq.Context')
148 def _context_default(self):
151 def _context_default(self):
149 return zmq.Context.instance()
152 return zmq.Context.instance()
150
153
151 session = Instance('jupyter_client.session.Session',
154 session = Instance('jupyter_client.session.Session',
152 allow_none=True)
155 allow_none=True)
153
156
154 loop = Instance('zmq.eventloop.ioloop.IOLoop')
157 loop = Instance('zmq.eventloop.ioloop.IOLoop')
155 def _loop_default(self):
158 def _loop_default(self):
156 return IOLoop.instance()
159 return IOLoop.instance()
157
160
158 def __init__(self, **kwargs):
161 def __init__(self, **kwargs):
159 super(SessionFactory, self).__init__(**kwargs)
162 super(SessionFactory, self).__init__(**kwargs)
160
163
161 if self.session is None:
164 if self.session is None:
162 # construct the session
165 # construct the session
163 self.session = Session(**kwargs)
166 self.session = Session(**kwargs)
164
167
165
168
166 class Message(object):
169 class Message(object):
167 """A simple message object that maps dict keys to attributes.
170 """A simple message object that maps dict keys to attributes.
168
171
169 A Message can be created from a dict and a dict from a Message instance
172 A Message can be created from a dict and a dict from a Message instance
170 simply by calling dict(msg_obj)."""
173 simply by calling dict(msg_obj)."""
171
174
172 def __init__(self, msg_dict):
175 def __init__(self, msg_dict):
173 dct = self.__dict__
176 dct = self.__dict__
174 for k, v in iteritems(dict(msg_dict)):
177 for k, v in iteritems(dict(msg_dict)):
175 if isinstance(v, dict):
178 if isinstance(v, dict):
176 v = Message(v)
179 v = Message(v)
177 dct[k] = v
180 dct[k] = v
178
181
179 # Having this iterator lets dict(msg_obj) work out of the box.
182 # Having this iterator lets dict(msg_obj) work out of the box.
180 def __iter__(self):
183 def __iter__(self):
181 return iter(iteritems(self.__dict__))
184 return iter(iteritems(self.__dict__))
182
185
183 def __repr__(self):
186 def __repr__(self):
184 return repr(self.__dict__)
187 return repr(self.__dict__)
185
188
186 def __str__(self):
189 def __str__(self):
187 return pprint.pformat(self.__dict__)
190 return pprint.pformat(self.__dict__)
188
191
189 def __contains__(self, k):
192 def __contains__(self, k):
190 return k in self.__dict__
193 return k in self.__dict__
191
194
192 def __getitem__(self, k):
195 def __getitem__(self, k):
193 return self.__dict__[k]
196 return self.__dict__[k]
194
197
195
198
196 def msg_header(msg_id, msg_type, username, session):
199 def msg_header(msg_id, msg_type, username, session):
197 date = datetime.now()
200 date = datetime.now()
198 version = kernel_protocol_version
201 version = kernel_protocol_version
199 return locals()
202 return locals()
200
203
201 def extract_header(msg_or_header):
204 def extract_header(msg_or_header):
202 """Given a message or header, return the header."""
205 """Given a message or header, return the header."""
203 if not msg_or_header:
206 if not msg_or_header:
204 return {}
207 return {}
205 try:
208 try:
206 # See if msg_or_header is the entire message.
209 # See if msg_or_header is the entire message.
207 h = msg_or_header['header']
210 h = msg_or_header['header']
208 except KeyError:
211 except KeyError:
209 try:
212 try:
210 # See if msg_or_header is just the header
213 # See if msg_or_header is just the header
211 h = msg_or_header['msg_id']
214 h = msg_or_header['msg_id']
212 except KeyError:
215 except KeyError:
213 raise
216 raise
214 else:
217 else:
215 h = msg_or_header
218 h = msg_or_header
216 if not isinstance(h, dict):
219 if not isinstance(h, dict):
217 h = dict(h)
220 h = dict(h)
218 return h
221 return h
219
222
220 class Session(Configurable):
223 class Session(Configurable):
221 """Object for handling serialization and sending of messages.
224 """Object for handling serialization and sending of messages.
222
225
223 The Session object handles building messages and sending them
226 The Session object handles building messages and sending them
224 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
227 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
225 other over the network via Session objects, and only need to work with the
228 other over the network via Session objects, and only need to work with the
226 dict-based IPython message spec. The Session will handle
229 dict-based IPython message spec. The Session will handle
227 serialization/deserialization, security, and metadata.
230 serialization/deserialization, security, and metadata.
228
231
229 Sessions support configurable serialization via packer/unpacker traits,
232 Sessions support configurable serialization via packer/unpacker traits,
230 and signing with HMAC digests via the key/keyfile traits.
233 and signing with HMAC digests via the key/keyfile traits.
231
234
232 Parameters
235 Parameters
233 ----------
236 ----------
234
237
235 debug : bool
238 debug : bool
236 whether to trigger extra debugging statements
239 whether to trigger extra debugging statements
237 packer/unpacker : str : 'json', 'pickle' or import_string
240 packer/unpacker : str : 'json', 'pickle' or import_string
238 importstrings for methods to serialize message parts. If just
241 importstrings for methods to serialize message parts. If just
239 'json' or 'pickle', predefined JSON and pickle packers will be used.
242 'json' or 'pickle', predefined JSON and pickle packers will be used.
240 Otherwise, the entire importstring must be used.
243 Otherwise, the entire importstring must be used.
241
244
242 The functions must accept at least valid JSON input, and output *bytes*.
245 The functions must accept at least valid JSON input, and output *bytes*.
243
246
244 For example, to use msgpack:
247 For example, to use msgpack:
245 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
248 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
246 pack/unpack : callables
249 pack/unpack : callables
247 You can also set the pack/unpack callables for serialization directly.
250 You can also set the pack/unpack callables for serialization directly.
248 session : bytes
251 session : bytes
249 the ID of this Session object. The default is to generate a new UUID.
252 the ID of this Session object. The default is to generate a new UUID.
250 username : unicode
253 username : unicode
251 username added to message headers. The default is to ask the OS.
254 username added to message headers. The default is to ask the OS.
252 key : bytes
255 key : bytes
253 The key used to initialize an HMAC signature. If unset, messages
256 The key used to initialize an HMAC signature. If unset, messages
254 will not be signed or checked.
257 will not be signed or checked.
255 keyfile : filepath
258 keyfile : filepath
256 The file containing a key. If this is set, `key` will be initialized
259 The file containing a key. If this is set, `key` will be initialized
257 to the contents of the file.
260 to the contents of the file.
258
261
259 """
262 """
260
263
261 debug=Bool(False, config=True, help="""Debug output in the Session""")
264 debug=Bool(False, config=True, help="""Debug output in the Session""")
262
265
263 packer = DottedObjectName('json',config=True,
266 packer = DottedObjectName('json',config=True,
264 help="""The name of the packer for serializing messages.
267 help="""The name of the packer for serializing messages.
265 Should be one of 'json', 'pickle', or an import name
268 Should be one of 'json', 'pickle', or an import name
266 for a custom callable serializer.""")
269 for a custom callable serializer.""")
267 def _packer_changed(self, name, old, new):
270 def _packer_changed(self, name, old, new):
268 if new.lower() == 'json':
271 if new.lower() == 'json':
269 self.pack = json_packer
272 self.pack = json_packer
270 self.unpack = json_unpacker
273 self.unpack = json_unpacker
271 self.unpacker = new
274 self.unpacker = new
272 elif new.lower() == 'pickle':
275 elif new.lower() == 'pickle':
273 self.pack = pickle_packer
276 self.pack = pickle_packer
274 self.unpack = pickle_unpacker
277 self.unpack = pickle_unpacker
275 self.unpacker = new
278 self.unpacker = new
276 else:
279 else:
277 self.pack = import_item(str(new))
280 self.pack = import_item(str(new))
278
281
279 unpacker = DottedObjectName('json', config=True,
282 unpacker = DottedObjectName('json', config=True,
280 help="""The name of the unpacker for unserializing messages.
283 help="""The name of the unpacker for unserializing messages.
281 Only used with custom functions for `packer`.""")
284 Only used with custom functions for `packer`.""")
282 def _unpacker_changed(self, name, old, new):
285 def _unpacker_changed(self, name, old, new):
283 if new.lower() == 'json':
286 if new.lower() == 'json':
284 self.pack = json_packer
287 self.pack = json_packer
285 self.unpack = json_unpacker
288 self.unpack = json_unpacker
286 self.packer = new
289 self.packer = new
287 elif new.lower() == 'pickle':
290 elif new.lower() == 'pickle':
288 self.pack = pickle_packer
291 self.pack = pickle_packer
289 self.unpack = pickle_unpacker
292 self.unpack = pickle_unpacker
290 self.packer = new
293 self.packer = new
291 else:
294 else:
292 self.unpack = import_item(str(new))
295 self.unpack = import_item(str(new))
293
296
294 session = CUnicode(u'', config=True,
297 session = CUnicode(u'', config=True,
295 help="""The UUID identifying this session.""")
298 help="""The UUID identifying this session.""")
296 def _session_default(self):
299 def _session_default(self):
297 u = unicode_type(uuid.uuid4())
300 u = unicode_type(uuid.uuid4())
298 self.bsession = u.encode('ascii')
301 self.bsession = u.encode('ascii')
299 return u
302 return u
300
303
301 def _session_changed(self, name, old, new):
304 def _session_changed(self, name, old, new):
302 self.bsession = self.session.encode('ascii')
305 self.bsession = self.session.encode('ascii')
303
306
304 # bsession is the session as bytes
307 # bsession is the session as bytes
305 bsession = CBytes(b'')
308 bsession = CBytes(b'')
306
309
307 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
310 username = Unicode(str_to_unicode(os.environ.get('USER', 'username')),
308 help="""Username for the Session. Default is your system username.""",
311 help="""Username for the Session. Default is your system username.""",
309 config=True)
312 config=True)
310
313
311 metadata = Dict({}, config=True,
314 metadata = Dict({}, config=True,
312 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
315 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
313
316
314 # if 0, no adapting to do.
317 # if 0, no adapting to do.
315 adapt_version = Integer(0)
318 adapt_version = Integer(0)
316
319
317 # message signature related traits:
320 # message signature related traits:
318
321
319 key = CBytes(config=True,
322 key = CBytes(config=True,
320 help="""execution key, for signing messages.""")
323 help="""execution key, for signing messages.""")
321 def _key_default(self):
324 def _key_default(self):
322 return str_to_bytes(str(uuid.uuid4()))
325 return str_to_bytes(str(uuid.uuid4()))
323
326
324 def _key_changed(self):
327 def _key_changed(self):
325 self._new_auth()
328 self._new_auth()
326
329
327 signature_scheme = Unicode('hmac-sha256', config=True,
330 signature_scheme = Unicode('hmac-sha256', config=True,
328 help="""The digest scheme used to construct the message signatures.
331 help="""The digest scheme used to construct the message signatures.
329 Must have the form 'hmac-HASH'.""")
332 Must have the form 'hmac-HASH'.""")
330 def _signature_scheme_changed(self, name, old, new):
333 def _signature_scheme_changed(self, name, old, new):
331 if not new.startswith('hmac-'):
334 if not new.startswith('hmac-'):
332 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
335 raise TraitError("signature_scheme must start with 'hmac-', got %r" % new)
333 hash_name = new.split('-', 1)[1]
336 hash_name = new.split('-', 1)[1]
334 try:
337 try:
335 self.digest_mod = getattr(hashlib, hash_name)
338 self.digest_mod = getattr(hashlib, hash_name)
336 except AttributeError:
339 except AttributeError:
337 raise TraitError("hashlib has no such attribute: %s" % hash_name)
340 raise TraitError("hashlib has no such attribute: %s" % hash_name)
338 self._new_auth()
341 self._new_auth()
339
342
340 digest_mod = Any()
343 digest_mod = Any()
341 def _digest_mod_default(self):
344 def _digest_mod_default(self):
342 return hashlib.sha256
345 return hashlib.sha256
343
346
344 auth = Instance(hmac.HMAC, allow_none=True)
347 auth = Instance(hmac.HMAC, allow_none=True)
345
348
346 def _new_auth(self):
349 def _new_auth(self):
347 if self.key:
350 if self.key:
348 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
351 self.auth = hmac.HMAC(self.key, digestmod=self.digest_mod)
349 else:
352 else:
350 self.auth = None
353 self.auth = None
351
354
352 digest_history = Set()
355 digest_history = Set()
353 digest_history_size = Integer(2**16, config=True,
356 digest_history_size = Integer(2**16, config=True,
354 help="""The maximum number of digests to remember.
357 help="""The maximum number of digests to remember.
355
358
356 The digest history will be culled when it exceeds this value.
359 The digest history will be culled when it exceeds this value.
357 """
360 """
358 )
361 )
359
362
360 keyfile = Unicode('', config=True,
363 keyfile = Unicode('', config=True,
361 help="""path to file containing execution key.""")
364 help="""path to file containing execution key.""")
362 def _keyfile_changed(self, name, old, new):
365 def _keyfile_changed(self, name, old, new):
363 with open(new, 'rb') as f:
366 with open(new, 'rb') as f:
364 self.key = f.read().strip()
367 self.key = f.read().strip()
365
368
366 # for protecting against sends from forks
369 # for protecting against sends from forks
367 pid = Integer()
370 pid = Integer()
368
371
369 # serialization traits:
372 # serialization traits:
370
373
371 pack = Any(default_packer) # the actual packer function
374 pack = Any(default_packer) # the actual packer function
372 def _pack_changed(self, name, old, new):
375 def _pack_changed(self, name, old, new):
373 if not callable(new):
376 if not callable(new):
374 raise TypeError("packer must be callable, not %s"%type(new))
377 raise TypeError("packer must be callable, not %s"%type(new))
375
378
376 unpack = Any(default_unpacker) # the actual packer function
379 unpack = Any(default_unpacker) # the actual packer function
377 def _unpack_changed(self, name, old, new):
380 def _unpack_changed(self, name, old, new):
378 # unpacker is not checked - it is assumed to be
381 # unpacker is not checked - it is assumed to be
379 if not callable(new):
382 if not callable(new):
380 raise TypeError("unpacker must be callable, not %s"%type(new))
383 raise TypeError("unpacker must be callable, not %s"%type(new))
381
384
382 # thresholds:
385 # thresholds:
383 copy_threshold = Integer(2**16, config=True,
386 copy_threshold = Integer(2**16, config=True,
384 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
387 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
385 buffer_threshold = Integer(MAX_BYTES, config=True,
388 buffer_threshold = Integer(MAX_BYTES, config=True,
386 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
389 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
387 item_threshold = Integer(MAX_ITEMS, config=True,
390 item_threshold = Integer(MAX_ITEMS, config=True,
388 help="""The maximum number of items for a container to be introspected for custom serialization.
391 help="""The maximum number of items for a container to be introspected for custom serialization.
389 Containers larger than this are pickled outright.
392 Containers larger than this are pickled outright.
390 """
393 """
391 )
394 )
392
395
393
396
394 def __init__(self, **kwargs):
397 def __init__(self, **kwargs):
395 """create a Session object
398 """create a Session object
396
399
397 Parameters
400 Parameters
398 ----------
401 ----------
399
402
400 debug : bool
403 debug : bool
401 whether to trigger extra debugging statements
404 whether to trigger extra debugging statements
402 packer/unpacker : str : 'json', 'pickle' or import_string
405 packer/unpacker : str : 'json', 'pickle' or import_string
403 importstrings for methods to serialize message parts. If just
406 importstrings for methods to serialize message parts. If just
404 'json' or 'pickle', predefined JSON and pickle packers will be used.
407 'json' or 'pickle', predefined JSON and pickle packers will be used.
405 Otherwise, the entire importstring must be used.
408 Otherwise, the entire importstring must be used.
406
409
407 The functions must accept at least valid JSON input, and output
410 The functions must accept at least valid JSON input, and output
408 *bytes*.
411 *bytes*.
409
412
410 For example, to use msgpack:
413 For example, to use msgpack:
411 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
414 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
412 pack/unpack : callables
415 pack/unpack : callables
413 You can also set the pack/unpack callables for serialization
416 You can also set the pack/unpack callables for serialization
414 directly.
417 directly.
415 session : unicode (must be ascii)
418 session : unicode (must be ascii)
416 the ID of this Session object. The default is to generate a new
419 the ID of this Session object. The default is to generate a new
417 UUID.
420 UUID.
418 bsession : bytes
421 bsession : bytes
419 The session as bytes
422 The session as bytes
420 username : unicode
423 username : unicode
421 username added to message headers. The default is to ask the OS.
424 username added to message headers. The default is to ask the OS.
422 key : bytes
425 key : bytes
423 The key used to initialize an HMAC signature. If unset, messages
426 The key used to initialize an HMAC signature. If unset, messages
424 will not be signed or checked.
427 will not be signed or checked.
425 signature_scheme : str
428 signature_scheme : str
426 The message digest scheme. Currently must be of the form 'hmac-HASH',
429 The message digest scheme. Currently must be of the form 'hmac-HASH',
427 where 'HASH' is a hashing function available in Python's hashlib.
430 where 'HASH' is a hashing function available in Python's hashlib.
428 The default is 'hmac-sha256'.
431 The default is 'hmac-sha256'.
429 This is ignored if 'key' is empty.
432 This is ignored if 'key' is empty.
430 keyfile : filepath
433 keyfile : filepath
431 The file containing a key. If this is set, `key` will be
434 The file containing a key. If this is set, `key` will be
432 initialized to the contents of the file.
435 initialized to the contents of the file.
433 """
436 """
434 super(Session, self).__init__(**kwargs)
437 super(Session, self).__init__(**kwargs)
435 self._check_packers()
438 self._check_packers()
436 self.none = self.pack({})
439 self.none = self.pack({})
437 # ensure self._session_default() if necessary, so bsession is defined:
440 # ensure self._session_default() if necessary, so bsession is defined:
438 self.session
441 self.session
439 self.pid = os.getpid()
442 self.pid = os.getpid()
440 self._new_auth()
443 self._new_auth()
441
444
442 @property
445 @property
443 def msg_id(self):
446 def msg_id(self):
444 """always return new uuid"""
447 """always return new uuid"""
445 return str(uuid.uuid4())
448 return str(uuid.uuid4())
446
449
447 def _check_packers(self):
450 def _check_packers(self):
448 """check packers for datetime support."""
451 """check packers for datetime support."""
449 pack = self.pack
452 pack = self.pack
450 unpack = self.unpack
453 unpack = self.unpack
451
454
452 # check simple serialization
455 # check simple serialization
453 msg = dict(a=[1,'hi'])
456 msg = dict(a=[1,'hi'])
454 try:
457 try:
455 packed = pack(msg)
458 packed = pack(msg)
456 except Exception as e:
459 except Exception as e:
457 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
460 msg = "packer '{packer}' could not serialize a simple message: {e}{jsonmsg}"
458 if self.packer == 'json':
461 if self.packer == 'json':
459 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
462 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
460 else:
463 else:
461 jsonmsg = ""
464 jsonmsg = ""
462 raise ValueError(
465 raise ValueError(
463 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
466 msg.format(packer=self.packer, e=e, jsonmsg=jsonmsg)
464 )
467 )
465
468
466 # ensure packed message is bytes
469 # ensure packed message is bytes
467 if not isinstance(packed, bytes):
470 if not isinstance(packed, bytes):
468 raise ValueError("message packed to %r, but bytes are required"%type(packed))
471 raise ValueError("message packed to %r, but bytes are required"%type(packed))
469
472
470 # check that unpack is pack's inverse
473 # check that unpack is pack's inverse
471 try:
474 try:
472 unpacked = unpack(packed)
475 unpacked = unpack(packed)
473 assert unpacked == msg
476 assert unpacked == msg
474 except Exception as e:
477 except Exception as e:
475 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
478 msg = "unpacker '{unpacker}' could not handle output from packer '{packer}': {e}{jsonmsg}"
476 if self.packer == 'json':
479 if self.packer == 'json':
477 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
480 jsonmsg = "\nzmq.utils.jsonapi.jsonmod = %s" % jsonapi.jsonmod
478 else:
481 else:
479 jsonmsg = ""
482 jsonmsg = ""
480 raise ValueError(
483 raise ValueError(
481 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
484 msg.format(packer=self.packer, unpacker=self.unpacker, e=e, jsonmsg=jsonmsg)
482 )
485 )
483
486
484 # check datetime support
487 # check datetime support
485 msg = dict(t=datetime.now())
488 msg = dict(t=datetime.now())
486 try:
489 try:
487 unpacked = unpack(pack(msg))
490 unpacked = unpack(pack(msg))
488 if isinstance(unpacked['t'], datetime):
491 if isinstance(unpacked['t'], datetime):
489 raise ValueError("Shouldn't deserialize to datetime")
492 raise ValueError("Shouldn't deserialize to datetime")
490 except Exception:
493 except Exception:
491 self.pack = lambda o: pack(squash_dates(o))
494 self.pack = lambda o: pack(squash_dates(o))
492 self.unpack = lambda s: unpack(s)
495 self.unpack = lambda s: unpack(s)
493
496
494 def msg_header(self, msg_type):
497 def msg_header(self, msg_type):
495 return msg_header(self.msg_id, msg_type, self.username, self.session)
498 return msg_header(self.msg_id, msg_type, self.username, self.session)
496
499
497 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
500 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
498 """Return the nested message dict.
501 """Return the nested message dict.
499
502
500 This format is different from what is sent over the wire. The
503 This format is different from what is sent over the wire. The
501 serialize/deserialize methods converts this nested message dict to the wire
504 serialize/deserialize methods converts this nested message dict to the wire
502 format, which is a list of message parts.
505 format, which is a list of message parts.
503 """
506 """
504 msg = {}
507 msg = {}
505 header = self.msg_header(msg_type) if header is None else header
508 header = self.msg_header(msg_type) if header is None else header
506 msg['header'] = header
509 msg['header'] = header
507 msg['msg_id'] = header['msg_id']
510 msg['msg_id'] = header['msg_id']
508 msg['msg_type'] = header['msg_type']
511 msg['msg_type'] = header['msg_type']
509 msg['parent_header'] = {} if parent is None else extract_header(parent)
512 msg['parent_header'] = {} if parent is None else extract_header(parent)
510 msg['content'] = {} if content is None else content
513 msg['content'] = {} if content is None else content
511 msg['metadata'] = self.metadata.copy()
514 msg['metadata'] = self.metadata.copy()
512 if metadata is not None:
515 if metadata is not None:
513 msg['metadata'].update(metadata)
516 msg['metadata'].update(metadata)
514 return msg
517 return msg
515
518
516 def sign(self, msg_list):
519 def sign(self, msg_list):
517 """Sign a message with HMAC digest. If no auth, return b''.
520 """Sign a message with HMAC digest. If no auth, return b''.
518
521
519 Parameters
522 Parameters
520 ----------
523 ----------
521 msg_list : list
524 msg_list : list
522 The [p_header,p_parent,p_content] part of the message list.
525 The [p_header,p_parent,p_content] part of the message list.
523 """
526 """
524 if self.auth is None:
527 if self.auth is None:
525 return b''
528 return b''
526 h = self.auth.copy()
529 h = self.auth.copy()
527 for m in msg_list:
530 for m in msg_list:
528 h.update(m)
531 h.update(m)
529 return str_to_bytes(h.hexdigest())
532 return str_to_bytes(h.hexdigest())
530
533
531 def serialize(self, msg, ident=None):
534 def serialize(self, msg, ident=None):
532 """Serialize the message components to bytes.
535 """Serialize the message components to bytes.
533
536
534 This is roughly the inverse of deserialize. The serialize/deserialize
537 This is roughly the inverse of deserialize. The serialize/deserialize
535 methods work with full message lists, whereas pack/unpack work with
538 methods work with full message lists, whereas pack/unpack work with
536 the individual message parts in the message list.
539 the individual message parts in the message list.
537
540
538 Parameters
541 Parameters
539 ----------
542 ----------
540 msg : dict or Message
543 msg : dict or Message
541 The next message dict as returned by the self.msg method.
544 The next message dict as returned by the self.msg method.
542
545
543 Returns
546 Returns
544 -------
547 -------
545 msg_list : list
548 msg_list : list
546 The list of bytes objects to be sent with the format::
549 The list of bytes objects to be sent with the format::
547
550
548 [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
551 [ident1, ident2, ..., DELIM, HMAC, p_header, p_parent,
549 p_metadata, p_content, buffer1, buffer2, ...]
552 p_metadata, p_content, buffer1, buffer2, ...]
550
553
551 In this list, the ``p_*`` entities are the packed or serialized
554 In this list, the ``p_*`` entities are the packed or serialized
552 versions, so if JSON is used, these are utf8 encoded JSON strings.
555 versions, so if JSON is used, these are utf8 encoded JSON strings.
553 """
556 """
554 content = msg.get('content', {})
557 content = msg.get('content', {})
555 if content is None:
558 if content is None:
556 content = self.none
559 content = self.none
557 elif isinstance(content, dict):
560 elif isinstance(content, dict):
558 content = self.pack(content)
561 content = self.pack(content)
559 elif isinstance(content, bytes):
562 elif isinstance(content, bytes):
560 # content is already packed, as in a relayed message
563 # content is already packed, as in a relayed message
561 pass
564 pass
562 elif isinstance(content, unicode_type):
565 elif isinstance(content, unicode_type):
563 # should be bytes, but JSON often spits out unicode
566 # should be bytes, but JSON often spits out unicode
564 content = content.encode('utf8')
567 content = content.encode('utf8')
565 else:
568 else:
566 raise TypeError("Content incorrect type: %s"%type(content))
569 raise TypeError("Content incorrect type: %s"%type(content))
567
570
568 real_message = [self.pack(msg['header']),
571 real_message = [self.pack(msg['header']),
569 self.pack(msg['parent_header']),
572 self.pack(msg['parent_header']),
570 self.pack(msg['metadata']),
573 self.pack(msg['metadata']),
571 content,
574 content,
572 ]
575 ]
573
576
574 to_send = []
577 to_send = []
575
578
576 if isinstance(ident, list):
579 if isinstance(ident, list):
577 # accept list of idents
580 # accept list of idents
578 to_send.extend(ident)
581 to_send.extend(ident)
579 elif ident is not None:
582 elif ident is not None:
580 to_send.append(ident)
583 to_send.append(ident)
581 to_send.append(DELIM)
584 to_send.append(DELIM)
582
585
583 signature = self.sign(real_message)
586 signature = self.sign(real_message)
584 to_send.append(signature)
587 to_send.append(signature)
585
588
586 to_send.extend(real_message)
589 to_send.extend(real_message)
587
590
588 return to_send
591 return to_send
589
592
590 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
593 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
591 buffers=None, track=False, header=None, metadata=None):
594 buffers=None, track=False, header=None, metadata=None):
592 """Build and send a message via stream or socket.
595 """Build and send a message via stream or socket.
593
596
594 The message format used by this function internally is as follows:
597 The message format used by this function internally is as follows:
595
598
596 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
599 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
597 buffer1,buffer2,...]
600 buffer1,buffer2,...]
598
601
599 The serialize/deserialize methods convert the nested message dict into this
602 The serialize/deserialize methods convert the nested message dict into this
600 format.
603 format.
601
604
602 Parameters
605 Parameters
603 ----------
606 ----------
604
607
605 stream : zmq.Socket or ZMQStream
608 stream : zmq.Socket or ZMQStream
606 The socket-like object used to send the data.
609 The socket-like object used to send the data.
607 msg_or_type : str or Message/dict
610 msg_or_type : str or Message/dict
608 Normally, msg_or_type will be a msg_type unless a message is being
611 Normally, msg_or_type will be a msg_type unless a message is being
609 sent more than once. If a header is supplied, this can be set to
612 sent more than once. If a header is supplied, this can be set to
610 None and the msg_type will be pulled from the header.
613 None and the msg_type will be pulled from the header.
611
614
612 content : dict or None
615 content : dict or None
613 The content of the message (ignored if msg_or_type is a message).
616 The content of the message (ignored if msg_or_type is a message).
614 header : dict or None
617 header : dict or None
615 The header dict for the message (ignored if msg_to_type is a message).
618 The header dict for the message (ignored if msg_to_type is a message).
616 parent : Message or dict or None
619 parent : Message or dict or None
617 The parent or parent header describing the parent of this message
620 The parent or parent header describing the parent of this message
618 (ignored if msg_or_type is a message).
621 (ignored if msg_or_type is a message).
619 ident : bytes or list of bytes
622 ident : bytes or list of bytes
620 The zmq.IDENTITY routing path.
623 The zmq.IDENTITY routing path.
621 metadata : dict or None
624 metadata : dict or None
622 The metadata describing the message
625 The metadata describing the message
623 buffers : list or None
626 buffers : list or None
624 The already-serialized buffers to be appended to the message.
627 The already-serialized buffers to be appended to the message.
625 track : bool
628 track : bool
626 Whether to track. Only for use with Sockets, because ZMQStream
629 Whether to track. Only for use with Sockets, because ZMQStream
627 objects cannot track messages.
630 objects cannot track messages.
628
631
629
632
630 Returns
633 Returns
631 -------
634 -------
632 msg : dict
635 msg : dict
633 The constructed message.
636 The constructed message.
634 """
637 """
635 if not isinstance(stream, zmq.Socket):
638 if not isinstance(stream, zmq.Socket):
636 # ZMQStreams and dummy sockets do not support tracking.
639 # ZMQStreams and dummy sockets do not support tracking.
637 track = False
640 track = False
638
641
639 if isinstance(msg_or_type, (Message, dict)):
642 if isinstance(msg_or_type, (Message, dict)):
640 # We got a Message or message dict, not a msg_type so don't
643 # We got a Message or message dict, not a msg_type so don't
641 # build a new Message.
644 # build a new Message.
642 msg = msg_or_type
645 msg = msg_or_type
643 buffers = buffers or msg.get('buffers', [])
646 buffers = buffers or msg.get('buffers', [])
644 else:
647 else:
645 msg = self.msg(msg_or_type, content=content, parent=parent,
648 msg = self.msg(msg_or_type, content=content, parent=parent,
646 header=header, metadata=metadata)
649 header=header, metadata=metadata)
647 if not os.getpid() == self.pid:
650 if not os.getpid() == self.pid:
648 io.rprint("WARNING: attempted to send message from fork")
651 io.rprint("WARNING: attempted to send message from fork")
649 io.rprint(msg)
652 io.rprint(msg)
650 return
653 return
651 buffers = [] if buffers is None else buffers
654 buffers = [] if buffers is None else buffers
652 if self.adapt_version:
655 if self.adapt_version:
653 msg = adapt(msg, self.adapt_version)
656 msg = adapt(msg, self.adapt_version)
654 to_send = self.serialize(msg, ident)
657 to_send = self.serialize(msg, ident)
655 to_send.extend(buffers)
658 to_send.extend(buffers)
656 longest = max([ len(s) for s in to_send ])
659 longest = max([ len(s) for s in to_send ])
657 copy = (longest < self.copy_threshold)
660 copy = (longest < self.copy_threshold)
658
661
659 if buffers and track and not copy:
662 if buffers and track and not copy:
660 # only really track when we are doing zero-copy buffers
663 # only really track when we are doing zero-copy buffers
661 tracker = stream.send_multipart(to_send, copy=False, track=True)
664 tracker = stream.send_multipart(to_send, copy=False, track=True)
662 else:
665 else:
663 # use dummy tracker, which will be done immediately
666 # use dummy tracker, which will be done immediately
664 tracker = DONE
667 tracker = DONE
665 stream.send_multipart(to_send, copy=copy)
668 stream.send_multipart(to_send, copy=copy)
666
669
667 if self.debug:
670 if self.debug:
668 pprint.pprint(msg)
671 pprint.pprint(msg)
669 pprint.pprint(to_send)
672 pprint.pprint(to_send)
670 pprint.pprint(buffers)
673 pprint.pprint(buffers)
671
674
672 msg['tracker'] = tracker
675 msg['tracker'] = tracker
673
676
674 return msg
677 return msg
675
678
676 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
679 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
677 """Send a raw message via ident path.
680 """Send a raw message via ident path.
678
681
679 This method is used to send a already serialized message.
682 This method is used to send a already serialized message.
680
683
681 Parameters
684 Parameters
682 ----------
685 ----------
683 stream : ZMQStream or Socket
686 stream : ZMQStream or Socket
684 The ZMQ stream or socket to use for sending the message.
687 The ZMQ stream or socket to use for sending the message.
685 msg_list : list
688 msg_list : list
686 The serialized list of messages to send. This only includes the
689 The serialized list of messages to send. This only includes the
687 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
690 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
688 the message.
691 the message.
689 ident : ident or list
692 ident : ident or list
690 A single ident or a list of idents to use in sending.
693 A single ident or a list of idents to use in sending.
691 """
694 """
692 to_send = []
695 to_send = []
693 if isinstance(ident, bytes):
696 if isinstance(ident, bytes):
694 ident = [ident]
697 ident = [ident]
695 if ident is not None:
698 if ident is not None:
696 to_send.extend(ident)
699 to_send.extend(ident)
697
700
698 to_send.append(DELIM)
701 to_send.append(DELIM)
699 to_send.append(self.sign(msg_list))
702 to_send.append(self.sign(msg_list))
700 to_send.extend(msg_list)
703 to_send.extend(msg_list)
701 stream.send_multipart(to_send, flags, copy=copy)
704 stream.send_multipart(to_send, flags, copy=copy)
702
705
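# Minimal usage sketch of send_raw(): it expects only the already-packed
# [p_header, p_parent, p_metadata, p_content, ...] parts and prepends DELIM
# and the signature itself.  Import path and socket wiring are illustrative
# assumptions.
import zmq
from IPython.kernel.zmq.session import Session  # assumed import path

ctx = zmq.Context.instance()
out = ctx.socket(zmq.PAIR)
peer = ctx.socket(zmq.PAIR)
out.bind('inproc://session-send-raw-demo')
peer.connect('inproc://session-send-raw-demo')

session = Session()
msg = session.msg('demo_request', content={'value': 2})
# Pack the four message parts once; send_raw() can then resend them without
# re-serializing.
parts = [session.pack(msg['header']),
         session.pack(msg['parent_header']),
         session.pack(msg['metadata']),
         session.pack(msg['content'])]
session.send_raw(out, parts)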
703 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
706 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
704 """Receive and unpack a message.
707 """Receive and unpack a message.
705
708
706 Parameters
709 Parameters
707 ----------
710 ----------
708 socket : ZMQStream or Socket
711 socket : ZMQStream or Socket
709 The socket or stream to use in receiving.
712 The socket or stream to use in receiving.
710
713
711 Returns
714 Returns
712 -------
715 -------
713 [idents], msg
716 [idents], msg
714 [idents] is a list of idents and msg is a nested message dict of
717 [idents] is a list of idents and msg is a nested message dict of
715 same format as self.msg returns.
718 same format as self.msg returns.
716 """
719 """
717 if isinstance(socket, ZMQStream):
720 if isinstance(socket, ZMQStream):
718 socket = socket.socket
721 socket = socket.socket
719 try:
722 try:
720 msg_list = socket.recv_multipart(mode, copy=copy)
723 msg_list = socket.recv_multipart(mode, copy=copy)
721 except zmq.ZMQError as e:
724 except zmq.ZMQError as e:
722 if e.errno == zmq.EAGAIN:
725 if e.errno == zmq.EAGAIN:
723 # We can convert EAGAIN to None as we know in this case
726 # We can convert EAGAIN to None as we know in this case
724 # recv_multipart won't return None.
727 # recv_multipart won't return None.
725 return None, None
728 return None, None
726 else:
729 else:
727 raise
730 raise
728 # split multipart message into identity list and message dict
731 # split multipart message into identity list and message dict
729 # invalid large messages can cause very expensive string comparisons
732 # invalid large messages can cause very expensive string comparisons
730 idents, msg_list = self.feed_identities(msg_list, copy)
733 idents, msg_list = self.feed_identities(msg_list, copy)
731 try:
734 try:
732 return idents, self.deserialize(msg_list, content=content, copy=copy)
735 return idents, self.deserialize(msg_list, content=content, copy=copy)
733 except Exception as e:
736 except Exception as e:
734 # TODO: handle it
737 # TODO: handle it
735 raise e
738 raise e
736
739
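# Minimal usage sketch of recv(): with the default zmq.NOBLOCK mode it
# returns (None, None) while nothing is waiting and (idents, msg) once a
# message arrives.  Import path and socket wiring are illustrative
# assumptions.
import time
import zmq
from IPython.kernel.zmq.session import Session  # assumed import path

ctx = zmq.Context.instance()
a = ctx.socket(zmq.PAIR)
b = ctx.socket(zmq.PAIR)
a.bind('inproc://session-recv-demo')
b.connect('inproc://session-recv-demo')

session = Session()
session.send(a, 'demo_request', content={'value': 3})
while True:
    idents, msg = session.recv(b, mode=zmq.NOBLOCK)
    if msg is None:
        time.sleep(0.01)  # nothing delivered yet; poll again
        continue
    print(msg['msg_type'], msg['content'])
    break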
737 def feed_identities(self, msg_list, copy=True):
740 def feed_identities(self, msg_list, copy=True):
738 """Split the identities from the rest of the message.
741 """Split the identities from the rest of the message.
739
742
740 Feed until DELIM is reached, then return the prefix as idents and
743 Feed until DELIM is reached, then return the prefix as idents and
741 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
744 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
742 but that would be silly.
745 but that would be silly.
743
746
744 Parameters
747 Parameters
745 ----------
748 ----------
746 msg_list : a list of Message or bytes objects
749 msg_list : a list of Message or bytes objects
747 The message to be split.
750 The message to be split.
748 copy : bool
751 copy : bool
749 flag determining whether the elements of msg_list are bytes or zmq.Message objects
752 flag determining whether the elements of msg_list are bytes or zmq.Message objects
750
753
751 Returns
754 Returns
752 -------
755 -------
753 (idents, msg_list) : two lists
756 (idents, msg_list) : two lists
754 idents will always be a list of bytes, each of which is a ZMQ
757 idents will always be a list of bytes, each of which is a ZMQ
755 identity. msg_list will be a list of bytes or zmq.Messages of the
758 identity. msg_list will be a list of bytes or zmq.Messages of the
756 form [HMAC,p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] and
759 form [HMAC,p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] and
757 should be unpackable/deserializable via self.deserialize at this
760 should be unpackable/deserializable via self.deserialize at this
758 point.
761 point.
759 """
762 """
760 if copy:
763 if copy:
761 idx = msg_list.index(DELIM)
764 idx = msg_list.index(DELIM)
762 return msg_list[:idx], msg_list[idx+1:]
765 return msg_list[:idx], msg_list[idx+1:]
763 else:
766 else:
764 failed = True
767 failed = True
765 for idx,m in enumerate(msg_list):
768 for idx,m in enumerate(msg_list):
766 if m.bytes == DELIM:
769 if m.bytes == DELIM:
767 failed = False
770 failed = False
768 break
771 break
769 if failed:
772 if failed:
770 raise ValueError("DELIM not in msg_list")
773 raise ValueError("DELIM not in msg_list")
771 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
774 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
772 return [m.bytes for m in idents], msg_list
775 return [m.bytes for m in idents], msg_list
773
776
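# Illustration of the wire layout feed_identities() splits:
#   [ident, ..., DELIM, HMAC, p_header, p_parent, p_metadata, p_content, ...]
# The routing id and packed parts below are made-up bytes; DELIM is this
# module's delimiter constant (importing it from here is an assumption).
from IPython.kernel.zmq.session import Session, DELIM  # assumed import path

session = Session()
wire = [b'routing-id', DELIM, b'', b'{"msg_id": "abc"}', b'{}', b'{}', b'{}']
idents, msg_list = session.feed_identities(wire)
assert idents == [b'routing-id']
assert msg_list[0] == b''                      # empty HMAC slot (no key set)
assert msg_list[1] == b'{"msg_id": "abc"}'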
774 def _add_digest(self, signature):
777 def _add_digest(self, signature):
775 """add a digest to history to protect against replay attacks"""
778 """add a digest to history to protect against replay attacks"""
776 if self.digest_history_size == 0:
779 if self.digest_history_size == 0:
777 # no history, never add digests
780 # no history, never add digests
778 return
781 return
779
782
780 self.digest_history.add(signature)
783 self.digest_history.add(signature)
781 if len(self.digest_history) > self.digest_history_size:
784 if len(self.digest_history) > self.digest_history_size:
782 # threshold reached, cull 10%
785 # threshold reached, cull 10%
783 self._cull_digest_history()
786 self._cull_digest_history()
784
787
785 def _cull_digest_history(self):
788 def _cull_digest_history(self):
786 """cull the digest history
789 """cull the digest history
787
790
788 Removes a randomly selected 10% of the digest history
791 Removes a randomly selected 10% of the digest history
789 """
792 """
790 current = len(self.digest_history)
793 current = len(self.digest_history)
791 n_to_cull = max(int(current // 10), current - self.digest_history_size)
794 n_to_cull = max(int(current // 10), current - self.digest_history_size)
792 if n_to_cull >= current:
795 if n_to_cull >= current:
793 self.digest_history = set()
796 self.digest_history = set()
794 return
797 return
795 to_cull = random.sample(self.digest_history, n_to_cull)
798 to_cull = random.sample(self.digest_history, n_to_cull)
796 self.digest_history.difference_update(to_cull)
799 self.digest_history.difference_update(to_cull)
797
800
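# Worked example of the culling rule above (plain arithmetic, no Session
# needed): with digest_history_size = 1000 and 1001 stored digests,
# max(1001 // 10, 1001 - 1000) == 100, so roughly 10% of the history is
# dropped at random once the threshold is crossed.
current, history_size = 1001, 1000
n_to_cull = max(current // 10, current - history_size)
assert n_to_cull == 100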
798 def deserialize(self, msg_list, content=True, copy=True):
801 def deserialize(self, msg_list, content=True, copy=True):
799 """Unserialize a msg_list to a nested message dict.
802 """Unserialize a msg_list to a nested message dict.
800
803
801 This is roughly the inverse of serialize. The serialize/deserialize
804 This is roughly the inverse of serialize. The serialize/deserialize
802 methods work with full message lists, whereas pack/unpack work with
805 methods work with full message lists, whereas pack/unpack work with
803 the individual message parts in the message list.
806 the individual message parts in the message list.
804
807
805 Parameters
808 Parameters
806 ----------
809 ----------
807 msg_list : list of bytes or Message objects
810 msg_list : list of bytes or Message objects
808 The list of message parts of the form [HMAC,p_header,p_parent,
811 The list of message parts of the form [HMAC,p_header,p_parent,
809 p_metadata,p_content,buffer1,buffer2,...].
812 p_metadata,p_content,buffer1,buffer2,...].
810 content : bool (True)
813 content : bool (True)
811 Whether to unpack the content dict (True), or leave it packed
814 Whether to unpack the content dict (True), or leave it packed
812 (False).
815 (False).
813 copy : bool (True)
816 copy : bool (True)
814 Whether msg_list contains bytes (True) or the non-copying Message
817 Whether msg_list contains bytes (True) or the non-copying Message
815 objects in each place (False).
818 objects in each place (False).
816
819
817 Returns
820 Returns
818 -------
821 -------
819 msg : dict
822 msg : dict
820 The nested message dict with top-level keys [header, parent_header,
823 The nested message dict with top-level keys [header, parent_header,
821 content, buffers]. The buffers are returned as memoryviews.
824 content, buffers]. The buffers are returned as memoryviews.
822 """
825 """
823 minlen = 5
826 minlen = 5
824 message = {}
827 message = {}
825 if not copy:
828 if not copy:
826 # pyzmq didn't copy the first parts of the message, so we'll do it
829 # pyzmq didn't copy the first parts of the message, so we'll do it
827 for i in range(minlen):
830 for i in range(minlen):
828 msg_list[i] = msg_list[i].bytes
831 msg_list[i] = msg_list[i].bytes
829 if self.auth is not None:
832 if self.auth is not None:
830 signature = msg_list[0]
833 signature = msg_list[0]
831 if not signature:
834 if not signature:
832 raise ValueError("Unsigned Message")
835 raise ValueError("Unsigned Message")
833 if signature in self.digest_history:
836 if signature in self.digest_history:
834 raise ValueError("Duplicate Signature: %r" % signature)
837 raise ValueError("Duplicate Signature: %r" % signature)
835 self._add_digest(signature)
838 self._add_digest(signature)
836 check = self.sign(msg_list[1:5])
839 check = self.sign(msg_list[1:5])
837 if not compare_digest(signature, check):
840 if not compare_digest(signature, check):
838 raise ValueError("Invalid Signature: %r" % signature)
841 raise ValueError("Invalid Signature: %r" % signature)
839 if len(msg_list) < minlen:
842 if len(msg_list) < minlen:
840 raise TypeError("malformed message, must have at least %i elements" % minlen)
843 raise TypeError("malformed message, must have at least %i elements" % minlen)
841 header = self.unpack(msg_list[1])
844 header = self.unpack(msg_list[1])
842 message['header'] = extract_dates(header)
845 message['header'] = extract_dates(header)
843 message['msg_id'] = header['msg_id']
846 message['msg_id'] = header['msg_id']
844 message['msg_type'] = header['msg_type']
847 message['msg_type'] = header['msg_type']
845 message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
848 message['parent_header'] = extract_dates(self.unpack(msg_list[2]))
846 message['metadata'] = self.unpack(msg_list[3])
849 message['metadata'] = self.unpack(msg_list[3])
847 if content:
850 if content:
848 message['content'] = self.unpack(msg_list[4])
851 message['content'] = self.unpack(msg_list[4])
849 else:
852 else:
850 message['content'] = msg_list[4]
853 message['content'] = msg_list[4]
851 buffers = [memoryview(b) for b in msg_list[5:]]
854 buffers = [memoryview(b) for b in msg_list[5:]]
852 if buffers and buffers[0].shape is None:
855 if buffers and buffers[0].shape is None:
853 # force copy to workaround pyzmq #646
856 # force copy to workaround pyzmq #646
854 buffers = [memoryview(b.bytes) for b in msg_list[5:]]
857 buffers = [memoryview(b.bytes) for b in msg_list[5:]]
855 message['buffers'] = buffers
858 message['buffers'] = buffers
856 # adapt to the current version
859 # adapt to the current version
857 return adapt(message)
860 return adapt(message)
858
861
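# Minimal round-trip sketch: serialize() produces
# [idents..., DELIM, HMAC, p_header, p_parent, p_metadata, p_content], and
# feed_identities() + deserialize() recover the nested message dict.  The
# import path, key, and msg_type are illustrative assumptions.
from IPython.kernel.zmq.session import Session  # assumed import path

session = Session(key=b'example-key')
msg = session.msg('demo_request', content={'value': 4})
wire = session.serialize(msg)
idents, msg_list = session.feed_identities(wire)
restored = session.deserialize(msg_list)
assert restored['content'] == {'value': 4}
assert restored['msg_id'] == msg['header']['msg_id']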
859 def unserialize(self, *args, **kwargs):
862 def unserialize(self, *args, **kwargs):
860 warnings.warn(
863 warnings.warn(
861 "Session.unserialize is deprecated. Use Session.deserialize.",
864 "Session.unserialize is deprecated. Use Session.deserialize.",
862 DeprecationWarning,
865 DeprecationWarning,
863 )
866 )
864 return self.deserialize(*args, **kwargs)
867 return self.deserialize(*args, **kwargs)
865
868
866
869
867 def test_msg2obj():
870 def test_msg2obj():
868 am = dict(x=1)
871 am = dict(x=1)
869 ao = Message(am)
872 ao = Message(am)
870 assert ao.x == am['x']
873 assert ao.x == am['x']
871
874
872 am['y'] = dict(z=1)
875 am['y'] = dict(z=1)
873 ao = Message(am)
876 ao = Message(am)
874 assert ao.y.z == am['y']['z']
877 assert ao.y.z == am['y']['z']
875
878
876 k1, k2 = 'y', 'z'
879 k1, k2 = 'y', 'z'
877 assert ao[k1][k2] == am[k1][k2]
880 assert ao[k1][k2] == am[k1][k2]
878
881
879 am2 = dict(ao)
882 am2 = dict(ao)
880 assert am['x'] == am2['x']
883 assert am['x'] == am2['x']
881 assert am['y']['z'] == am2['y']['z']
884 assert am['y']['z'] == am2['y']['z']