##// END OF EJS Templates
respect copy flag in Session.recv
MinRK -
Show More
@@ -1,756 +1,756 b''
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9
9
10 Authors:
10 Authors:
11
11
12 * Min RK
12 * Min RK
13 * Brian Granger
13 * Brian Granger
14 * Fernando Perez
14 * Fernando Perez
15 """
15 """
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Copyright (C) 2010-2011 The IPython Development Team
17 # Copyright (C) 2010-2011 The IPython Development Team
18 #
18 #
19 # Distributed under the terms of the BSD License. The full license is in
19 # Distributed under the terms of the BSD License. The full license is in
20 # the file COPYING, distributed as part of this software.
20 # the file COPYING, distributed as part of this software.
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24 # Imports
24 # Imports
25 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
26
26
27 import hmac
27 import hmac
28 import logging
28 import logging
29 import os
29 import os
30 import pprint
30 import pprint
31 import uuid
31 import uuid
32 from datetime import datetime
32 from datetime import datetime
33
33
34 try:
34 try:
35 import cPickle
35 import cPickle
36 pickle = cPickle
36 pickle = cPickle
37 except:
37 except:
38 cPickle = None
38 cPickle = None
39 import pickle
39 import pickle
40
40
41 import zmq
41 import zmq
42 from zmq.utils import jsonapi
42 from zmq.utils import jsonapi
43 from zmq.eventloop.ioloop import IOLoop
43 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.zmqstream import ZMQStream
44 from zmq.eventloop.zmqstream import ZMQStream
45
45
46 from IPython.config.application import Application, boolean_flag
46 from IPython.config.application import Application, boolean_flag
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.py3compat import str_to_bytes
50 from IPython.utils.py3compat import str_to_bytes
51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 DottedObjectName, CUnicode)
52 DottedObjectName, CUnicode)
53
53
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55 # utility functions
55 # utility functions
56 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
57
57
58 def squash_unicode(obj):
58 def squash_unicode(obj):
59 """coerce unicode back to bytestrings."""
59 """coerce unicode back to bytestrings."""
60 if isinstance(obj,dict):
60 if isinstance(obj,dict):
61 for key in obj.keys():
61 for key in obj.keys():
62 obj[key] = squash_unicode(obj[key])
62 obj[key] = squash_unicode(obj[key])
63 if isinstance(key, unicode):
63 if isinstance(key, unicode):
64 obj[squash_unicode(key)] = obj.pop(key)
64 obj[squash_unicode(key)] = obj.pop(key)
65 elif isinstance(obj, list):
65 elif isinstance(obj, list):
66 for i,v in enumerate(obj):
66 for i,v in enumerate(obj):
67 obj[i] = squash_unicode(v)
67 obj[i] = squash_unicode(v)
68 elif isinstance(obj, unicode):
68 elif isinstance(obj, unicode):
69 obj = obj.encode('utf8')
69 obj = obj.encode('utf8')
70 return obj
70 return obj
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # globals and defaults
73 # globals and defaults
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 # ISO8601-ify datetime objects
77 # ISO8601-ify datetime objects
78 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
78 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
79 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
79 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
80
80
81 pickle_packer = lambda o: pickle.dumps(o,-1)
81 pickle_packer = lambda o: pickle.dumps(o,-1)
82 pickle_unpacker = pickle.loads
82 pickle_unpacker = pickle.loads
83
83
84 default_packer = json_packer
84 default_packer = json_packer
85 default_unpacker = json_unpacker
85 default_unpacker = json_unpacker
86
86
87 DELIM=b"<IDS|MSG>"
87 DELIM=b"<IDS|MSG>"
88
88
89
89
90 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
91 # Mixin tools for apps that use Sessions
91 # Mixin tools for apps that use Sessions
92 #-----------------------------------------------------------------------------
92 #-----------------------------------------------------------------------------
93
93
94 session_aliases = dict(
94 session_aliases = dict(
95 ident = 'Session.session',
95 ident = 'Session.session',
96 user = 'Session.username',
96 user = 'Session.username',
97 keyfile = 'Session.keyfile',
97 keyfile = 'Session.keyfile',
98 )
98 )
99
99
100 session_flags = {
100 session_flags = {
101 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
101 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
102 'keyfile' : '' }},
102 'keyfile' : '' }},
103 """Use HMAC digests for authentication of messages.
103 """Use HMAC digests for authentication of messages.
104 Setting this flag will generate a new UUID to use as the HMAC key.
104 Setting this flag will generate a new UUID to use as the HMAC key.
105 """),
105 """),
106 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
106 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
107 """Don't authenticate messages."""),
107 """Don't authenticate messages."""),
108 }
108 }
109
109
110 def default_secure(cfg):
110 def default_secure(cfg):
111 """Set the default behavior for a config environment to be secure.
111 """Set the default behavior for a config environment to be secure.
112
112
113 If Session.key/keyfile have not been set, set Session.key to
113 If Session.key/keyfile have not been set, set Session.key to
114 a new random UUID.
114 a new random UUID.
115 """
115 """
116
116
117 if 'Session' in cfg:
117 if 'Session' in cfg:
118 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
118 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
119 return
119 return
120 # key/keyfile not specified, generate new UUID:
120 # key/keyfile not specified, generate new UUID:
121 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
121 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
122
122
123
123
124 #-----------------------------------------------------------------------------
124 #-----------------------------------------------------------------------------
125 # Classes
125 # Classes
126 #-----------------------------------------------------------------------------
126 #-----------------------------------------------------------------------------
127
127
128 class SessionFactory(LoggingConfigurable):
128 class SessionFactory(LoggingConfigurable):
129 """The Base class for configurables that have a Session, Context, logger,
129 """The Base class for configurables that have a Session, Context, logger,
130 and IOLoop.
130 and IOLoop.
131 """
131 """
132
132
133 logname = Unicode('')
133 logname = Unicode('')
134 def _logname_changed(self, name, old, new):
134 def _logname_changed(self, name, old, new):
135 self.log = logging.getLogger(new)
135 self.log = logging.getLogger(new)
136
136
137 # not configurable:
137 # not configurable:
138 context = Instance('zmq.Context')
138 context = Instance('zmq.Context')
139 def _context_default(self):
139 def _context_default(self):
140 return zmq.Context.instance()
140 return zmq.Context.instance()
141
141
142 session = Instance('IPython.zmq.session.Session')
142 session = Instance('IPython.zmq.session.Session')
143
143
144 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
144 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
145 def _loop_default(self):
145 def _loop_default(self):
146 return IOLoop.instance()
146 return IOLoop.instance()
147
147
148 def __init__(self, **kwargs):
148 def __init__(self, **kwargs):
149 super(SessionFactory, self).__init__(**kwargs)
149 super(SessionFactory, self).__init__(**kwargs)
150
150
151 if self.session is None:
151 if self.session is None:
152 # construct the session
152 # construct the session
153 self.session = Session(**kwargs)
153 self.session = Session(**kwargs)
154
154
155
155
156 class Message(object):
156 class Message(object):
157 """A simple message object that maps dict keys to attributes.
157 """A simple message object that maps dict keys to attributes.
158
158
159 A Message can be created from a dict and a dict from a Message instance
159 A Message can be created from a dict and a dict from a Message instance
160 simply by calling dict(msg_obj)."""
160 simply by calling dict(msg_obj)."""
161
161
162 def __init__(self, msg_dict):
162 def __init__(self, msg_dict):
163 dct = self.__dict__
163 dct = self.__dict__
164 for k, v in dict(msg_dict).iteritems():
164 for k, v in dict(msg_dict).iteritems():
165 if isinstance(v, dict):
165 if isinstance(v, dict):
166 v = Message(v)
166 v = Message(v)
167 dct[k] = v
167 dct[k] = v
168
168
169 # Having this iterator lets dict(msg_obj) work out of the box.
169 # Having this iterator lets dict(msg_obj) work out of the box.
170 def __iter__(self):
170 def __iter__(self):
171 return iter(self.__dict__.iteritems())
171 return iter(self.__dict__.iteritems())
172
172
173 def __repr__(self):
173 def __repr__(self):
174 return repr(self.__dict__)
174 return repr(self.__dict__)
175
175
176 def __str__(self):
176 def __str__(self):
177 return pprint.pformat(self.__dict__)
177 return pprint.pformat(self.__dict__)
178
178
179 def __contains__(self, k):
179 def __contains__(self, k):
180 return k in self.__dict__
180 return k in self.__dict__
181
181
182 def __getitem__(self, k):
182 def __getitem__(self, k):
183 return self.__dict__[k]
183 return self.__dict__[k]
184
184
185
185
186 def msg_header(msg_id, msg_type, username, session):
186 def msg_header(msg_id, msg_type, username, session):
187 date = datetime.now()
187 date = datetime.now()
188 return locals()
188 return locals()
189
189
190 def extract_header(msg_or_header):
190 def extract_header(msg_or_header):
191 """Given a message or header, return the header."""
191 """Given a message or header, return the header."""
192 if not msg_or_header:
192 if not msg_or_header:
193 return {}
193 return {}
194 try:
194 try:
195 # See if msg_or_header is the entire message.
195 # See if msg_or_header is the entire message.
196 h = msg_or_header['header']
196 h = msg_or_header['header']
197 except KeyError:
197 except KeyError:
198 try:
198 try:
199 # See if msg_or_header is just the header
199 # See if msg_or_header is just the header
200 h = msg_or_header['msg_id']
200 h = msg_or_header['msg_id']
201 except KeyError:
201 except KeyError:
202 raise
202 raise
203 else:
203 else:
204 h = msg_or_header
204 h = msg_or_header
205 if not isinstance(h, dict):
205 if not isinstance(h, dict):
206 h = dict(h)
206 h = dict(h)
207 return h
207 return h
208
208
209 class Session(Configurable):
209 class Session(Configurable):
210 """Object for handling serialization and sending of messages.
210 """Object for handling serialization and sending of messages.
211
211
212 The Session object handles building messages and sending them
212 The Session object handles building messages and sending them
213 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
213 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
214 other over the network via Session objects, and only need to work with the
214 other over the network via Session objects, and only need to work with the
215 dict-based IPython message spec. The Session will handle
215 dict-based IPython message spec. The Session will handle
216 serialization/deserialization, security, and metadata.
216 serialization/deserialization, security, and metadata.
217
217
218 Sessions support configurable serialiization via packer/unpacker traits,
218 Sessions support configurable serialiization via packer/unpacker traits,
219 and signing with HMAC digests via the key/keyfile traits.
219 and signing with HMAC digests via the key/keyfile traits.
220
220
221 Parameters
221 Parameters
222 ----------
222 ----------
223
223
224 debug : bool
224 debug : bool
225 whether to trigger extra debugging statements
225 whether to trigger extra debugging statements
226 packer/unpacker : str : 'json', 'pickle' or import_string
226 packer/unpacker : str : 'json', 'pickle' or import_string
227 importstrings for methods to serialize message parts. If just
227 importstrings for methods to serialize message parts. If just
228 'json' or 'pickle', predefined JSON and pickle packers will be used.
228 'json' or 'pickle', predefined JSON and pickle packers will be used.
229 Otherwise, the entire importstring must be used.
229 Otherwise, the entire importstring must be used.
230
230
231 The functions must accept at least valid JSON input, and output *bytes*.
231 The functions must accept at least valid JSON input, and output *bytes*.
232
232
233 For example, to use msgpack:
233 For example, to use msgpack:
234 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
234 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
235 pack/unpack : callables
235 pack/unpack : callables
236 You can also set the pack/unpack callables for serialization directly.
236 You can also set the pack/unpack callables for serialization directly.
237 session : bytes
237 session : bytes
238 the ID of this Session object. The default is to generate a new UUID.
238 the ID of this Session object. The default is to generate a new UUID.
239 username : unicode
239 username : unicode
240 username added to message headers. The default is to ask the OS.
240 username added to message headers. The default is to ask the OS.
241 key : bytes
241 key : bytes
242 The key used to initialize an HMAC signature. If unset, messages
242 The key used to initialize an HMAC signature. If unset, messages
243 will not be signed or checked.
243 will not be signed or checked.
244 keyfile : filepath
244 keyfile : filepath
245 The file containing a key. If this is set, `key` will be initialized
245 The file containing a key. If this is set, `key` will be initialized
246 to the contents of the file.
246 to the contents of the file.
247
247
248 """
248 """
249
249
250 debug=Bool(False, config=True, help="""Debug output in the Session""")
250 debug=Bool(False, config=True, help="""Debug output in the Session""")
251
251
252 packer = DottedObjectName('json',config=True,
252 packer = DottedObjectName('json',config=True,
253 help="""The name of the packer for serializing messages.
253 help="""The name of the packer for serializing messages.
254 Should be one of 'json', 'pickle', or an import name
254 Should be one of 'json', 'pickle', or an import name
255 for a custom callable serializer.""")
255 for a custom callable serializer.""")
256 def _packer_changed(self, name, old, new):
256 def _packer_changed(self, name, old, new):
257 if new.lower() == 'json':
257 if new.lower() == 'json':
258 self.pack = json_packer
258 self.pack = json_packer
259 self.unpack = json_unpacker
259 self.unpack = json_unpacker
260 elif new.lower() == 'pickle':
260 elif new.lower() == 'pickle':
261 self.pack = pickle_packer
261 self.pack = pickle_packer
262 self.unpack = pickle_unpacker
262 self.unpack = pickle_unpacker
263 else:
263 else:
264 self.pack = import_item(str(new))
264 self.pack = import_item(str(new))
265
265
266 unpacker = DottedObjectName('json', config=True,
266 unpacker = DottedObjectName('json', config=True,
267 help="""The name of the unpacker for unserializing messages.
267 help="""The name of the unpacker for unserializing messages.
268 Only used with custom functions for `packer`.""")
268 Only used with custom functions for `packer`.""")
269 def _unpacker_changed(self, name, old, new):
269 def _unpacker_changed(self, name, old, new):
270 if new.lower() == 'json':
270 if new.lower() == 'json':
271 self.pack = json_packer
271 self.pack = json_packer
272 self.unpack = json_unpacker
272 self.unpack = json_unpacker
273 elif new.lower() == 'pickle':
273 elif new.lower() == 'pickle':
274 self.pack = pickle_packer
274 self.pack = pickle_packer
275 self.unpack = pickle_unpacker
275 self.unpack = pickle_unpacker
276 else:
276 else:
277 self.unpack = import_item(str(new))
277 self.unpack = import_item(str(new))
278
278
279 session = CUnicode(u'', config=True,
279 session = CUnicode(u'', config=True,
280 help="""The UUID identifying this session.""")
280 help="""The UUID identifying this session.""")
281 def _session_default(self):
281 def _session_default(self):
282 u = unicode(uuid.uuid4())
282 u = unicode(uuid.uuid4())
283 self.bsession = u.encode('ascii')
283 self.bsession = u.encode('ascii')
284 return u
284 return u
285
285
286 def _session_changed(self, name, old, new):
286 def _session_changed(self, name, old, new):
287 self.bsession = self.session.encode('ascii')
287 self.bsession = self.session.encode('ascii')
288
288
289 # bsession is the session as bytes
289 # bsession is the session as bytes
290 bsession = CBytes(b'')
290 bsession = CBytes(b'')
291
291
292 username = Unicode(os.environ.get('USER',u'username'), config=True,
292 username = Unicode(os.environ.get('USER',u'username'), config=True,
293 help="""Username for the Session. Default is your system username.""")
293 help="""Username for the Session. Default is your system username.""")
294
294
295 # message signature related traits:
295 # message signature related traits:
296
296
297 key = CBytes(b'', config=True,
297 key = CBytes(b'', config=True,
298 help="""execution key, for extra authentication.""")
298 help="""execution key, for extra authentication.""")
299 def _key_changed(self, name, old, new):
299 def _key_changed(self, name, old, new):
300 if new:
300 if new:
301 self.auth = hmac.HMAC(new)
301 self.auth = hmac.HMAC(new)
302 else:
302 else:
303 self.auth = None
303 self.auth = None
304 auth = Instance(hmac.HMAC)
304 auth = Instance(hmac.HMAC)
305 digest_history = Set()
305 digest_history = Set()
306
306
307 keyfile = Unicode('', config=True,
307 keyfile = Unicode('', config=True,
308 help="""path to file containing execution key.""")
308 help="""path to file containing execution key.""")
309 def _keyfile_changed(self, name, old, new):
309 def _keyfile_changed(self, name, old, new):
310 with open(new, 'rb') as f:
310 with open(new, 'rb') as f:
311 self.key = f.read().strip()
311 self.key = f.read().strip()
312
312
313 # serialization traits:
313 # serialization traits:
314
314
315 pack = Any(default_packer) # the actual packer function
315 pack = Any(default_packer) # the actual packer function
316 def _pack_changed(self, name, old, new):
316 def _pack_changed(self, name, old, new):
317 if not callable(new):
317 if not callable(new):
318 raise TypeError("packer must be callable, not %s"%type(new))
318 raise TypeError("packer must be callable, not %s"%type(new))
319
319
320 unpack = Any(default_unpacker) # the actual packer function
320 unpack = Any(default_unpacker) # the actual packer function
321 def _unpack_changed(self, name, old, new):
321 def _unpack_changed(self, name, old, new):
322 # unpacker is not checked - it is assumed to be
322 # unpacker is not checked - it is assumed to be
323 if not callable(new):
323 if not callable(new):
324 raise TypeError("unpacker must be callable, not %s"%type(new))
324 raise TypeError("unpacker must be callable, not %s"%type(new))
325
325
326 def __init__(self, **kwargs):
326 def __init__(self, **kwargs):
327 """create a Session object
327 """create a Session object
328
328
329 Parameters
329 Parameters
330 ----------
330 ----------
331
331
332 debug : bool
332 debug : bool
333 whether to trigger extra debugging statements
333 whether to trigger extra debugging statements
334 packer/unpacker : str : 'json', 'pickle' or import_string
334 packer/unpacker : str : 'json', 'pickle' or import_string
335 importstrings for methods to serialize message parts. If just
335 importstrings for methods to serialize message parts. If just
336 'json' or 'pickle', predefined JSON and pickle packers will be used.
336 'json' or 'pickle', predefined JSON and pickle packers will be used.
337 Otherwise, the entire importstring must be used.
337 Otherwise, the entire importstring must be used.
338
338
339 The functions must accept at least valid JSON input, and output
339 The functions must accept at least valid JSON input, and output
340 *bytes*.
340 *bytes*.
341
341
342 For example, to use msgpack:
342 For example, to use msgpack:
343 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
343 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
344 pack/unpack : callables
344 pack/unpack : callables
345 You can also set the pack/unpack callables for serialization
345 You can also set the pack/unpack callables for serialization
346 directly.
346 directly.
347 session : unicode (must be ascii)
347 session : unicode (must be ascii)
348 the ID of this Session object. The default is to generate a new
348 the ID of this Session object. The default is to generate a new
349 UUID.
349 UUID.
350 bsession : bytes
350 bsession : bytes
351 The session as bytes
351 The session as bytes
352 username : unicode
352 username : unicode
353 username added to message headers. The default is to ask the OS.
353 username added to message headers. The default is to ask the OS.
354 key : bytes
354 key : bytes
355 The key used to initialize an HMAC signature. If unset, messages
355 The key used to initialize an HMAC signature. If unset, messages
356 will not be signed or checked.
356 will not be signed or checked.
357 keyfile : filepath
357 keyfile : filepath
358 The file containing a key. If this is set, `key` will be
358 The file containing a key. If this is set, `key` will be
359 initialized to the contents of the file.
359 initialized to the contents of the file.
360 """
360 """
361 super(Session, self).__init__(**kwargs)
361 super(Session, self).__init__(**kwargs)
362 self._check_packers()
362 self._check_packers()
363 self.none = self.pack({})
363 self.none = self.pack({})
364 # ensure self._session_default() if necessary, so bsession is defined:
364 # ensure self._session_default() if necessary, so bsession is defined:
365 self.session
365 self.session
366
366
367 @property
367 @property
368 def msg_id(self):
368 def msg_id(self):
369 """always return new uuid"""
369 """always return new uuid"""
370 return str(uuid.uuid4())
370 return str(uuid.uuid4())
371
371
372 def _check_packers(self):
372 def _check_packers(self):
373 """check packers for binary data and datetime support."""
373 """check packers for binary data and datetime support."""
374 pack = self.pack
374 pack = self.pack
375 unpack = self.unpack
375 unpack = self.unpack
376
376
377 # check simple serialization
377 # check simple serialization
378 msg = dict(a=[1,'hi'])
378 msg = dict(a=[1,'hi'])
379 try:
379 try:
380 packed = pack(msg)
380 packed = pack(msg)
381 except Exception:
381 except Exception:
382 raise ValueError("packer could not serialize a simple message")
382 raise ValueError("packer could not serialize a simple message")
383
383
384 # ensure packed message is bytes
384 # ensure packed message is bytes
385 if not isinstance(packed, bytes):
385 if not isinstance(packed, bytes):
386 raise ValueError("message packed to %r, but bytes are required"%type(packed))
386 raise ValueError("message packed to %r, but bytes are required"%type(packed))
387
387
388 # check that unpack is pack's inverse
388 # check that unpack is pack's inverse
389 try:
389 try:
390 unpacked = unpack(packed)
390 unpacked = unpack(packed)
391 except Exception:
391 except Exception:
392 raise ValueError("unpacker could not handle the packer's output")
392 raise ValueError("unpacker could not handle the packer's output")
393
393
394 # check datetime support
394 # check datetime support
395 msg = dict(t=datetime.now())
395 msg = dict(t=datetime.now())
396 try:
396 try:
397 unpacked = unpack(pack(msg))
397 unpacked = unpack(pack(msg))
398 except Exception:
398 except Exception:
399 self.pack = lambda o: pack(squash_dates(o))
399 self.pack = lambda o: pack(squash_dates(o))
400 self.unpack = lambda s: extract_dates(unpack(s))
400 self.unpack = lambda s: extract_dates(unpack(s))
401
401
402 def msg_header(self, msg_type):
402 def msg_header(self, msg_type):
403 return msg_header(self.msg_id, msg_type, self.username, self.session)
403 return msg_header(self.msg_id, msg_type, self.username, self.session)
404
404
405 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
405 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
406 """Return the nested message dict.
406 """Return the nested message dict.
407
407
408 This format is different from what is sent over the wire. The
408 This format is different from what is sent over the wire. The
409 serialize/unserialize methods converts this nested message dict to the wire
409 serialize/unserialize methods converts this nested message dict to the wire
410 format, which is a list of message parts.
410 format, which is a list of message parts.
411 """
411 """
412 msg = {}
412 msg = {}
413 header = self.msg_header(msg_type) if header is None else header
413 header = self.msg_header(msg_type) if header is None else header
414 msg['header'] = header
414 msg['header'] = header
415 msg['msg_id'] = header['msg_id']
415 msg['msg_id'] = header['msg_id']
416 msg['msg_type'] = header['msg_type']
416 msg['msg_type'] = header['msg_type']
417 msg['parent_header'] = {} if parent is None else extract_header(parent)
417 msg['parent_header'] = {} if parent is None else extract_header(parent)
418 msg['content'] = {} if content is None else content
418 msg['content'] = {} if content is None else content
419 sub = {} if subheader is None else subheader
419 sub = {} if subheader is None else subheader
420 msg['header'].update(sub)
420 msg['header'].update(sub)
421 return msg
421 return msg
422
422
423 def sign(self, msg_list):
423 def sign(self, msg_list):
424 """Sign a message with HMAC digest. If no auth, return b''.
424 """Sign a message with HMAC digest. If no auth, return b''.
425
425
426 Parameters
426 Parameters
427 ----------
427 ----------
428 msg_list : list
428 msg_list : list
429 The [p_header,p_parent,p_content] part of the message list.
429 The [p_header,p_parent,p_content] part of the message list.
430 """
430 """
431 if self.auth is None:
431 if self.auth is None:
432 return b''
432 return b''
433 h = self.auth.copy()
433 h = self.auth.copy()
434 for m in msg_list:
434 for m in msg_list:
435 h.update(m)
435 h.update(m)
436 return str_to_bytes(h.hexdigest())
436 return str_to_bytes(h.hexdigest())
437
437
438 def serialize(self, msg, ident=None):
438 def serialize(self, msg, ident=None):
439 """Serialize the message components to bytes.
439 """Serialize the message components to bytes.
440
440
441 This is roughly the inverse of unserialize. The serialize/unserialize
441 This is roughly the inverse of unserialize. The serialize/unserialize
442 methods work with full message lists, whereas pack/unpack work with
442 methods work with full message lists, whereas pack/unpack work with
443 the individual message parts in the message list.
443 the individual message parts in the message list.
444
444
445 Parameters
445 Parameters
446 ----------
446 ----------
447 msg : dict or Message
447 msg : dict or Message
448 The nexted message dict as returned by the self.msg method.
448 The nexted message dict as returned by the self.msg method.
449
449
450 Returns
450 Returns
451 -------
451 -------
452 msg_list : list
452 msg_list : list
453 The list of bytes objects to be sent with the format:
453 The list of bytes objects to be sent with the format:
454 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
454 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
455 buffer1,buffer2,...]. In this list, the p_* entities are
455 buffer1,buffer2,...]. In this list, the p_* entities are
456 the packed or serialized versions, so if JSON is used, these
456 the packed or serialized versions, so if JSON is used, these
457 are utf8 encoded JSON strings.
457 are utf8 encoded JSON strings.
458 """
458 """
459 content = msg.get('content', {})
459 content = msg.get('content', {})
460 if content is None:
460 if content is None:
461 content = self.none
461 content = self.none
462 elif isinstance(content, dict):
462 elif isinstance(content, dict):
463 content = self.pack(content)
463 content = self.pack(content)
464 elif isinstance(content, bytes):
464 elif isinstance(content, bytes):
465 # content is already packed, as in a relayed message
465 # content is already packed, as in a relayed message
466 pass
466 pass
467 elif isinstance(content, unicode):
467 elif isinstance(content, unicode):
468 # should be bytes, but JSON often spits out unicode
468 # should be bytes, but JSON often spits out unicode
469 content = content.encode('utf8')
469 content = content.encode('utf8')
470 else:
470 else:
471 raise TypeError("Content incorrect type: %s"%type(content))
471 raise TypeError("Content incorrect type: %s"%type(content))
472
472
473 real_message = [self.pack(msg['header']),
473 real_message = [self.pack(msg['header']),
474 self.pack(msg['parent_header']),
474 self.pack(msg['parent_header']),
475 content
475 content
476 ]
476 ]
477
477
478 to_send = []
478 to_send = []
479
479
480 if isinstance(ident, list):
480 if isinstance(ident, list):
481 # accept list of idents
481 # accept list of idents
482 to_send.extend(ident)
482 to_send.extend(ident)
483 elif ident is not None:
483 elif ident is not None:
484 to_send.append(ident)
484 to_send.append(ident)
485 to_send.append(DELIM)
485 to_send.append(DELIM)
486
486
487 signature = self.sign(real_message)
487 signature = self.sign(real_message)
488 to_send.append(signature)
488 to_send.append(signature)
489
489
490 to_send.extend(real_message)
490 to_send.extend(real_message)
491
491
492 return to_send
492 return to_send
493
493
494 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
494 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
495 buffers=None, subheader=None, track=False, header=None):
495 buffers=None, subheader=None, track=False, header=None):
496 """Build and send a message via stream or socket.
496 """Build and send a message via stream or socket.
497
497
498 The message format used by this function internally is as follows:
498 The message format used by this function internally is as follows:
499
499
500 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
500 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
501 buffer1,buffer2,...]
501 buffer1,buffer2,...]
502
502
503 The serialize/unserialize methods convert the nested message dict into this
503 The serialize/unserialize methods convert the nested message dict into this
504 format.
504 format.
505
505
506 Parameters
506 Parameters
507 ----------
507 ----------
508
508
509 stream : zmq.Socket or ZMQStream
509 stream : zmq.Socket or ZMQStream
510 The socket-like object used to send the data.
510 The socket-like object used to send the data.
511 msg_or_type : str or Message/dict
511 msg_or_type : str or Message/dict
512 Normally, msg_or_type will be a msg_type unless a message is being
512 Normally, msg_or_type will be a msg_type unless a message is being
513 sent more than once. If a header is supplied, this can be set to
513 sent more than once. If a header is supplied, this can be set to
514 None and the msg_type will be pulled from the header.
514 None and the msg_type will be pulled from the header.
515
515
516 content : dict or None
516 content : dict or None
517 The content of the message (ignored if msg_or_type is a message).
517 The content of the message (ignored if msg_or_type is a message).
518 header : dict or None
518 header : dict or None
519 The header dict for the message (ignores if msg_to_type is a message).
519 The header dict for the message (ignores if msg_to_type is a message).
520 parent : Message or dict or None
520 parent : Message or dict or None
521 The parent or parent header describing the parent of this message
521 The parent or parent header describing the parent of this message
522 (ignored if msg_or_type is a message).
522 (ignored if msg_or_type is a message).
523 ident : bytes or list of bytes
523 ident : bytes or list of bytes
524 The zmq.IDENTITY routing path.
524 The zmq.IDENTITY routing path.
525 subheader : dict or None
525 subheader : dict or None
526 Extra header keys for this message's header (ignored if msg_or_type
526 Extra header keys for this message's header (ignored if msg_or_type
527 is a message).
527 is a message).
528 buffers : list or None
528 buffers : list or None
529 The already-serialized buffers to be appended to the message.
529 The already-serialized buffers to be appended to the message.
530 track : bool
530 track : bool
531 Whether to track. Only for use with Sockets, because ZMQStream
531 Whether to track. Only for use with Sockets, because ZMQStream
532 objects cannot track messages.
532 objects cannot track messages.
533
533
534 Returns
534 Returns
535 -------
535 -------
536 msg : dict
536 msg : dict
537 The constructed message.
537 The constructed message.
538 (msg,tracker) : (dict, MessageTracker)
538 (msg,tracker) : (dict, MessageTracker)
539 if track=True, then a 2-tuple will be returned,
539 if track=True, then a 2-tuple will be returned,
540 the first element being the constructed
540 the first element being the constructed
541 message, and the second being the MessageTracker
541 message, and the second being the MessageTracker
542
542
543 """
543 """
544
544
545 if not isinstance(stream, (zmq.Socket, ZMQStream)):
545 if not isinstance(stream, (zmq.Socket, ZMQStream)):
546 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
546 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
547 elif track and isinstance(stream, ZMQStream):
547 elif track and isinstance(stream, ZMQStream):
548 raise TypeError("ZMQStream cannot track messages")
548 raise TypeError("ZMQStream cannot track messages")
549
549
550 if isinstance(msg_or_type, (Message, dict)):
550 if isinstance(msg_or_type, (Message, dict)):
551 # We got a Message or message dict, not a msg_type so don't
551 # We got a Message or message dict, not a msg_type so don't
552 # build a new Message.
552 # build a new Message.
553 msg = msg_or_type
553 msg = msg_or_type
554 else:
554 else:
555 msg = self.msg(msg_or_type, content=content, parent=parent,
555 msg = self.msg(msg_or_type, content=content, parent=parent,
556 subheader=subheader, header=header)
556 subheader=subheader, header=header)
557
557
558 buffers = [] if buffers is None else buffers
558 buffers = [] if buffers is None else buffers
559 to_send = self.serialize(msg, ident)
559 to_send = self.serialize(msg, ident)
560 flag = 0
560 flag = 0
561 if buffers:
561 if buffers:
562 flag = zmq.SNDMORE
562 flag = zmq.SNDMORE
563 _track = False
563 _track = False
564 else:
564 else:
565 _track=track
565 _track=track
566 if track:
566 if track:
567 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
567 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
568 else:
568 else:
569 tracker = stream.send_multipart(to_send, flag, copy=False)
569 tracker = stream.send_multipart(to_send, flag, copy=False)
570 for b in buffers[:-1]:
570 for b in buffers[:-1]:
571 stream.send(b, flag, copy=False)
571 stream.send(b, flag, copy=False)
572 if buffers:
572 if buffers:
573 if track:
573 if track:
574 tracker = stream.send(buffers[-1], copy=False, track=track)
574 tracker = stream.send(buffers[-1], copy=False, track=track)
575 else:
575 else:
576 tracker = stream.send(buffers[-1], copy=False)
576 tracker = stream.send(buffers[-1], copy=False)
577
577
578 # omsg = Message(msg)
578 # omsg = Message(msg)
579 if self.debug:
579 if self.debug:
580 pprint.pprint(msg)
580 pprint.pprint(msg)
581 pprint.pprint(to_send)
581 pprint.pprint(to_send)
582 pprint.pprint(buffers)
582 pprint.pprint(buffers)
583
583
584 msg['tracker'] = tracker
584 msg['tracker'] = tracker
585
585
586 return msg
586 return msg
587
587
588 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
588 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
589 """Send a raw message via ident path.
589 """Send a raw message via ident path.
590
590
591 This method is used to send a already serialized message.
591 This method is used to send a already serialized message.
592
592
593 Parameters
593 Parameters
594 ----------
594 ----------
595 stream : ZMQStream or Socket
595 stream : ZMQStream or Socket
596 The ZMQ stream or socket to use for sending the message.
596 The ZMQ stream or socket to use for sending the message.
597 msg_list : list
597 msg_list : list
598 The serialized list of messages to send. This only includes the
598 The serialized list of messages to send. This only includes the
599 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
599 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
600 the message.
600 the message.
601 ident : ident or list
601 ident : ident or list
602 A single ident or a list of idents to use in sending.
602 A single ident or a list of idents to use in sending.
603 """
603 """
604 to_send = []
604 to_send = []
605 if isinstance(ident, bytes):
605 if isinstance(ident, bytes):
606 ident = [ident]
606 ident = [ident]
607 if ident is not None:
607 if ident is not None:
608 to_send.extend(ident)
608 to_send.extend(ident)
609
609
610 to_send.append(DELIM)
610 to_send.append(DELIM)
611 to_send.append(self.sign(msg_list))
611 to_send.append(self.sign(msg_list))
612 to_send.extend(msg_list)
612 to_send.extend(msg_list)
613 stream.send_multipart(msg_list, flags, copy=copy)
613 stream.send_multipart(msg_list, flags, copy=copy)
614
614
615 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
615 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
616 """Receive and unpack a message.
616 """Receive and unpack a message.
617
617
618 Parameters
618 Parameters
619 ----------
619 ----------
620 socket : ZMQStream or Socket
620 socket : ZMQStream or Socket
621 The socket or stream to use in receiving.
621 The socket or stream to use in receiving.
622
622
623 Returns
623 Returns
624 -------
624 -------
625 [idents], msg
625 [idents], msg
626 [idents] is a list of idents and msg is a nested message dict of
626 [idents] is a list of idents and msg is a nested message dict of
627 same format as self.msg returns.
627 same format as self.msg returns.
628 """
628 """
629 if isinstance(socket, ZMQStream):
629 if isinstance(socket, ZMQStream):
630 socket = socket.socket
630 socket = socket.socket
631 try:
631 try:
632 msg_list = socket.recv_multipart(mode)
632 msg_list = socket.recv_multipart(mode, copy=copy)
633 except zmq.ZMQError as e:
633 except zmq.ZMQError as e:
634 if e.errno == zmq.EAGAIN:
634 if e.errno == zmq.EAGAIN:
635 # We can convert EAGAIN to None as we know in this case
635 # We can convert EAGAIN to None as we know in this case
636 # recv_multipart won't return None.
636 # recv_multipart won't return None.
637 return None,None
637 return None,None
638 else:
638 else:
639 raise
639 raise
640 # split multipart message into identity list and message dict
640 # split multipart message into identity list and message dict
641 # invalid large messages can cause very expensive string comparisons
641 # invalid large messages can cause very expensive string comparisons
642 idents, msg_list = self.feed_identities(msg_list, copy)
642 idents, msg_list = self.feed_identities(msg_list, copy)
643 try:
643 try:
644 return idents, self.unserialize(msg_list, content=content, copy=copy)
644 return idents, self.unserialize(msg_list, content=content, copy=copy)
645 except Exception as e:
645 except Exception as e:
646 # TODO: handle it
646 # TODO: handle it
647 raise e
647 raise e
648
648
649 def feed_identities(self, msg_list, copy=True):
649 def feed_identities(self, msg_list, copy=True):
650 """Split the identities from the rest of the message.
650 """Split the identities from the rest of the message.
651
651
652 Feed until DELIM is reached, then return the prefix as idents and
652 Feed until DELIM is reached, then return the prefix as idents and
653 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
653 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
654 but that would be silly.
654 but that would be silly.
655
655
656 Parameters
656 Parameters
657 ----------
657 ----------
658 msg_list : a list of Message or bytes objects
658 msg_list : a list of Message or bytes objects
659 The message to be split.
659 The message to be split.
660 copy : bool
660 copy : bool
661 flag determining whether the arguments are bytes or Messages
661 flag determining whether the arguments are bytes or Messages
662
662
663 Returns
663 Returns
664 -------
664 -------
665 (idents, msg_list) : two lists
665 (idents, msg_list) : two lists
666 idents will always be a list of bytes, each of which is a ZMQ
666 idents will always be a list of bytes, each of which is a ZMQ
667 identity. msg_list will be a list of bytes or zmq.Messages of the
667 identity. msg_list will be a list of bytes or zmq.Messages of the
668 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
668 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
669 should be unpackable/unserializable via self.unserialize at this
669 should be unpackable/unserializable via self.unserialize at this
670 point.
670 point.
671 """
671 """
672 if copy:
672 if copy:
673 idx = msg_list.index(DELIM)
673 idx = msg_list.index(DELIM)
674 return msg_list[:idx], msg_list[idx+1:]
674 return msg_list[:idx], msg_list[idx+1:]
675 else:
675 else:
676 failed = True
676 failed = True
677 for idx,m in enumerate(msg_list):
677 for idx,m in enumerate(msg_list):
678 if m.bytes == DELIM:
678 if m.bytes == DELIM:
679 failed = False
679 failed = False
680 break
680 break
681 if failed:
681 if failed:
682 raise ValueError("DELIM not in msg_list")
682 raise ValueError("DELIM not in msg_list")
683 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
683 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
684 return [m.bytes for m in idents], msg_list
684 return [m.bytes for m in idents], msg_list
685
685
686 def unserialize(self, msg_list, content=True, copy=True):
686 def unserialize(self, msg_list, content=True, copy=True):
687 """Unserialize a msg_list to a nested message dict.
687 """Unserialize a msg_list to a nested message dict.
688
688
689 This is roughly the inverse of serialize. The serialize/unserialize
689 This is roughly the inverse of serialize. The serialize/unserialize
690 methods work with full message lists, whereas pack/unpack work with
690 methods work with full message lists, whereas pack/unpack work with
691 the individual message parts in the message list.
691 the individual message parts in the message list.
692
692
693 Parameters:
693 Parameters:
694 -----------
694 -----------
695 msg_list : list of bytes or Message objects
695 msg_list : list of bytes or Message objects
696 The list of message parts of the form [HMAC,p_header,p_parent,
696 The list of message parts of the form [HMAC,p_header,p_parent,
697 p_content,buffer1,buffer2,...].
697 p_content,buffer1,buffer2,...].
698 content : bool (True)
698 content : bool (True)
699 Whether to unpack the content dict (True), or leave it packed
699 Whether to unpack the content dict (True), or leave it packed
700 (False).
700 (False).
701 copy : bool (True)
701 copy : bool (True)
702 Whether to return the bytes (True), or the non-copying Message
702 Whether to return the bytes (True), or the non-copying Message
703 object in each place (False).
703 object in each place (False).
704
704
705 Returns
705 Returns
706 -------
706 -------
707 msg : dict
707 msg : dict
708 The nested message dict with top-level keys [header, parent_header,
708 The nested message dict with top-level keys [header, parent_header,
709 content, buffers].
709 content, buffers].
710 """
710 """
711 minlen = 4
711 minlen = 4
712 message = {}
712 message = {}
713 if not copy:
713 if not copy:
714 for i in range(minlen):
714 for i in range(minlen):
715 msg_list[i] = msg_list[i].bytes
715 msg_list[i] = msg_list[i].bytes
716 if self.auth is not None:
716 if self.auth is not None:
717 signature = msg_list[0]
717 signature = msg_list[0]
718 if not signature:
718 if not signature:
719 raise ValueError("Unsigned Message")
719 raise ValueError("Unsigned Message")
720 if signature in self.digest_history:
720 if signature in self.digest_history:
721 raise ValueError("Duplicate Signature: %r"%signature)
721 raise ValueError("Duplicate Signature: %r"%signature)
722 self.digest_history.add(signature)
722 self.digest_history.add(signature)
723 check = self.sign(msg_list[1:4])
723 check = self.sign(msg_list[1:4])
724 if not signature == check:
724 if not signature == check:
725 raise ValueError("Invalid Signature: %r"%signature)
725 raise ValueError("Invalid Signature: %r"%signature)
726 if not len(msg_list) >= minlen:
726 if not len(msg_list) >= minlen:
727 raise TypeError("malformed message, must have at least %i elements"%minlen)
727 raise TypeError("malformed message, must have at least %i elements"%minlen)
728 header = self.unpack(msg_list[1])
728 header = self.unpack(msg_list[1])
729 message['header'] = header
729 message['header'] = header
730 message['msg_id'] = header['msg_id']
730 message['msg_id'] = header['msg_id']
731 message['msg_type'] = header['msg_type']
731 message['msg_type'] = header['msg_type']
732 message['parent_header'] = self.unpack(msg_list[2])
732 message['parent_header'] = self.unpack(msg_list[2])
733 if content:
733 if content:
734 message['content'] = self.unpack(msg_list[3])
734 message['content'] = self.unpack(msg_list[3])
735 else:
735 else:
736 message['content'] = msg_list[3]
736 message['content'] = msg_list[3]
737
737
738 message['buffers'] = msg_list[4:]
738 message['buffers'] = msg_list[4:]
739 return message
739 return message
740
740
741 def test_msg2obj():
741 def test_msg2obj():
742 am = dict(x=1)
742 am = dict(x=1)
743 ao = Message(am)
743 ao = Message(am)
744 assert ao.x == am['x']
744 assert ao.x == am['x']
745
745
746 am['y'] = dict(z=1)
746 am['y'] = dict(z=1)
747 ao = Message(am)
747 ao = Message(am)
748 assert ao.y.z == am['y']['z']
748 assert ao.y.z == am['y']['z']
749
749
750 k1, k2 = 'y', 'z'
750 k1, k2 = 'y', 'z'
751 assert ao[k1][k2] == am[k1][k2]
751 assert ao[k1][k2] == am[k1][k2]
752
752
753 am2 = dict(ao)
753 am2 = dict(ao)
754 assert am['x'] == am2['x']
754 assert am['x'] == am2['x']
755 assert am['y']['z'] == am2['y']['z']
755 assert am['y']['z'] == am2['y']['z']
756
756
General Comments 0
You need to be logged in to leave comments. Login now