##// END OF EJS Templates
don't print info when message unpacking fails...
MinRK -
Show More
@@ -1,697 +1,698 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Session object for building, serializing, sending, and receiving messages in
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
4 metadata on messages.
5
5
6 Also defined here are utilities for working with Sessions:
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
9 * A Message object for convenience that allows attribute-access to the msg dict.
10
10
11 Authors:
11 Authors:
12
12
13 * Min RK
13 * Min RK
14 * Brian Granger
14 * Brian Granger
15 * Fernando Perez
15 * Fernando Perez
16 """
16 """
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Copyright (C) 2010-2011 The IPython Development Team
18 # Copyright (C) 2010-2011 The IPython Development Team
19 #
19 #
20 # Distributed under the terms of the BSD License. The full license is in
20 # Distributed under the terms of the BSD License. The full license is in
21 # the file COPYING, distributed as part of this software.
21 # the file COPYING, distributed as part of this software.
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Imports
25 # Imports
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 import hmac
28 import hmac
29 import logging
29 import logging
30 import os
30 import os
31 import pprint
31 import pprint
32 import uuid
32 import uuid
33 from datetime import datetime
33 from datetime import datetime
34
34
35 try:
35 try:
36 import cPickle
36 import cPickle
37 pickle = cPickle
37 pickle = cPickle
38 except:
38 except:
39 cPickle = None
39 cPickle = None
40 import pickle
40 import pickle
41
41
42 import zmq
42 import zmq
43 from zmq.utils import jsonapi
43 from zmq.utils import jsonapi
44 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.ioloop import IOLoop
45 from zmq.eventloop.zmqstream import ZMQStream
45 from zmq.eventloop.zmqstream import ZMQStream
46
46
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 DottedObjectName)
51 DottedObjectName)
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # utility functions
54 # utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 def squash_unicode(obj):
57 def squash_unicode(obj):
58 """coerce unicode back to bytestrings."""
58 """coerce unicode back to bytestrings."""
59 if isinstance(obj,dict):
59 if isinstance(obj,dict):
60 for key in obj.keys():
60 for key in obj.keys():
61 obj[key] = squash_unicode(obj[key])
61 obj[key] = squash_unicode(obj[key])
62 if isinstance(key, unicode):
62 if isinstance(key, unicode):
63 obj[squash_unicode(key)] = obj.pop(key)
63 obj[squash_unicode(key)] = obj.pop(key)
64 elif isinstance(obj, list):
64 elif isinstance(obj, list):
65 for i,v in enumerate(obj):
65 for i,v in enumerate(obj):
66 obj[i] = squash_unicode(v)
66 obj[i] = squash_unicode(v)
67 elif isinstance(obj, unicode):
67 elif isinstance(obj, unicode):
68 obj = obj.encode('utf8')
68 obj = obj.encode('utf8')
69 return obj
69 return obj
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # globals and defaults
72 # globals and defaults
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
77
77
78 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_packer = lambda o: pickle.dumps(o,-1)
79 pickle_unpacker = pickle.loads
79 pickle_unpacker = pickle.loads
80
80
81 default_packer = json_packer
81 default_packer = json_packer
82 default_unpacker = json_unpacker
82 default_unpacker = json_unpacker
83
83
84
84
85 DELIM=b"<IDS|MSG>"
85 DELIM=b"<IDS|MSG>"
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Classes
88 # Classes
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91 class SessionFactory(LoggingConfigurable):
91 class SessionFactory(LoggingConfigurable):
92 """The Base class for configurables that have a Session, Context, logger,
92 """The Base class for configurables that have a Session, Context, logger,
93 and IOLoop.
93 and IOLoop.
94 """
94 """
95
95
96 logname = Unicode('')
96 logname = Unicode('')
97 def _logname_changed(self, name, old, new):
97 def _logname_changed(self, name, old, new):
98 self.log = logging.getLogger(new)
98 self.log = logging.getLogger(new)
99
99
100 # not configurable:
100 # not configurable:
101 context = Instance('zmq.Context')
101 context = Instance('zmq.Context')
102 def _context_default(self):
102 def _context_default(self):
103 return zmq.Context.instance()
103 return zmq.Context.instance()
104
104
105 session = Instance('IPython.zmq.session.Session')
105 session = Instance('IPython.zmq.session.Session')
106
106
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
108 def _loop_default(self):
108 def _loop_default(self):
109 return IOLoop.instance()
109 return IOLoop.instance()
110
110
111 def __init__(self, **kwargs):
111 def __init__(self, **kwargs):
112 super(SessionFactory, self).__init__(**kwargs)
112 super(SessionFactory, self).__init__(**kwargs)
113
113
114 if self.session is None:
114 if self.session is None:
115 # construct the session
115 # construct the session
116 self.session = Session(**kwargs)
116 self.session = Session(**kwargs)
117
117
118
118
119 class Message(object):
119 class Message(object):
120 """A simple message object that maps dict keys to attributes.
120 """A simple message object that maps dict keys to attributes.
121
121
122 A Message can be created from a dict and a dict from a Message instance
122 A Message can be created from a dict and a dict from a Message instance
123 simply by calling dict(msg_obj)."""
123 simply by calling dict(msg_obj)."""
124
124
125 def __init__(self, msg_dict):
125 def __init__(self, msg_dict):
126 dct = self.__dict__
126 dct = self.__dict__
127 for k, v in dict(msg_dict).iteritems():
127 for k, v in dict(msg_dict).iteritems():
128 if isinstance(v, dict):
128 if isinstance(v, dict):
129 v = Message(v)
129 v = Message(v)
130 dct[k] = v
130 dct[k] = v
131
131
132 # Having this iterator lets dict(msg_obj) work out of the box.
132 # Having this iterator lets dict(msg_obj) work out of the box.
133 def __iter__(self):
133 def __iter__(self):
134 return iter(self.__dict__.iteritems())
134 return iter(self.__dict__.iteritems())
135
135
136 def __repr__(self):
136 def __repr__(self):
137 return repr(self.__dict__)
137 return repr(self.__dict__)
138
138
139 def __str__(self):
139 def __str__(self):
140 return pprint.pformat(self.__dict__)
140 return pprint.pformat(self.__dict__)
141
141
142 def __contains__(self, k):
142 def __contains__(self, k):
143 return k in self.__dict__
143 return k in self.__dict__
144
144
145 def __getitem__(self, k):
145 def __getitem__(self, k):
146 return self.__dict__[k]
146 return self.__dict__[k]
147
147
148
148
149 def msg_header(msg_id, msg_type, username, session):
149 def msg_header(msg_id, msg_type, username, session):
150 date = datetime.now()
150 date = datetime.now()
151 return locals()
151 return locals()
152
152
153 def extract_header(msg_or_header):
153 def extract_header(msg_or_header):
154 """Given a message or header, return the header."""
154 """Given a message or header, return the header."""
155 if not msg_or_header:
155 if not msg_or_header:
156 return {}
156 return {}
157 try:
157 try:
158 # See if msg_or_header is the entire message.
158 # See if msg_or_header is the entire message.
159 h = msg_or_header['header']
159 h = msg_or_header['header']
160 except KeyError:
160 except KeyError:
161 try:
161 try:
162 # See if msg_or_header is just the header
162 # See if msg_or_header is just the header
163 h = msg_or_header['msg_id']
163 h = msg_or_header['msg_id']
164 except KeyError:
164 except KeyError:
165 raise
165 raise
166 else:
166 else:
167 h = msg_or_header
167 h = msg_or_header
168 if not isinstance(h, dict):
168 if not isinstance(h, dict):
169 h = dict(h)
169 h = dict(h)
170 return h
170 return h
171
171
172 class Session(Configurable):
172 class Session(Configurable):
173 """Object for handling serialization and sending of messages.
173 """Object for handling serialization and sending of messages.
174
174
175 The Session object handles building messages and sending them
175 The Session object handles building messages and sending them
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
177 other over the network via Session objects, and only need to work with the
177 other over the network via Session objects, and only need to work with the
178 dict-based IPython message spec. The Session will handle
178 dict-based IPython message spec. The Session will handle
179 serialization/deserialization, security, and metadata.
179 serialization/deserialization, security, and metadata.
180
180
181 Sessions support configurable serialiization via packer/unpacker traits,
181 Sessions support configurable serialiization via packer/unpacker traits,
182 and signing with HMAC digests via the key/keyfile traits.
182 and signing with HMAC digests via the key/keyfile traits.
183
183
184 Parameters
184 Parameters
185 ----------
185 ----------
186
186
187 debug : bool
187 debug : bool
188 whether to trigger extra debugging statements
188 whether to trigger extra debugging statements
189 packer/unpacker : str : 'json', 'pickle' or import_string
189 packer/unpacker : str : 'json', 'pickle' or import_string
190 importstrings for methods to serialize message parts. If just
190 importstrings for methods to serialize message parts. If just
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
192 Otherwise, the entire importstring must be used.
192 Otherwise, the entire importstring must be used.
193
193
194 The functions must accept at least valid JSON input, and output *bytes*.
194 The functions must accept at least valid JSON input, and output *bytes*.
195
195
196 For example, to use msgpack:
196 For example, to use msgpack:
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
198 pack/unpack : callables
198 pack/unpack : callables
199 You can also set the pack/unpack callables for serialization directly.
199 You can also set the pack/unpack callables for serialization directly.
200 session : bytes
200 session : bytes
201 the ID of this Session object. The default is to generate a new UUID.
201 the ID of this Session object. The default is to generate a new UUID.
202 username : unicode
202 username : unicode
203 username added to message headers. The default is to ask the OS.
203 username added to message headers. The default is to ask the OS.
204 key : bytes
204 key : bytes
205 The key used to initialize an HMAC signature. If unset, messages
205 The key used to initialize an HMAC signature. If unset, messages
206 will not be signed or checked.
206 will not be signed or checked.
207 keyfile : filepath
207 keyfile : filepath
208 The file containing a key. If this is set, `key` will be initialized
208 The file containing a key. If this is set, `key` will be initialized
209 to the contents of the file.
209 to the contents of the file.
210
210
211 """
211 """
212
212
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
214
214
215 packer = DottedObjectName('json',config=True,
215 packer = DottedObjectName('json',config=True,
216 help="""The name of the packer for serializing messages.
216 help="""The name of the packer for serializing messages.
217 Should be one of 'json', 'pickle', or an import name
217 Should be one of 'json', 'pickle', or an import name
218 for a custom callable serializer.""")
218 for a custom callable serializer.""")
219 def _packer_changed(self, name, old, new):
219 def _packer_changed(self, name, old, new):
220 if new.lower() == 'json':
220 if new.lower() == 'json':
221 self.pack = json_packer
221 self.pack = json_packer
222 self.unpack = json_unpacker
222 self.unpack = json_unpacker
223 elif new.lower() == 'pickle':
223 elif new.lower() == 'pickle':
224 self.pack = pickle_packer
224 self.pack = pickle_packer
225 self.unpack = pickle_unpacker
225 self.unpack = pickle_unpacker
226 else:
226 else:
227 self.pack = import_item(str(new))
227 self.pack = import_item(str(new))
228
228
229 unpacker = DottedObjectName('json', config=True,
229 unpacker = DottedObjectName('json', config=True,
230 help="""The name of the unpacker for unserializing messages.
230 help="""The name of the unpacker for unserializing messages.
231 Only used with custom functions for `packer`.""")
231 Only used with custom functions for `packer`.""")
232 def _unpacker_changed(self, name, old, new):
232 def _unpacker_changed(self, name, old, new):
233 if new.lower() == 'json':
233 if new.lower() == 'json':
234 self.pack = json_packer
234 self.pack = json_packer
235 self.unpack = json_unpacker
235 self.unpack = json_unpacker
236 elif new.lower() == 'pickle':
236 elif new.lower() == 'pickle':
237 self.pack = pickle_packer
237 self.pack = pickle_packer
238 self.unpack = pickle_unpacker
238 self.unpack = pickle_unpacker
239 else:
239 else:
240 self.unpack = import_item(str(new))
240 self.unpack = import_item(str(new))
241
241
242 session = CBytes(b'', config=True,
242 session = CBytes(b'', config=True,
243 help="""The UUID identifying this session.""")
243 help="""The UUID identifying this session.""")
244 def _session_default(self):
244 def _session_default(self):
245 return bytes(uuid.uuid4())
245 return bytes(uuid.uuid4())
246
246
247 username = Unicode(os.environ.get('USER',u'username'), config=True,
247 username = Unicode(os.environ.get('USER',u'username'), config=True,
248 help="""Username for the Session. Default is your system username.""")
248 help="""Username for the Session. Default is your system username.""")
249
249
250 # message signature related traits:
250 # message signature related traits:
251 key = CBytes(b'', config=True,
251 key = CBytes(b'', config=True,
252 help="""execution key, for extra authentication.""")
252 help="""execution key, for extra authentication.""")
253 def _key_changed(self, name, old, new):
253 def _key_changed(self, name, old, new):
254 if new:
254 if new:
255 self.auth = hmac.HMAC(new)
255 self.auth = hmac.HMAC(new)
256 else:
256 else:
257 self.auth = None
257 self.auth = None
258 auth = Instance(hmac.HMAC)
258 auth = Instance(hmac.HMAC)
259 digest_history = Set()
259 digest_history = Set()
260
260
261 keyfile = Unicode('', config=True,
261 keyfile = Unicode('', config=True,
262 help="""path to file containing execution key.""")
262 help="""path to file containing execution key.""")
263 def _keyfile_changed(self, name, old, new):
263 def _keyfile_changed(self, name, old, new):
264 with open(new, 'rb') as f:
264 with open(new, 'rb') as f:
265 self.key = f.read().strip()
265 self.key = f.read().strip()
266
266
267 pack = Any(default_packer) # the actual packer function
267 pack = Any(default_packer) # the actual packer function
268 def _pack_changed(self, name, old, new):
268 def _pack_changed(self, name, old, new):
269 if not callable(new):
269 if not callable(new):
270 raise TypeError("packer must be callable, not %s"%type(new))
270 raise TypeError("packer must be callable, not %s"%type(new))
271
271
272 unpack = Any(default_unpacker) # the actual packer function
272 unpack = Any(default_unpacker) # the actual packer function
273 def _unpack_changed(self, name, old, new):
273 def _unpack_changed(self, name, old, new):
274 # unpacker is not checked - it is assumed to be
274 # unpacker is not checked - it is assumed to be
275 if not callable(new):
275 if not callable(new):
276 raise TypeError("unpacker must be callable, not %s"%type(new))
276 raise TypeError("unpacker must be callable, not %s"%type(new))
277
277
278 def __init__(self, **kwargs):
278 def __init__(self, **kwargs):
279 """create a Session object
279 """create a Session object
280
280
281 Parameters
281 Parameters
282 ----------
282 ----------
283
283
284 debug : bool
284 debug : bool
285 whether to trigger extra debugging statements
285 whether to trigger extra debugging statements
286 packer/unpacker : str : 'json', 'pickle' or import_string
286 packer/unpacker : str : 'json', 'pickle' or import_string
287 importstrings for methods to serialize message parts. If just
287 importstrings for methods to serialize message parts. If just
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
289 Otherwise, the entire importstring must be used.
289 Otherwise, the entire importstring must be used.
290
290
291 The functions must accept at least valid JSON input, and output
291 The functions must accept at least valid JSON input, and output
292 *bytes*.
292 *bytes*.
293
293
294 For example, to use msgpack:
294 For example, to use msgpack:
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
296 pack/unpack : callables
296 pack/unpack : callables
297 You can also set the pack/unpack callables for serialization
297 You can also set the pack/unpack callables for serialization
298 directly.
298 directly.
299 session : bytes
299 session : bytes
300 the ID of this Session object. The default is to generate a new
300 the ID of this Session object. The default is to generate a new
301 UUID.
301 UUID.
302 username : unicode
302 username : unicode
303 username added to message headers. The default is to ask the OS.
303 username added to message headers. The default is to ask the OS.
304 key : bytes
304 key : bytes
305 The key used to initialize an HMAC signature. If unset, messages
305 The key used to initialize an HMAC signature. If unset, messages
306 will not be signed or checked.
306 will not be signed or checked.
307 keyfile : filepath
307 keyfile : filepath
308 The file containing a key. If this is set, `key` will be
308 The file containing a key. If this is set, `key` will be
309 initialized to the contents of the file.
309 initialized to the contents of the file.
310 """
310 """
311 super(Session, self).__init__(**kwargs)
311 super(Session, self).__init__(**kwargs)
312 self._check_packers()
312 self._check_packers()
313 self.none = self.pack({})
313 self.none = self.pack({})
314
314
315 @property
315 @property
316 def msg_id(self):
316 def msg_id(self):
317 """always return new uuid"""
317 """always return new uuid"""
318 return str(uuid.uuid4())
318 return str(uuid.uuid4())
319
319
320 def _check_packers(self):
320 def _check_packers(self):
321 """check packers for binary data and datetime support."""
321 """check packers for binary data and datetime support."""
322 pack = self.pack
322 pack = self.pack
323 unpack = self.unpack
323 unpack = self.unpack
324
324
325 # check simple serialization
325 # check simple serialization
326 msg = dict(a=[1,'hi'])
326 msg = dict(a=[1,'hi'])
327 try:
327 try:
328 packed = pack(msg)
328 packed = pack(msg)
329 except Exception:
329 except Exception:
330 raise ValueError("packer could not serialize a simple message")
330 raise ValueError("packer could not serialize a simple message")
331
331
332 # ensure packed message is bytes
332 # ensure packed message is bytes
333 if not isinstance(packed, bytes):
333 if not isinstance(packed, bytes):
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
335
335
336 # check that unpack is pack's inverse
336 # check that unpack is pack's inverse
337 try:
337 try:
338 unpacked = unpack(packed)
338 unpacked = unpack(packed)
339 except Exception:
339 except Exception:
340 raise ValueError("unpacker could not handle the packer's output")
340 raise ValueError("unpacker could not handle the packer's output")
341
341
342 # check datetime support
342 # check datetime support
343 msg = dict(t=datetime.now())
343 msg = dict(t=datetime.now())
344 try:
344 try:
345 unpacked = unpack(pack(msg))
345 unpacked = unpack(pack(msg))
346 except Exception:
346 except Exception:
347 self.pack = lambda o: pack(squash_dates(o))
347 self.pack = lambda o: pack(squash_dates(o))
348 self.unpack = lambda s: extract_dates(unpack(s))
348 self.unpack = lambda s: extract_dates(unpack(s))
349
349
350 def msg_header(self, msg_type):
350 def msg_header(self, msg_type):
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352
352
353 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
353 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
354 """Return the nested message dict.
354 """Return the nested message dict.
355
355
356 This format is different from what is sent over the wire. The
356 This format is different from what is sent over the wire. The
357 serialize/unserialize methods converts this nested message dict to the wire
357 serialize/unserialize methods converts this nested message dict to the wire
358 format, which is a list of message parts.
358 format, which is a list of message parts.
359 """
359 """
360 msg = {}
360 msg = {}
361 msg['header'] = self.msg_header(msg_type) if header is None else header
361 msg['header'] = self.msg_header(msg_type) if header is None else header
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
363 msg['content'] = {} if content is None else content
363 msg['content'] = {} if content is None else content
364 sub = {} if subheader is None else subheader
364 sub = {} if subheader is None else subheader
365 msg['header'].update(sub)
365 msg['header'].update(sub)
366 return msg
366 return msg
367
367
368 def sign(self, msg_list):
368 def sign(self, msg_list):
369 """Sign a message with HMAC digest. If no auth, return b''.
369 """Sign a message with HMAC digest. If no auth, return b''.
370
370
371 Parameters
371 Parameters
372 ----------
372 ----------
373 msg_list : list
373 msg_list : list
374 The [p_header,p_parent,p_content] part of the message list.
374 The [p_header,p_parent,p_content] part of the message list.
375 """
375 """
376 if self.auth is None:
376 if self.auth is None:
377 return b''
377 return b''
378 h = self.auth.copy()
378 h = self.auth.copy()
379 for m in msg_list:
379 for m in msg_list:
380 h.update(m)
380 h.update(m)
381 return h.hexdigest()
381 return h.hexdigest()
382
382
383 def serialize(self, msg, ident=None):
383 def serialize(self, msg, ident=None):
384 """Serialize the message components to bytes.
384 """Serialize the message components to bytes.
385
385
386 This is roughly the inverse of unserialize. The serialize/unserialize
386 This is roughly the inverse of unserialize. The serialize/unserialize
387 methods work with full message lists, whereas pack/unpack work with
387 methods work with full message lists, whereas pack/unpack work with
388 the individual message parts in the message list.
388 the individual message parts in the message list.
389
389
390 Parameters
390 Parameters
391 ----------
391 ----------
392 msg : dict or Message
392 msg : dict or Message
393 The nexted message dict as returned by the self.msg method.
393 The nexted message dict as returned by the self.msg method.
394
394
395 Returns
395 Returns
396 -------
396 -------
397 msg_list : list
397 msg_list : list
398 The list of bytes objects to be sent with the format:
398 The list of bytes objects to be sent with the format:
399 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
399 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
400 buffer1,buffer2,...]. In this list, the p_* entities are
400 buffer1,buffer2,...]. In this list, the p_* entities are
401 the packed or serialized versions, so if JSON is used, these
401 the packed or serialized versions, so if JSON is used, these
402 are uft8 encoded JSON strings.
402 are uft8 encoded JSON strings.
403 """
403 """
404 content = msg.get('content', {})
404 content = msg.get('content', {})
405 if content is None:
405 if content is None:
406 content = self.none
406 content = self.none
407 elif isinstance(content, dict):
407 elif isinstance(content, dict):
408 content = self.pack(content)
408 content = self.pack(content)
409 elif isinstance(content, bytes):
409 elif isinstance(content, bytes):
410 # content is already packed, as in a relayed message
410 # content is already packed, as in a relayed message
411 pass
411 pass
412 elif isinstance(content, unicode):
412 elif isinstance(content, unicode):
413 # should be bytes, but JSON often spits out unicode
413 # should be bytes, but JSON often spits out unicode
414 content = content.encode('utf8')
414 content = content.encode('utf8')
415 else:
415 else:
416 raise TypeError("Content incorrect type: %s"%type(content))
416 raise TypeError("Content incorrect type: %s"%type(content))
417
417
418 real_message = [self.pack(msg['header']),
418 real_message = [self.pack(msg['header']),
419 self.pack(msg['parent_header']),
419 self.pack(msg['parent_header']),
420 content
420 content
421 ]
421 ]
422
422
423 to_send = []
423 to_send = []
424
424
425 if isinstance(ident, list):
425 if isinstance(ident, list):
426 # accept list of idents
426 # accept list of idents
427 to_send.extend(ident)
427 to_send.extend(ident)
428 elif ident is not None:
428 elif ident is not None:
429 to_send.append(ident)
429 to_send.append(ident)
430 to_send.append(DELIM)
430 to_send.append(DELIM)
431
431
432 signature = self.sign(real_message)
432 signature = self.sign(real_message)
433 to_send.append(signature)
433 to_send.append(signature)
434
434
435 to_send.extend(real_message)
435 to_send.extend(real_message)
436
436
437 return to_send
437 return to_send
438
438
439 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
439 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
440 buffers=None, subheader=None, track=False, header=None):
440 buffers=None, subheader=None, track=False, header=None):
441 """Build and send a message via stream or socket.
441 """Build and send a message via stream or socket.
442
442
443 The message format used by this function internally is as follows:
443 The message format used by this function internally is as follows:
444
444
445 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
445 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
446 buffer1,buffer2,...]
446 buffer1,buffer2,...]
447
447
448 The serialize/unserialize methods convert the nested message dict into this
448 The serialize/unserialize methods convert the nested message dict into this
449 format.
449 format.
450
450
451 Parameters
451 Parameters
452 ----------
452 ----------
453
453
454 stream : zmq.Socket or ZMQStream
454 stream : zmq.Socket or ZMQStream
455 The socket-like object used to send the data.
455 The socket-like object used to send the data.
456 msg_or_type : str or Message/dict
456 msg_or_type : str or Message/dict
457 Normally, msg_or_type will be a msg_type unless a message is being
457 Normally, msg_or_type will be a msg_type unless a message is being
458 sent more than once. If a header is supplied, this can be set to
458 sent more than once. If a header is supplied, this can be set to
459 None and the msg_type will be pulled from the header.
459 None and the msg_type will be pulled from the header.
460
460
461 content : dict or None
461 content : dict or None
462 The content of the message (ignored if msg_or_type is a message).
462 The content of the message (ignored if msg_or_type is a message).
463 header : dict or None
463 header : dict or None
464 The header dict for the message (ignores if msg_to_type is a message).
464 The header dict for the message (ignores if msg_to_type is a message).
465 parent : Message or dict or None
465 parent : Message or dict or None
466 The parent or parent header describing the parent of this message
466 The parent or parent header describing the parent of this message
467 (ignored if msg_or_type is a message).
467 (ignored if msg_or_type is a message).
468 ident : bytes or list of bytes
468 ident : bytes or list of bytes
469 The zmq.IDENTITY routing path.
469 The zmq.IDENTITY routing path.
470 subheader : dict or None
470 subheader : dict or None
471 Extra header keys for this message's header (ignored if msg_or_type
471 Extra header keys for this message's header (ignored if msg_or_type
472 is a message).
472 is a message).
473 buffers : list or None
473 buffers : list or None
474 The already-serialized buffers to be appended to the message.
474 The already-serialized buffers to be appended to the message.
475 track : bool
475 track : bool
476 Whether to track. Only for use with Sockets, because ZMQStream
476 Whether to track. Only for use with Sockets, because ZMQStream
477 objects cannot track messages.
477 objects cannot track messages.
478
478
479 Returns
479 Returns
480 -------
480 -------
481 msg : dict
481 msg : dict
482 The constructed message.
482 The constructed message.
483 (msg,tracker) : (dict, MessageTracker)
483 (msg,tracker) : (dict, MessageTracker)
484 if track=True, then a 2-tuple will be returned,
484 if track=True, then a 2-tuple will be returned,
485 the first element being the constructed
485 the first element being the constructed
486 message, and the second being the MessageTracker
486 message, and the second being the MessageTracker
487
487
488 """
488 """
489
489
490 if not isinstance(stream, (zmq.Socket, ZMQStream)):
490 if not isinstance(stream, (zmq.Socket, ZMQStream)):
491 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
491 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
492 elif track and isinstance(stream, ZMQStream):
492 elif track and isinstance(stream, ZMQStream):
493 raise TypeError("ZMQStream cannot track messages")
493 raise TypeError("ZMQStream cannot track messages")
494
494
495 if isinstance(msg_or_type, (Message, dict)):
495 if isinstance(msg_or_type, (Message, dict)):
496 # We got a Message or message dict, not a msg_type so don't
496 # We got a Message or message dict, not a msg_type so don't
497 # build a new Message.
497 # build a new Message.
498 msg = msg_or_type
498 msg = msg_or_type
499 else:
499 else:
500 msg = self.msg(msg_or_type, content=content, parent=parent,
500 msg = self.msg(msg_or_type, content=content, parent=parent,
501 subheader=subheader, header=header)
501 subheader=subheader, header=header)
502
502
503 buffers = [] if buffers is None else buffers
503 buffers = [] if buffers is None else buffers
504 to_send = self.serialize(msg, ident)
504 to_send = self.serialize(msg, ident)
505 flag = 0
505 flag = 0
506 if buffers:
506 if buffers:
507 flag = zmq.SNDMORE
507 flag = zmq.SNDMORE
508 _track = False
508 _track = False
509 else:
509 else:
510 _track=track
510 _track=track
511 if track:
511 if track:
512 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
512 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
513 else:
513 else:
514 tracker = stream.send_multipart(to_send, flag, copy=False)
514 tracker = stream.send_multipart(to_send, flag, copy=False)
515 for b in buffers[:-1]:
515 for b in buffers[:-1]:
516 stream.send(b, flag, copy=False)
516 stream.send(b, flag, copy=False)
517 if buffers:
517 if buffers:
518 if track:
518 if track:
519 tracker = stream.send(buffers[-1], copy=False, track=track)
519 tracker = stream.send(buffers[-1], copy=False, track=track)
520 else:
520 else:
521 tracker = stream.send(buffers[-1], copy=False)
521 tracker = stream.send(buffers[-1], copy=False)
522
522
523 # omsg = Message(msg)
523 # omsg = Message(msg)
524 if self.debug:
524 if self.debug:
525 pprint.pprint(msg)
525 pprint.pprint(msg)
526 pprint.pprint(to_send)
526 pprint.pprint(to_send)
527 pprint.pprint(buffers)
527 pprint.pprint(buffers)
528
528
529 msg['tracker'] = tracker
529 msg['tracker'] = tracker
530
530
531 return msg
531 return msg
532
532
533 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
533 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
534 """Send a raw message via ident path.
534 """Send a raw message via ident path.
535
535
536 This method is used to send a already serialized message.
536 This method is used to send a already serialized message.
537
537
538 Parameters
538 Parameters
539 ----------
539 ----------
540 stream : ZMQStream or Socket
540 stream : ZMQStream or Socket
541 The ZMQ stream or socket to use for sending the message.
541 The ZMQ stream or socket to use for sending the message.
542 msg_list : list
542 msg_list : list
543 The serialized list of messages to send. This only includes the
543 The serialized list of messages to send. This only includes the
544 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
544 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
545 the message.
545 the message.
546 ident : ident or list
546 ident : ident or list
547 A single ident or a list of idents to use in sending.
547 A single ident or a list of idents to use in sending.
548 """
548 """
549 to_send = []
549 to_send = []
550 if isinstance(ident, bytes):
550 if isinstance(ident, bytes):
551 ident = [ident]
551 ident = [ident]
552 if ident is not None:
552 if ident is not None:
553 to_send.extend(ident)
553 to_send.extend(ident)
554
554
555 to_send.append(DELIM)
555 to_send.append(DELIM)
556 to_send.append(self.sign(msg_list))
556 to_send.append(self.sign(msg_list))
557 to_send.extend(msg_list)
557 to_send.extend(msg_list)
558 stream.send_multipart(msg_list, flags, copy=copy)
558 stream.send_multipart(msg_list, flags, copy=copy)
559
559
560 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
560 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
561 """Receive and unpack a message.
561 """Receive and unpack a message.
562
562
563 Parameters
563 Parameters
564 ----------
564 ----------
565 socket : ZMQStream or Socket
565 socket : ZMQStream or Socket
566 The socket or stream to use in receiving.
566 The socket or stream to use in receiving.
567
567
568 Returns
568 Returns
569 -------
569 -------
570 [idents], msg
570 [idents], msg
571 [idents] is a list of idents and msg is a nested message dict of
571 [idents] is a list of idents and msg is a nested message dict of
572 same format as self.msg returns.
572 same format as self.msg returns.
573 """
573 """
574 if isinstance(socket, ZMQStream):
574 if isinstance(socket, ZMQStream):
575 socket = socket.socket
575 socket = socket.socket
576 try:
576 try:
577 msg_list = socket.recv_multipart(mode)
577 msg_list = socket.recv_multipart(mode)
578 except zmq.ZMQError as e:
578 except zmq.ZMQError as e:
579 if e.errno == zmq.EAGAIN:
579 if e.errno == zmq.EAGAIN:
580 # We can convert EAGAIN to None as we know in this case
580 # We can convert EAGAIN to None as we know in this case
581 # recv_multipart won't return None.
581 # recv_multipart won't return None.
582 return None,None
582 return None,None
583 else:
583 else:
584 raise
584 raise
585 # split multipart message into identity list and message dict
585 # split multipart message into identity list and message dict
586 # invalid large messages can cause very expensive string comparisons
586 # invalid large messages can cause very expensive string comparisons
587 idents, msg_list = self.feed_identities(msg_list, copy)
587 idents, msg_list = self.feed_identities(msg_list, copy)
588 try:
588 try:
589 return idents, self.unserialize(msg_list, content=content, copy=copy)
589 return idents, self.unserialize(msg_list, content=content, copy=copy)
590 except Exception as e:
590 except Exception as e:
591 print (idents, msg_list)
592 # TODO: handle it
591 # TODO: handle it
593 raise e
592 raise e
594
593
595 def feed_identities(self, msg_list, copy=True):
594 def feed_identities(self, msg_list, copy=True):
596 """Split the identities from the rest of the message.
595 """Split the identities from the rest of the message.
597
596
598 Feed until DELIM is reached, then return the prefix as idents and
597 Feed until DELIM is reached, then return the prefix as idents and
599 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
598 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
600 but that would be silly.
599 but that would be silly.
601
600
602 Parameters
601 Parameters
603 ----------
602 ----------
604 msg_list : a list of Message or bytes objects
603 msg_list : a list of Message or bytes objects
605 The message to be split.
604 The message to be split.
606 copy : bool
605 copy : bool
607 flag determining whether the arguments are bytes or Messages
606 flag determining whether the arguments are bytes or Messages
608
607
609 Returns
608 Returns
610 -------
609 -------
611 (idents, msg_list) : two lists
610 (idents, msg_list) : two lists
612 idents will always be a list of bytes, each of which is a ZMQ
611 idents will always be a list of bytes, each of which is a ZMQ
613 identity. msg_list will be a list of bytes or zmq.Messages of the
612 identity. msg_list will be a list of bytes or zmq.Messages of the
614 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
613 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
615 should be unpackable/unserializable via self.unserialize at this
614 should be unpackable/unserializable via self.unserialize at this
616 point.
615 point.
617 """
616 """
618 if copy:
617 if copy:
619 idx = msg_list.index(DELIM)
618 idx = msg_list.index(DELIM)
620 return msg_list[:idx], msg_list[idx+1:]
619 return msg_list[:idx], msg_list[idx+1:]
621 else:
620 else:
622 failed = True
621 failed = True
623 for idx,m in enumerate(msg_list):
622 for idx,m in enumerate(msg_list):
624 if m.bytes == DELIM:
623 if m.bytes == DELIM:
625 failed = False
624 failed = False
626 break
625 break
627 if failed:
626 if failed:
628 raise ValueError("DELIM not in msg_list")
627 raise ValueError("DELIM not in msg_list")
629 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
628 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
630 return [m.bytes for m in idents], msg_list
629 return [m.bytes for m in idents], msg_list
631
630
632 def unserialize(self, msg_list, content=True, copy=True):
631 def unserialize(self, msg_list, content=True, copy=True):
633 """Unserialize a msg_list to a nested message dict.
632 """Unserialize a msg_list to a nested message dict.
634
633
635 This is roughly the inverse of serialize. The serialize/unserialize
634 This is roughly the inverse of serialize. The serialize/unserialize
636 methods work with full message lists, whereas pack/unpack work with
635 methods work with full message lists, whereas pack/unpack work with
637 the individual message parts in the message list.
636 the individual message parts in the message list.
638
637
639 Parameters:
638 Parameters:
640 -----------
639 -----------
641 msg_list : list of bytes or Message objects
640 msg_list : list of bytes or Message objects
642 The list of message parts of the form [HMAC,p_header,p_parent,
641 The list of message parts of the form [HMAC,p_header,p_parent,
643 p_content,buffer1,buffer2,...].
642 p_content,buffer1,buffer2,...].
644 content : bool (True)
643 content : bool (True)
645 Whether to unpack the content dict (True), or leave it packed
644 Whether to unpack the content dict (True), or leave it packed
646 (False).
645 (False).
647 copy : bool (True)
646 copy : bool (True)
648 Whether to return the bytes (True), or the non-copying Message
647 Whether to return the bytes (True), or the non-copying Message
649 object in each place (False).
648 object in each place (False).
650
649
651 Returns
650 Returns
652 -------
651 -------
653 msg : dict
652 msg : dict
654 The nested message dict with top-level keys [header, parent_header,
653 The nested message dict with top-level keys [header, parent_header,
655 content, buffers].
654 content, buffers].
656 """
655 """
657 minlen = 4
656 minlen = 4
658 message = {}
657 message = {}
659 if not copy:
658 if not copy:
660 for i in range(minlen):
659 for i in range(minlen):
661 msg_list[i] = msg_list[i].bytes
660 msg_list[i] = msg_list[i].bytes
662 if self.auth is not None:
661 if self.auth is not None:
663 signature = msg_list[0]
662 signature = msg_list[0]
663 if not signature:
664 raise ValueError("Unsigned Message")
664 if signature in self.digest_history:
665 if signature in self.digest_history:
665 raise ValueError("Duplicate Signature: %r"%signature)
666 raise ValueError("Duplicate Signature: %r"%signature)
666 self.digest_history.add(signature)
667 self.digest_history.add(signature)
667 check = self.sign(msg_list[1:4])
668 check = self.sign(msg_list[1:4])
668 if not signature == check:
669 if not signature == check:
669 raise ValueError("Invalid Signature: %r"%signature)
670 raise ValueError("Invalid Signature: %r"%signature)
670 if not len(msg_list) >= minlen:
671 if not len(msg_list) >= minlen:
671 raise TypeError("malformed message, must have at least %i elements"%minlen)
672 raise TypeError("malformed message, must have at least %i elements"%minlen)
672 message['header'] = self.unpack(msg_list[1])
673 message['header'] = self.unpack(msg_list[1])
673 message['parent_header'] = self.unpack(msg_list[2])
674 message['parent_header'] = self.unpack(msg_list[2])
674 if content:
675 if content:
675 message['content'] = self.unpack(msg_list[3])
676 message['content'] = self.unpack(msg_list[3])
676 else:
677 else:
677 message['content'] = msg_list[3]
678 message['content'] = msg_list[3]
678
679
679 message['buffers'] = msg_list[4:]
680 message['buffers'] = msg_list[4:]
680 return message
681 return message
681
682
682 def test_msg2obj():
683 def test_msg2obj():
683 am = dict(x=1)
684 am = dict(x=1)
684 ao = Message(am)
685 ao = Message(am)
685 assert ao.x == am['x']
686 assert ao.x == am['x']
686
687
687 am['y'] = dict(z=1)
688 am['y'] = dict(z=1)
688 ao = Message(am)
689 ao = Message(am)
689 assert ao.y.z == am['y']['z']
690 assert ao.y.z == am['y']['z']
690
691
691 k1, k2 = 'y', 'z'
692 k1, k2 = 'y', 'z'
692 assert ao[k1][k2] == am[k1][k2]
693 assert ao[k1][k2] == am[k1][k2]
693
694
694 am2 = dict(ao)
695 am2 = dict(ao)
695 assert am['x'] == am2['x']
696 assert am['x'] == am2['x']
696 assert am['y']['z'] == am2['y']['z']
697 assert am['y']['z'] == am2['y']['z']
697
698
General Comments 0
You need to be logged in to leave comments. Login now