##// END OF EJS Templates
Spelling correction.
Thomas Kluyver -
Show More
@@ -1,703 +1,703 b''
1 """Session object for building, serializing, sending, and receiving messages in
1 """Session object for building, serializing, sending, and receiving messages in
2 IPython. The Session object supports serialization, HMAC signatures, and
2 IPython. The Session object supports serialization, HMAC signatures, and
3 metadata on messages.
3 metadata on messages.
4
4
5 Also defined here are utilities for working with Sessions:
5 Also defined here are utilities for working with Sessions:
6 * A SessionFactory to be used as a base class for configurables that work with
6 * A SessionFactory to be used as a base class for configurables that work with
7 Sessions.
7 Sessions.
8 * A Message object for convenience that allows attribute-access to the msg dict.
8 * A Message object for convenience that allows attribute-access to the msg dict.
9
9
10 Authors:
10 Authors:
11
11
12 * Min RK
12 * Min RK
13 * Brian Granger
13 * Brian Granger
14 * Fernando Perez
14 * Fernando Perez
15 """
15 """
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Copyright (C) 2010-2011 The IPython Development Team
17 # Copyright (C) 2010-2011 The IPython Development Team
18 #
18 #
19 # Distributed under the terms of the BSD License. The full license is in
19 # Distributed under the terms of the BSD License. The full license is in
20 # the file COPYING, distributed as part of this software.
20 # the file COPYING, distributed as part of this software.
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24 # Imports
24 # Imports
25 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
26
26
27 import hmac
27 import hmac
28 import logging
28 import logging
29 import os
29 import os
30 import pprint
30 import pprint
31 import uuid
31 import uuid
32 from datetime import datetime
32 from datetime import datetime
33
33
34 try:
34 try:
35 import cPickle
35 import cPickle
36 pickle = cPickle
36 pickle = cPickle
37 except:
37 except:
38 cPickle = None
38 cPickle = None
39 import pickle
39 import pickle
40
40
41 import zmq
41 import zmq
42 from zmq.utils import jsonapi
42 from zmq.utils import jsonapi
43 from zmq.eventloop.ioloop import IOLoop
43 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.zmqstream import ZMQStream
44 from zmq.eventloop.zmqstream import ZMQStream
45
45
46 from IPython.config.configurable import Configurable, LoggingConfigurable
46 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
48 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
48 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
49 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 DottedObjectName)
50 DottedObjectName)
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # utility functions
53 # utility functions
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 def squash_unicode(obj):
56 def squash_unicode(obj):
57 """coerce unicode back to bytestrings."""
57 """coerce unicode back to bytestrings."""
58 if isinstance(obj,dict):
58 if isinstance(obj,dict):
59 for key in obj.keys():
59 for key in obj.keys():
60 obj[key] = squash_unicode(obj[key])
60 obj[key] = squash_unicode(obj[key])
61 if isinstance(key, unicode):
61 if isinstance(key, unicode):
62 obj[squash_unicode(key)] = obj.pop(key)
62 obj[squash_unicode(key)] = obj.pop(key)
63 elif isinstance(obj, list):
63 elif isinstance(obj, list):
64 for i,v in enumerate(obj):
64 for i,v in enumerate(obj):
65 obj[i] = squash_unicode(v)
65 obj[i] = squash_unicode(v)
66 elif isinstance(obj, unicode):
66 elif isinstance(obj, unicode):
67 obj = obj.encode('utf8')
67 obj = obj.encode('utf8')
68 return obj
68 return obj
69
69
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 # globals and defaults
71 # globals and defaults
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
73 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
74 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
75 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76
76
77 pickle_packer = lambda o: pickle.dumps(o,-1)
77 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_unpacker = pickle.loads
78 pickle_unpacker = pickle.loads
79
79
80 default_packer = json_packer
80 default_packer = json_packer
81 default_unpacker = json_unpacker
81 default_unpacker = json_unpacker
82
82
83
83
84 DELIM=b"<IDS|MSG>"
84 DELIM=b"<IDS|MSG>"
85
85
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Classes
87 # Classes
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89
89
90 class SessionFactory(LoggingConfigurable):
90 class SessionFactory(LoggingConfigurable):
91 """The Base class for configurables that have a Session, Context, logger,
91 """The Base class for configurables that have a Session, Context, logger,
92 and IOLoop.
92 and IOLoop.
93 """
93 """
94
94
95 logname = Unicode('')
95 logname = Unicode('')
96 def _logname_changed(self, name, old, new):
96 def _logname_changed(self, name, old, new):
97 self.log = logging.getLogger(new)
97 self.log = logging.getLogger(new)
98
98
99 # not configurable:
99 # not configurable:
100 context = Instance('zmq.Context')
100 context = Instance('zmq.Context')
101 def _context_default(self):
101 def _context_default(self):
102 return zmq.Context.instance()
102 return zmq.Context.instance()
103
103
104 session = Instance('IPython.zmq.session.Session')
104 session = Instance('IPython.zmq.session.Session')
105
105
106 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
106 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 def _loop_default(self):
107 def _loop_default(self):
108 return IOLoop.instance()
108 return IOLoop.instance()
109
109
110 def __init__(self, **kwargs):
110 def __init__(self, **kwargs):
111 super(SessionFactory, self).__init__(**kwargs)
111 super(SessionFactory, self).__init__(**kwargs)
112
112
113 if self.session is None:
113 if self.session is None:
114 # construct the session
114 # construct the session
115 self.session = Session(**kwargs)
115 self.session = Session(**kwargs)
116
116
117
117
118 class Message(object):
118 class Message(object):
119 """A simple message object that maps dict keys to attributes.
119 """A simple message object that maps dict keys to attributes.
120
120
121 A Message can be created from a dict and a dict from a Message instance
121 A Message can be created from a dict and a dict from a Message instance
122 simply by calling dict(msg_obj)."""
122 simply by calling dict(msg_obj)."""
123
123
124 def __init__(self, msg_dict):
124 def __init__(self, msg_dict):
125 dct = self.__dict__
125 dct = self.__dict__
126 for k, v in dict(msg_dict).iteritems():
126 for k, v in dict(msg_dict).iteritems():
127 if isinstance(v, dict):
127 if isinstance(v, dict):
128 v = Message(v)
128 v = Message(v)
129 dct[k] = v
129 dct[k] = v
130
130
131 # Having this iterator lets dict(msg_obj) work out of the box.
131 # Having this iterator lets dict(msg_obj) work out of the box.
132 def __iter__(self):
132 def __iter__(self):
133 return iter(self.__dict__.iteritems())
133 return iter(self.__dict__.iteritems())
134
134
135 def __repr__(self):
135 def __repr__(self):
136 return repr(self.__dict__)
136 return repr(self.__dict__)
137
137
138 def __str__(self):
138 def __str__(self):
139 return pprint.pformat(self.__dict__)
139 return pprint.pformat(self.__dict__)
140
140
141 def __contains__(self, k):
141 def __contains__(self, k):
142 return k in self.__dict__
142 return k in self.__dict__
143
143
144 def __getitem__(self, k):
144 def __getitem__(self, k):
145 return self.__dict__[k]
145 return self.__dict__[k]
146
146
147
147
148 def msg_header(msg_id, msg_type, username, session):
148 def msg_header(msg_id, msg_type, username, session):
149 date = datetime.now()
149 date = datetime.now()
150 return locals()
150 return locals()
151
151
152 def extract_header(msg_or_header):
152 def extract_header(msg_or_header):
153 """Given a message or header, return the header."""
153 """Given a message or header, return the header."""
154 if not msg_or_header:
154 if not msg_or_header:
155 return {}
155 return {}
156 try:
156 try:
157 # See if msg_or_header is the entire message.
157 # See if msg_or_header is the entire message.
158 h = msg_or_header['header']
158 h = msg_or_header['header']
159 except KeyError:
159 except KeyError:
160 try:
160 try:
161 # See if msg_or_header is just the header
161 # See if msg_or_header is just the header
162 h = msg_or_header['msg_id']
162 h = msg_or_header['msg_id']
163 except KeyError:
163 except KeyError:
164 raise
164 raise
165 else:
165 else:
166 h = msg_or_header
166 h = msg_or_header
167 if not isinstance(h, dict):
167 if not isinstance(h, dict):
168 h = dict(h)
168 h = dict(h)
169 return h
169 return h
170
170
171 class Session(Configurable):
171 class Session(Configurable):
172 """Object for handling serialization and sending of messages.
172 """Object for handling serialization and sending of messages.
173
173
174 The Session object handles building messages and sending them
174 The Session object handles building messages and sending them
175 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
175 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 other over the network via Session objects, and only need to work with the
176 other over the network via Session objects, and only need to work with the
177 dict-based IPython message spec. The Session will handle
177 dict-based IPython message spec. The Session will handle
178 serialization/deserialization, security, and metadata.
178 serialization/deserialization, security, and metadata.
179
179
180 Sessions support configurable serialiization via packer/unpacker traits,
180 Sessions support configurable serialiization via packer/unpacker traits,
181 and signing with HMAC digests via the key/keyfile traits.
181 and signing with HMAC digests via the key/keyfile traits.
182
182
183 Parameters
183 Parameters
184 ----------
184 ----------
185
185
186 debug : bool
186 debug : bool
187 whether to trigger extra debugging statements
187 whether to trigger extra debugging statements
188 packer/unpacker : str : 'json', 'pickle' or import_string
188 packer/unpacker : str : 'json', 'pickle' or import_string
189 importstrings for methods to serialize message parts. If just
189 importstrings for methods to serialize message parts. If just
190 'json' or 'pickle', predefined JSON and pickle packers will be used.
190 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 Otherwise, the entire importstring must be used.
191 Otherwise, the entire importstring must be used.
192
192
193 The functions must accept at least valid JSON input, and output *bytes*.
193 The functions must accept at least valid JSON input, and output *bytes*.
194
194
195 For example, to use msgpack:
195 For example, to use msgpack:
196 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
196 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 pack/unpack : callables
197 pack/unpack : callables
198 You can also set the pack/unpack callables for serialization directly.
198 You can also set the pack/unpack callables for serialization directly.
199 session : bytes
199 session : bytes
200 the ID of this Session object. The default is to generate a new UUID.
200 the ID of this Session object. The default is to generate a new UUID.
201 username : unicode
201 username : unicode
202 username added to message headers. The default is to ask the OS.
202 username added to message headers. The default is to ask the OS.
203 key : bytes
203 key : bytes
204 The key used to initialize an HMAC signature. If unset, messages
204 The key used to initialize an HMAC signature. If unset, messages
205 will not be signed or checked.
205 will not be signed or checked.
206 keyfile : filepath
206 keyfile : filepath
207 The file containing a key. If this is set, `key` will be initialized
207 The file containing a key. If this is set, `key` will be initialized
208 to the contents of the file.
208 to the contents of the file.
209
209
210 """
210 """
211
211
212 debug=Bool(False, config=True, help="""Debug output in the Session""")
212 debug=Bool(False, config=True, help="""Debug output in the Session""")
213
213
214 packer = DottedObjectName('json',config=True,
214 packer = DottedObjectName('json',config=True,
215 help="""The name of the packer for serializing messages.
215 help="""The name of the packer for serializing messages.
216 Should be one of 'json', 'pickle', or an import name
216 Should be one of 'json', 'pickle', or an import name
217 for a custom callable serializer.""")
217 for a custom callable serializer.""")
218 def _packer_changed(self, name, old, new):
218 def _packer_changed(self, name, old, new):
219 if new.lower() == 'json':
219 if new.lower() == 'json':
220 self.pack = json_packer
220 self.pack = json_packer
221 self.unpack = json_unpacker
221 self.unpack = json_unpacker
222 elif new.lower() == 'pickle':
222 elif new.lower() == 'pickle':
223 self.pack = pickle_packer
223 self.pack = pickle_packer
224 self.unpack = pickle_unpacker
224 self.unpack = pickle_unpacker
225 else:
225 else:
226 self.pack = import_item(str(new))
226 self.pack = import_item(str(new))
227
227
228 unpacker = DottedObjectName('json', config=True,
228 unpacker = DottedObjectName('json', config=True,
229 help="""The name of the unpacker for unserializing messages.
229 help="""The name of the unpacker for unserializing messages.
230 Only used with custom functions for `packer`.""")
230 Only used with custom functions for `packer`.""")
231 def _unpacker_changed(self, name, old, new):
231 def _unpacker_changed(self, name, old, new):
232 if new.lower() == 'json':
232 if new.lower() == 'json':
233 self.pack = json_packer
233 self.pack = json_packer
234 self.unpack = json_unpacker
234 self.unpack = json_unpacker
235 elif new.lower() == 'pickle':
235 elif new.lower() == 'pickle':
236 self.pack = pickle_packer
236 self.pack = pickle_packer
237 self.unpack = pickle_unpacker
237 self.unpack = pickle_unpacker
238 else:
238 else:
239 self.unpack = import_item(str(new))
239 self.unpack = import_item(str(new))
240
240
241 session = CBytes(b'', config=True,
241 session = CBytes(b'', config=True,
242 help="""The UUID identifying this session.""")
242 help="""The UUID identifying this session.""")
243 def _session_default(self):
243 def _session_default(self):
244 return bytes(uuid.uuid4())
244 return bytes(uuid.uuid4())
245
245
246 username = Unicode(os.environ.get('USER',u'username'), config=True,
246 username = Unicode(os.environ.get('USER',u'username'), config=True,
247 help="""Username for the Session. Default is your system username.""")
247 help="""Username for the Session. Default is your system username.""")
248
248
249 # message signature related traits:
249 # message signature related traits:
250 key = CBytes(b'', config=True,
250 key = CBytes(b'', config=True,
251 help="""execution key, for extra authentication.""")
251 help="""execution key, for extra authentication.""")
252 def _key_changed(self, name, old, new):
252 def _key_changed(self, name, old, new):
253 if new:
253 if new:
254 self.auth = hmac.HMAC(new)
254 self.auth = hmac.HMAC(new)
255 else:
255 else:
256 self.auth = None
256 self.auth = None
257 auth = Instance(hmac.HMAC)
257 auth = Instance(hmac.HMAC)
258 digest_history = Set()
258 digest_history = Set()
259
259
260 keyfile = Unicode('', config=True,
260 keyfile = Unicode('', config=True,
261 help="""path to file containing execution key.""")
261 help="""path to file containing execution key.""")
262 def _keyfile_changed(self, name, old, new):
262 def _keyfile_changed(self, name, old, new):
263 with open(new, 'rb') as f:
263 with open(new, 'rb') as f:
264 self.key = f.read().strip()
264 self.key = f.read().strip()
265
265
266 pack = Any(default_packer) # the actual packer function
266 pack = Any(default_packer) # the actual packer function
267 def _pack_changed(self, name, old, new):
267 def _pack_changed(self, name, old, new):
268 if not callable(new):
268 if not callable(new):
269 raise TypeError("packer must be callable, not %s"%type(new))
269 raise TypeError("packer must be callable, not %s"%type(new))
270
270
271 unpack = Any(default_unpacker) # the actual packer function
271 unpack = Any(default_unpacker) # the actual packer function
272 def _unpack_changed(self, name, old, new):
272 def _unpack_changed(self, name, old, new):
273 # unpacker is not checked - it is assumed to be
273 # unpacker is not checked - it is assumed to be
274 if not callable(new):
274 if not callable(new):
275 raise TypeError("unpacker must be callable, not %s"%type(new))
275 raise TypeError("unpacker must be callable, not %s"%type(new))
276
276
277 def __init__(self, **kwargs):
277 def __init__(self, **kwargs):
278 """create a Session object
278 """create a Session object
279
279
280 Parameters
280 Parameters
281 ----------
281 ----------
282
282
283 debug : bool
283 debug : bool
284 whether to trigger extra debugging statements
284 whether to trigger extra debugging statements
285 packer/unpacker : str : 'json', 'pickle' or import_string
285 packer/unpacker : str : 'json', 'pickle' or import_string
286 importstrings for methods to serialize message parts. If just
286 importstrings for methods to serialize message parts. If just
287 'json' or 'pickle', predefined JSON and pickle packers will be used.
287 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 Otherwise, the entire importstring must be used.
288 Otherwise, the entire importstring must be used.
289
289
290 The functions must accept at least valid JSON input, and output
290 The functions must accept at least valid JSON input, and output
291 *bytes*.
291 *bytes*.
292
292
293 For example, to use msgpack:
293 For example, to use msgpack:
294 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
294 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 pack/unpack : callables
295 pack/unpack : callables
296 You can also set the pack/unpack callables for serialization
296 You can also set the pack/unpack callables for serialization
297 directly.
297 directly.
298 session : bytes
298 session : bytes
299 the ID of this Session object. The default is to generate a new
299 the ID of this Session object. The default is to generate a new
300 UUID.
300 UUID.
301 username : unicode
301 username : unicode
302 username added to message headers. The default is to ask the OS.
302 username added to message headers. The default is to ask the OS.
303 key : bytes
303 key : bytes
304 The key used to initialize an HMAC signature. If unset, messages
304 The key used to initialize an HMAC signature. If unset, messages
305 will not be signed or checked.
305 will not be signed or checked.
306 keyfile : filepath
306 keyfile : filepath
307 The file containing a key. If this is set, `key` will be
307 The file containing a key. If this is set, `key` will be
308 initialized to the contents of the file.
308 initialized to the contents of the file.
309 """
309 """
310 super(Session, self).__init__(**kwargs)
310 super(Session, self).__init__(**kwargs)
311 self._check_packers()
311 self._check_packers()
312 self.none = self.pack({})
312 self.none = self.pack({})
313
313
314 @property
314 @property
315 def msg_id(self):
315 def msg_id(self):
316 """always return new uuid"""
316 """always return new uuid"""
317 return str(uuid.uuid4())
317 return str(uuid.uuid4())
318
318
319 def _check_packers(self):
319 def _check_packers(self):
320 """check packers for binary data and datetime support."""
320 """check packers for binary data and datetime support."""
321 pack = self.pack
321 pack = self.pack
322 unpack = self.unpack
322 unpack = self.unpack
323
323
324 # check simple serialization
324 # check simple serialization
325 msg = dict(a=[1,'hi'])
325 msg = dict(a=[1,'hi'])
326 try:
326 try:
327 packed = pack(msg)
327 packed = pack(msg)
328 except Exception:
328 except Exception:
329 raise ValueError("packer could not serialize a simple message")
329 raise ValueError("packer could not serialize a simple message")
330
330
331 # ensure packed message is bytes
331 # ensure packed message is bytes
332 if not isinstance(packed, bytes):
332 if not isinstance(packed, bytes):
333 raise ValueError("message packed to %r, but bytes are required"%type(packed))
333 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334
334
335 # check that unpack is pack's inverse
335 # check that unpack is pack's inverse
336 try:
336 try:
337 unpacked = unpack(packed)
337 unpacked = unpack(packed)
338 except Exception:
338 except Exception:
339 raise ValueError("unpacker could not handle the packer's output")
339 raise ValueError("unpacker could not handle the packer's output")
340
340
341 # check datetime support
341 # check datetime support
342 msg = dict(t=datetime.now())
342 msg = dict(t=datetime.now())
343 try:
343 try:
344 unpacked = unpack(pack(msg))
344 unpacked = unpack(pack(msg))
345 except Exception:
345 except Exception:
346 self.pack = lambda o: pack(squash_dates(o))
346 self.pack = lambda o: pack(squash_dates(o))
347 self.unpack = lambda s: extract_dates(unpack(s))
347 self.unpack = lambda s: extract_dates(unpack(s))
348
348
349 def msg_header(self, msg_type):
349 def msg_header(self, msg_type):
350 return msg_header(self.msg_id, msg_type, self.username, self.session)
350 return msg_header(self.msg_id, msg_type, self.username, self.session)
351
351
352 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
352 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
353 """Return the nested message dict.
353 """Return the nested message dict.
354
354
355 This format is different from what is sent over the wire. The
355 This format is different from what is sent over the wire. The
356 serialize/unserialize methods converts this nested message dict to the wire
356 serialize/unserialize methods converts this nested message dict to the wire
357 format, which is a list of message parts.
357 format, which is a list of message parts.
358 """
358 """
359 msg = {}
359 msg = {}
360 header = self.msg_header(msg_type) if header is None else header
360 header = self.msg_header(msg_type) if header is None else header
361 msg['header'] = header
361 msg['header'] = header
362 msg['msg_id'] = header['msg_id']
362 msg['msg_id'] = header['msg_id']
363 msg['msg_type'] = header['msg_type']
363 msg['msg_type'] = header['msg_type']
364 msg['parent_header'] = {} if parent is None else extract_header(parent)
364 msg['parent_header'] = {} if parent is None else extract_header(parent)
365 msg['content'] = {} if content is None else content
365 msg['content'] = {} if content is None else content
366 sub = {} if subheader is None else subheader
366 sub = {} if subheader is None else subheader
367 msg['header'].update(sub)
367 msg['header'].update(sub)
368 return msg
368 return msg
369
369
370 def sign(self, msg_list):
370 def sign(self, msg_list):
371 """Sign a message with HMAC digest. If no auth, return b''.
371 """Sign a message with HMAC digest. If no auth, return b''.
372
372
373 Parameters
373 Parameters
374 ----------
374 ----------
375 msg_list : list
375 msg_list : list
376 The [p_header,p_parent,p_content] part of the message list.
376 The [p_header,p_parent,p_content] part of the message list.
377 """
377 """
378 if self.auth is None:
378 if self.auth is None:
379 return b''
379 return b''
380 h = self.auth.copy()
380 h = self.auth.copy()
381 for m in msg_list:
381 for m in msg_list:
382 h.update(m)
382 h.update(m)
383 return str_to_bytes(h.hexdigest())
383 return str_to_bytes(h.hexdigest())
384
384
385 def serialize(self, msg, ident=None):
385 def serialize(self, msg, ident=None):
386 """Serialize the message components to bytes.
386 """Serialize the message components to bytes.
387
387
388 This is roughly the inverse of unserialize. The serialize/unserialize
388 This is roughly the inverse of unserialize. The serialize/unserialize
389 methods work with full message lists, whereas pack/unpack work with
389 methods work with full message lists, whereas pack/unpack work with
390 the individual message parts in the message list.
390 the individual message parts in the message list.
391
391
392 Parameters
392 Parameters
393 ----------
393 ----------
394 msg : dict or Message
394 msg : dict or Message
395 The nexted message dict as returned by the self.msg method.
395 The nexted message dict as returned by the self.msg method.
396
396
397 Returns
397 Returns
398 -------
398 -------
399 msg_list : list
399 msg_list : list
400 The list of bytes objects to be sent with the format:
400 The list of bytes objects to be sent with the format:
401 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
401 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
402 buffer1,buffer2,...]. In this list, the p_* entities are
402 buffer1,buffer2,...]. In this list, the p_* entities are
403 the packed or serialized versions, so if JSON is used, these
403 the packed or serialized versions, so if JSON is used, these
404 are uft8 encoded JSON strings.
404 are utf8 encoded JSON strings.
405 """
405 """
406 content = msg.get('content', {})
406 content = msg.get('content', {})
407 if content is None:
407 if content is None:
408 content = self.none
408 content = self.none
409 elif isinstance(content, dict):
409 elif isinstance(content, dict):
410 content = self.pack(content)
410 content = self.pack(content)
411 elif isinstance(content, bytes):
411 elif isinstance(content, bytes):
412 # content is already packed, as in a relayed message
412 # content is already packed, as in a relayed message
413 pass
413 pass
414 elif isinstance(content, unicode):
414 elif isinstance(content, unicode):
415 # should be bytes, but JSON often spits out unicode
415 # should be bytes, but JSON often spits out unicode
416 content = content.encode('utf8')
416 content = content.encode('utf8')
417 else:
417 else:
418 raise TypeError("Content incorrect type: %s"%type(content))
418 raise TypeError("Content incorrect type: %s"%type(content))
419
419
420 real_message = [self.pack(msg['header']),
420 real_message = [self.pack(msg['header']),
421 self.pack(msg['parent_header']),
421 self.pack(msg['parent_header']),
422 content
422 content
423 ]
423 ]
424
424
425 to_send = []
425 to_send = []
426
426
427 if isinstance(ident, list):
427 if isinstance(ident, list):
428 # accept list of idents
428 # accept list of idents
429 to_send.extend(ident)
429 to_send.extend(ident)
430 elif ident is not None:
430 elif ident is not None:
431 to_send.append(ident)
431 to_send.append(ident)
432 to_send.append(DELIM)
432 to_send.append(DELIM)
433
433
434 signature = self.sign(real_message)
434 signature = self.sign(real_message)
435 to_send.append(signature)
435 to_send.append(signature)
436
436
437 to_send.extend(real_message)
437 to_send.extend(real_message)
438
438
439 return to_send
439 return to_send
440
440
441 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
441 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
442 buffers=None, subheader=None, track=False, header=None):
442 buffers=None, subheader=None, track=False, header=None):
443 """Build and send a message via stream or socket.
443 """Build and send a message via stream or socket.
444
444
445 The message format used by this function internally is as follows:
445 The message format used by this function internally is as follows:
446
446
447 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
447 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
448 buffer1,buffer2,...]
448 buffer1,buffer2,...]
449
449
450 The serialize/unserialize methods convert the nested message dict into this
450 The serialize/unserialize methods convert the nested message dict into this
451 format.
451 format.
452
452
453 Parameters
453 Parameters
454 ----------
454 ----------
455
455
456 stream : zmq.Socket or ZMQStream
456 stream : zmq.Socket or ZMQStream
457 The socket-like object used to send the data.
457 The socket-like object used to send the data.
458 msg_or_type : str or Message/dict
458 msg_or_type : str or Message/dict
459 Normally, msg_or_type will be a msg_type unless a message is being
459 Normally, msg_or_type will be a msg_type unless a message is being
460 sent more than once. If a header is supplied, this can be set to
460 sent more than once. If a header is supplied, this can be set to
461 None and the msg_type will be pulled from the header.
461 None and the msg_type will be pulled from the header.
462
462
463 content : dict or None
463 content : dict or None
464 The content of the message (ignored if msg_or_type is a message).
464 The content of the message (ignored if msg_or_type is a message).
465 header : dict or None
465 header : dict or None
466 The header dict for the message (ignores if msg_to_type is a message).
466 The header dict for the message (ignores if msg_to_type is a message).
467 parent : Message or dict or None
467 parent : Message or dict or None
468 The parent or parent header describing the parent of this message
468 The parent or parent header describing the parent of this message
469 (ignored if msg_or_type is a message).
469 (ignored if msg_or_type is a message).
470 ident : bytes or list of bytes
470 ident : bytes or list of bytes
471 The zmq.IDENTITY routing path.
471 The zmq.IDENTITY routing path.
472 subheader : dict or None
472 subheader : dict or None
473 Extra header keys for this message's header (ignored if msg_or_type
473 Extra header keys for this message's header (ignored if msg_or_type
474 is a message).
474 is a message).
475 buffers : list or None
475 buffers : list or None
476 The already-serialized buffers to be appended to the message.
476 The already-serialized buffers to be appended to the message.
477 track : bool
477 track : bool
478 Whether to track. Only for use with Sockets, because ZMQStream
478 Whether to track. Only for use with Sockets, because ZMQStream
479 objects cannot track messages.
479 objects cannot track messages.
480
480
481 Returns
481 Returns
482 -------
482 -------
483 msg : dict
483 msg : dict
484 The constructed message.
484 The constructed message.
485 (msg,tracker) : (dict, MessageTracker)
485 (msg,tracker) : (dict, MessageTracker)
486 if track=True, then a 2-tuple will be returned,
486 if track=True, then a 2-tuple will be returned,
487 the first element being the constructed
487 the first element being the constructed
488 message, and the second being the MessageTracker
488 message, and the second being the MessageTracker
489
489
490 """
490 """
491
491
492 if not isinstance(stream, (zmq.Socket, ZMQStream)):
492 if not isinstance(stream, (zmq.Socket, ZMQStream)):
493 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
493 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
494 elif track and isinstance(stream, ZMQStream):
494 elif track and isinstance(stream, ZMQStream):
495 raise TypeError("ZMQStream cannot track messages")
495 raise TypeError("ZMQStream cannot track messages")
496
496
497 if isinstance(msg_or_type, (Message, dict)):
497 if isinstance(msg_or_type, (Message, dict)):
498 # We got a Message or message dict, not a msg_type so don't
498 # We got a Message or message dict, not a msg_type so don't
499 # build a new Message.
499 # build a new Message.
500 msg = msg_or_type
500 msg = msg_or_type
501 else:
501 else:
502 msg = self.msg(msg_or_type, content=content, parent=parent,
502 msg = self.msg(msg_or_type, content=content, parent=parent,
503 subheader=subheader, header=header)
503 subheader=subheader, header=header)
504
504
505 buffers = [] if buffers is None else buffers
505 buffers = [] if buffers is None else buffers
506 to_send = self.serialize(msg, ident)
506 to_send = self.serialize(msg, ident)
507 flag = 0
507 flag = 0
508 if buffers:
508 if buffers:
509 flag = zmq.SNDMORE
509 flag = zmq.SNDMORE
510 _track = False
510 _track = False
511 else:
511 else:
512 _track=track
512 _track=track
513 if track:
513 if track:
514 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
514 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
515 else:
515 else:
516 tracker = stream.send_multipart(to_send, flag, copy=False)
516 tracker = stream.send_multipart(to_send, flag, copy=False)
517 for b in buffers[:-1]:
517 for b in buffers[:-1]:
518 stream.send(b, flag, copy=False)
518 stream.send(b, flag, copy=False)
519 if buffers:
519 if buffers:
520 if track:
520 if track:
521 tracker = stream.send(buffers[-1], copy=False, track=track)
521 tracker = stream.send(buffers[-1], copy=False, track=track)
522 else:
522 else:
523 tracker = stream.send(buffers[-1], copy=False)
523 tracker = stream.send(buffers[-1], copy=False)
524
524
525 # omsg = Message(msg)
525 # omsg = Message(msg)
526 if self.debug:
526 if self.debug:
527 pprint.pprint(msg)
527 pprint.pprint(msg)
528 pprint.pprint(to_send)
528 pprint.pprint(to_send)
529 pprint.pprint(buffers)
529 pprint.pprint(buffers)
530
530
531 msg['tracker'] = tracker
531 msg['tracker'] = tracker
532
532
533 return msg
533 return msg
534
534
535 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
535 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
536 """Send a raw message via ident path.
536 """Send a raw message via ident path.
537
537
538 This method is used to send a already serialized message.
538 This method is used to send a already serialized message.
539
539
540 Parameters
540 Parameters
541 ----------
541 ----------
542 stream : ZMQStream or Socket
542 stream : ZMQStream or Socket
543 The ZMQ stream or socket to use for sending the message.
543 The ZMQ stream or socket to use for sending the message.
544 msg_list : list
544 msg_list : list
545 The serialized list of messages to send. This only includes the
545 The serialized list of messages to send. This only includes the
546 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
546 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
547 the message.
547 the message.
548 ident : ident or list
548 ident : ident or list
549 A single ident or a list of idents to use in sending.
549 A single ident or a list of idents to use in sending.
550 """
550 """
551 to_send = []
551 to_send = []
552 if isinstance(ident, bytes):
552 if isinstance(ident, bytes):
553 ident = [ident]
553 ident = [ident]
554 if ident is not None:
554 if ident is not None:
555 to_send.extend(ident)
555 to_send.extend(ident)
556
556
557 to_send.append(DELIM)
557 to_send.append(DELIM)
558 to_send.append(self.sign(msg_list))
558 to_send.append(self.sign(msg_list))
559 to_send.extend(msg_list)
559 to_send.extend(msg_list)
560 stream.send_multipart(msg_list, flags, copy=copy)
560 stream.send_multipart(msg_list, flags, copy=copy)
561
561
562 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
562 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
563 """Receive and unpack a message.
563 """Receive and unpack a message.
564
564
565 Parameters
565 Parameters
566 ----------
566 ----------
567 socket : ZMQStream or Socket
567 socket : ZMQStream or Socket
568 The socket or stream to use in receiving.
568 The socket or stream to use in receiving.
569
569
570 Returns
570 Returns
571 -------
571 -------
572 [idents], msg
572 [idents], msg
573 [idents] is a list of idents and msg is a nested message dict of
573 [idents] is a list of idents and msg is a nested message dict of
574 same format as self.msg returns.
574 same format as self.msg returns.
575 """
575 """
576 if isinstance(socket, ZMQStream):
576 if isinstance(socket, ZMQStream):
577 socket = socket.socket
577 socket = socket.socket
578 try:
578 try:
579 msg_list = socket.recv_multipart(mode)
579 msg_list = socket.recv_multipart(mode)
580 except zmq.ZMQError as e:
580 except zmq.ZMQError as e:
581 if e.errno == zmq.EAGAIN:
581 if e.errno == zmq.EAGAIN:
582 # We can convert EAGAIN to None as we know in this case
582 # We can convert EAGAIN to None as we know in this case
583 # recv_multipart won't return None.
583 # recv_multipart won't return None.
584 return None,None
584 return None,None
585 else:
585 else:
586 raise
586 raise
587 # split multipart message into identity list and message dict
587 # split multipart message into identity list and message dict
588 # invalid large messages can cause very expensive string comparisons
588 # invalid large messages can cause very expensive string comparisons
589 idents, msg_list = self.feed_identities(msg_list, copy)
589 idents, msg_list = self.feed_identities(msg_list, copy)
590 try:
590 try:
591 return idents, self.unserialize(msg_list, content=content, copy=copy)
591 return idents, self.unserialize(msg_list, content=content, copy=copy)
592 except Exception as e:
592 except Exception as e:
593 # TODO: handle it
593 # TODO: handle it
594 raise e
594 raise e
595
595
596 def feed_identities(self, msg_list, copy=True):
596 def feed_identities(self, msg_list, copy=True):
597 """Split the identities from the rest of the message.
597 """Split the identities from the rest of the message.
598
598
599 Feed until DELIM is reached, then return the prefix as idents and
599 Feed until DELIM is reached, then return the prefix as idents and
600 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
600 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
601 but that would be silly.
601 but that would be silly.
602
602
603 Parameters
603 Parameters
604 ----------
604 ----------
605 msg_list : a list of Message or bytes objects
605 msg_list : a list of Message or bytes objects
606 The message to be split.
606 The message to be split.
607 copy : bool
607 copy : bool
608 flag determining whether the arguments are bytes or Messages
608 flag determining whether the arguments are bytes or Messages
609
609
610 Returns
610 Returns
611 -------
611 -------
612 (idents, msg_list) : two lists
612 (idents, msg_list) : two lists
613 idents will always be a list of bytes, each of which is a ZMQ
613 idents will always be a list of bytes, each of which is a ZMQ
614 identity. msg_list will be a list of bytes or zmq.Messages of the
614 identity. msg_list will be a list of bytes or zmq.Messages of the
615 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
615 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
616 should be unpackable/unserializable via self.unserialize at this
616 should be unpackable/unserializable via self.unserialize at this
617 point.
617 point.
618 """
618 """
619 if copy:
619 if copy:
620 idx = msg_list.index(DELIM)
620 idx = msg_list.index(DELIM)
621 return msg_list[:idx], msg_list[idx+1:]
621 return msg_list[:idx], msg_list[idx+1:]
622 else:
622 else:
623 failed = True
623 failed = True
624 for idx,m in enumerate(msg_list):
624 for idx,m in enumerate(msg_list):
625 if m.bytes == DELIM:
625 if m.bytes == DELIM:
626 failed = False
626 failed = False
627 break
627 break
628 if failed:
628 if failed:
629 raise ValueError("DELIM not in msg_list")
629 raise ValueError("DELIM not in msg_list")
630 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
630 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
631 return [m.bytes for m in idents], msg_list
631 return [m.bytes for m in idents], msg_list
632
632
633 def unserialize(self, msg_list, content=True, copy=True):
633 def unserialize(self, msg_list, content=True, copy=True):
634 """Unserialize a msg_list to a nested message dict.
634 """Unserialize a msg_list to a nested message dict.
635
635
636 This is roughly the inverse of serialize. The serialize/unserialize
636 This is roughly the inverse of serialize. The serialize/unserialize
637 methods work with full message lists, whereas pack/unpack work with
637 methods work with full message lists, whereas pack/unpack work with
638 the individual message parts in the message list.
638 the individual message parts in the message list.
639
639
640 Parameters:
640 Parameters:
641 -----------
641 -----------
642 msg_list : list of bytes or Message objects
642 msg_list : list of bytes or Message objects
643 The list of message parts of the form [HMAC,p_header,p_parent,
643 The list of message parts of the form [HMAC,p_header,p_parent,
644 p_content,buffer1,buffer2,...].
644 p_content,buffer1,buffer2,...].
645 content : bool (True)
645 content : bool (True)
646 Whether to unpack the content dict (True), or leave it packed
646 Whether to unpack the content dict (True), or leave it packed
647 (False).
647 (False).
648 copy : bool (True)
648 copy : bool (True)
649 Whether to return the bytes (True), or the non-copying Message
649 Whether to return the bytes (True), or the non-copying Message
650 object in each place (False).
650 object in each place (False).
651
651
652 Returns
652 Returns
653 -------
653 -------
654 msg : dict
654 msg : dict
655 The nested message dict with top-level keys [header, parent_header,
655 The nested message dict with top-level keys [header, parent_header,
656 content, buffers].
656 content, buffers].
657 """
657 """
658 minlen = 4
658 minlen = 4
659 message = {}
659 message = {}
660 if not copy:
660 if not copy:
661 for i in range(minlen):
661 for i in range(minlen):
662 msg_list[i] = msg_list[i].bytes
662 msg_list[i] = msg_list[i].bytes
663 if self.auth is not None:
663 if self.auth is not None:
664 signature = msg_list[0]
664 signature = msg_list[0]
665 if not signature:
665 if not signature:
666 raise ValueError("Unsigned Message")
666 raise ValueError("Unsigned Message")
667 if signature in self.digest_history:
667 if signature in self.digest_history:
668 raise ValueError("Duplicate Signature: %r"%signature)
668 raise ValueError("Duplicate Signature: %r"%signature)
669 self.digest_history.add(signature)
669 self.digest_history.add(signature)
670 check = self.sign(msg_list[1:4])
670 check = self.sign(msg_list[1:4])
671 if not signature == check:
671 if not signature == check:
672 raise ValueError("Invalid Signature: %r"%signature)
672 raise ValueError("Invalid Signature: %r"%signature)
673 if not len(msg_list) >= minlen:
673 if not len(msg_list) >= minlen:
674 raise TypeError("malformed message, must have at least %i elements"%minlen)
674 raise TypeError("malformed message, must have at least %i elements"%minlen)
675 header = self.unpack(msg_list[1])
675 header = self.unpack(msg_list[1])
676 message['header'] = header
676 message['header'] = header
677 message['msg_id'] = header['msg_id']
677 message['msg_id'] = header['msg_id']
678 message['msg_type'] = header['msg_type']
678 message['msg_type'] = header['msg_type']
679 message['parent_header'] = self.unpack(msg_list[2])
679 message['parent_header'] = self.unpack(msg_list[2])
680 if content:
680 if content:
681 message['content'] = self.unpack(msg_list[3])
681 message['content'] = self.unpack(msg_list[3])
682 else:
682 else:
683 message['content'] = msg_list[3]
683 message['content'] = msg_list[3]
684
684
685 message['buffers'] = msg_list[4:]
685 message['buffers'] = msg_list[4:]
686 return message
686 return message
687
687
688 def test_msg2obj():
688 def test_msg2obj():
689 am = dict(x=1)
689 am = dict(x=1)
690 ao = Message(am)
690 ao = Message(am)
691 assert ao.x == am['x']
691 assert ao.x == am['x']
692
692
693 am['y'] = dict(z=1)
693 am['y'] = dict(z=1)
694 ao = Message(am)
694 ao = Message(am)
695 assert ao.y.z == am['y']['z']
695 assert ao.y.z == am['y']['z']
696
696
697 k1, k2 = 'y', 'z'
697 k1, k2 = 'y', 'z'
698 assert ao[k1][k2] == am[k1][k2]
698 assert ao[k1][k2] == am[k1][k2]
699
699
700 am2 = dict(ao)
700 am2 = dict(ao)
701 assert am['x'] == am2['x']
701 assert am['x'] == am2['x']
702 assert am['y']['z'] == am2['y']['z']
702 assert am['y']['z'] == am2['y']['z']
703
703
General Comments 0
You need to be logged in to leave comments. Login now