##// END OF EJS Templates
Added more docstrings to IPython.zmq.session....
Brian E. Granger -
Show More
@@ -1,628 +1,679 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Session object for building, serializing, sending, and receiving messages in
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
4 metadata on messages.
5
5
6 Also defined here are utilities for working with Sessions:
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
9 * A Message object for convenience that allows attribute-access to the msg dict.
10
10
11 Authors:
11 Authors:
12
12
13 * Min RK
13 * Min RK
14 * Brian Granger
14 * Brian Granger
15 * Fernando Perez
15 * Fernando Perez
16 """
16 """
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Copyright (C) 2010-2011 The IPython Development Team
18 # Copyright (C) 2010-2011 The IPython Development Team
19 #
19 #
20 # Distributed under the terms of the BSD License. The full license is in
20 # Distributed under the terms of the BSD License. The full license is in
21 # the file COPYING, distributed as part of this software.
21 # the file COPYING, distributed as part of this software.
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Imports
25 # Imports
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 import hmac
28 import hmac
29 import logging
29 import logging
30 import os
30 import os
31 import pprint
31 import pprint
32 import uuid
32 import uuid
33 from datetime import datetime
33 from datetime import datetime
34
34
35 try:
35 try:
36 import cPickle
36 import cPickle
37 pickle = cPickle
37 pickle = cPickle
38 except:
38 except:
39 cPickle = None
39 cPickle = None
40 import pickle
40 import pickle
41
41
42 import zmq
42 import zmq
43 from zmq.utils import jsonapi
43 from zmq.utils import jsonapi
44 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.ioloop import IOLoop
45 from zmq.eventloop.zmqstream import ZMQStream
45 from zmq.eventloop.zmqstream import ZMQStream
46
46
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 DottedObjectName)
51 DottedObjectName)
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # utility functions
54 # utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 def squash_unicode(obj):
57 def squash_unicode(obj):
58 """coerce unicode back to bytestrings."""
58 """coerce unicode back to bytestrings."""
59 if isinstance(obj,dict):
59 if isinstance(obj,dict):
60 for key in obj.keys():
60 for key in obj.keys():
61 obj[key] = squash_unicode(obj[key])
61 obj[key] = squash_unicode(obj[key])
62 if isinstance(key, unicode):
62 if isinstance(key, unicode):
63 obj[squash_unicode(key)] = obj.pop(key)
63 obj[squash_unicode(key)] = obj.pop(key)
64 elif isinstance(obj, list):
64 elif isinstance(obj, list):
65 for i,v in enumerate(obj):
65 for i,v in enumerate(obj):
66 obj[i] = squash_unicode(v)
66 obj[i] = squash_unicode(v)
67 elif isinstance(obj, unicode):
67 elif isinstance(obj, unicode):
68 obj = obj.encode('utf8')
68 obj = obj.encode('utf8')
69 return obj
69 return obj
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # globals and defaults
72 # globals and defaults
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
77
77
78 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_packer = lambda o: pickle.dumps(o,-1)
79 pickle_unpacker = pickle.loads
79 pickle_unpacker = pickle.loads
80
80
81 default_packer = json_packer
81 default_packer = json_packer
82 default_unpacker = json_unpacker
82 default_unpacker = json_unpacker
83
83
84
84
85 DELIM=b"<IDS|MSG>"
85 DELIM=b"<IDS|MSG>"
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Classes
88 # Classes
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91 class SessionFactory(LoggingConfigurable):
91 class SessionFactory(LoggingConfigurable):
92 """The Base class for configurables that have a Session, Context, logger,
92 """The Base class for configurables that have a Session, Context, logger,
93 and IOLoop.
93 and IOLoop.
94 """
94 """
95
95
96 logname = Unicode('')
96 logname = Unicode('')
97 def _logname_changed(self, name, old, new):
97 def _logname_changed(self, name, old, new):
98 self.log = logging.getLogger(new)
98 self.log = logging.getLogger(new)
99
99
100 # not configurable:
100 # not configurable:
101 context = Instance('zmq.Context')
101 context = Instance('zmq.Context')
102 def _context_default(self):
102 def _context_default(self):
103 return zmq.Context.instance()
103 return zmq.Context.instance()
104
104
105 session = Instance('IPython.zmq.session.Session')
105 session = Instance('IPython.zmq.session.Session')
106
106
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
108 def _loop_default(self):
108 def _loop_default(self):
109 return IOLoop.instance()
109 return IOLoop.instance()
110
110
111 def __init__(self, **kwargs):
111 def __init__(self, **kwargs):
112 super(SessionFactory, self).__init__(**kwargs)
112 super(SessionFactory, self).__init__(**kwargs)
113
113
114 if self.session is None:
114 if self.session is None:
115 # construct the session
115 # construct the session
116 self.session = Session(**kwargs)
116 self.session = Session(**kwargs)
117
117
118
118
119 class Message(object):
119 class Message(object):
120 """A simple message object that maps dict keys to attributes.
120 """A simple message object that maps dict keys to attributes.
121
121
122 A Message can be created from a dict and a dict from a Message instance
122 A Message can be created from a dict and a dict from a Message instance
123 simply by calling dict(msg_obj)."""
123 simply by calling dict(msg_obj)."""
124
124
125 def __init__(self, msg_dict):
125 def __init__(self, msg_dict):
126 dct = self.__dict__
126 dct = self.__dict__
127 for k, v in dict(msg_dict).iteritems():
127 for k, v in dict(msg_dict).iteritems():
128 if isinstance(v, dict):
128 if isinstance(v, dict):
129 v = Message(v)
129 v = Message(v)
130 dct[k] = v
130 dct[k] = v
131
131
132 # Having this iterator lets dict(msg_obj) work out of the box.
132 # Having this iterator lets dict(msg_obj) work out of the box.
133 def __iter__(self):
133 def __iter__(self):
134 return iter(self.__dict__.iteritems())
134 return iter(self.__dict__.iteritems())
135
135
136 def __repr__(self):
136 def __repr__(self):
137 return repr(self.__dict__)
137 return repr(self.__dict__)
138
138
139 def __str__(self):
139 def __str__(self):
140 return pprint.pformat(self.__dict__)
140 return pprint.pformat(self.__dict__)
141
141
142 def __contains__(self, k):
142 def __contains__(self, k):
143 return k in self.__dict__
143 return k in self.__dict__
144
144
145 def __getitem__(self, k):
145 def __getitem__(self, k):
146 return self.__dict__[k]
146 return self.__dict__[k]
147
147
148
148
149 def msg_header(msg_id, msg_type, username, session):
149 def msg_header(msg_id, msg_type, username, session):
150 date = datetime.now()
150 date = datetime.now()
151 return locals()
151 return locals()
152
152
153 def extract_header(msg_or_header):
153 def extract_header(msg_or_header):
154 """Given a message or header, return the header."""
154 """Given a message or header, return the header."""
155 if not msg_or_header:
155 if not msg_or_header:
156 return {}
156 return {}
157 try:
157 try:
158 # See if msg_or_header is the entire message.
158 # See if msg_or_header is the entire message.
159 h = msg_or_header['header']
159 h = msg_or_header['header']
160 except KeyError:
160 except KeyError:
161 try:
161 try:
162 # See if msg_or_header is just the header
162 # See if msg_or_header is just the header
163 h = msg_or_header['msg_id']
163 h = msg_or_header['msg_id']
164 except KeyError:
164 except KeyError:
165 raise
165 raise
166 else:
166 else:
167 h = msg_or_header
167 h = msg_or_header
168 if not isinstance(h, dict):
168 if not isinstance(h, dict):
169 h = dict(h)
169 h = dict(h)
170 return h
170 return h
171
171
172 class Session(Configurable):
172 class Session(Configurable):
173 """Object for handling serialization and sending of messages.
173 """Object for handling serialization and sending of messages.
174
174
175 The Session object handles building messages and sending them
175 The Session object handles building messages and sending them
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
177 other over the network via Session objects, and only need to work with the
177 other over the network via Session objects, and only need to work with the
178 dict-based IPython message spec. The Session will handle
178 dict-based IPython message spec. The Session will handle
179 serialization/deserialization, security, and metadata.
179 serialization/deserialization, security, and metadata.
180
180
181 Sessions support configurable serialiization via packer/unpacker traits,
181 Sessions support configurable serialiization via packer/unpacker traits,
182 and signing with HMAC digests via the key/keyfile traits.
182 and signing with HMAC digests via the key/keyfile traits.
183
183
184 Parameters
184 Parameters
185 ----------
185 ----------
186
186
187 debug : bool
187 debug : bool
188 whether to trigger extra debugging statements
188 whether to trigger extra debugging statements
189 packer/unpacker : str : 'json', 'pickle' or import_string
189 packer/unpacker : str : 'json', 'pickle' or import_string
190 importstrings for methods to serialize message parts. If just
190 importstrings for methods to serialize message parts. If just
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
192 Otherwise, the entire importstring must be used.
192 Otherwise, the entire importstring must be used.
193
193
194 The functions must accept at least valid JSON input, and output *bytes*.
194 The functions must accept at least valid JSON input, and output *bytes*.
195
195
196 For example, to use msgpack:
196 For example, to use msgpack:
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
198 pack/unpack : callables
198 pack/unpack : callables
199 You can also set the pack/unpack callables for serialization directly.
199 You can also set the pack/unpack callables for serialization directly.
200 session : bytes
200 session : bytes
201 the ID of this Session object. The default is to generate a new UUID.
201 the ID of this Session object. The default is to generate a new UUID.
202 username : unicode
202 username : unicode
203 username added to message headers. The default is to ask the OS.
203 username added to message headers. The default is to ask the OS.
204 key : bytes
204 key : bytes
205 The key used to initialize an HMAC signature. If unset, messages
205 The key used to initialize an HMAC signature. If unset, messages
206 will not be signed or checked.
206 will not be signed or checked.
207 keyfile : filepath
207 keyfile : filepath
208 The file containing a key. If this is set, `key` will be initialized
208 The file containing a key. If this is set, `key` will be initialized
209 to the contents of the file.
209 to the contents of the file.
210
210
211 """
211 """
212
212
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
214
214
215 packer = DottedObjectName('json',config=True,
215 packer = DottedObjectName('json',config=True,
216 help="""The name of the packer for serializing messages.
216 help="""The name of the packer for serializing messages.
217 Should be one of 'json', 'pickle', or an import name
217 Should be one of 'json', 'pickle', or an import name
218 for a custom callable serializer.""")
218 for a custom callable serializer.""")
219 def _packer_changed(self, name, old, new):
219 def _packer_changed(self, name, old, new):
220 if new.lower() == 'json':
220 if new.lower() == 'json':
221 self.pack = json_packer
221 self.pack = json_packer
222 self.unpack = json_unpacker
222 self.unpack = json_unpacker
223 elif new.lower() == 'pickle':
223 elif new.lower() == 'pickle':
224 self.pack = pickle_packer
224 self.pack = pickle_packer
225 self.unpack = pickle_unpacker
225 self.unpack = pickle_unpacker
226 else:
226 else:
227 self.pack = import_item(str(new))
227 self.pack = import_item(str(new))
228
228
229 unpacker = DottedObjectName('json', config=True,
229 unpacker = DottedObjectName('json', config=True,
230 help="""The name of the unpacker for unserializing messages.
230 help="""The name of the unpacker for unserializing messages.
231 Only used with custom functions for `packer`.""")
231 Only used with custom functions for `packer`.""")
232 def _unpacker_changed(self, name, old, new):
232 def _unpacker_changed(self, name, old, new):
233 if new.lower() == 'json':
233 if new.lower() == 'json':
234 self.pack = json_packer
234 self.pack = json_packer
235 self.unpack = json_unpacker
235 self.unpack = json_unpacker
236 elif new.lower() == 'pickle':
236 elif new.lower() == 'pickle':
237 self.pack = pickle_packer
237 self.pack = pickle_packer
238 self.unpack = pickle_unpacker
238 self.unpack = pickle_unpacker
239 else:
239 else:
240 self.unpack = import_item(str(new))
240 self.unpack = import_item(str(new))
241
241
242 session = CBytes(b'', config=True,
242 session = CBytes(b'', config=True,
243 help="""The UUID identifying this session.""")
243 help="""The UUID identifying this session.""")
244 def _session_default(self):
244 def _session_default(self):
245 return bytes(uuid.uuid4())
245 return bytes(uuid.uuid4())
246
246
247 username = Unicode(os.environ.get('USER','username'), config=True,
247 username = Unicode(os.environ.get('USER','username'), config=True,
248 help="""Username for the Session. Default is your system username.""")
248 help="""Username for the Session. Default is your system username.""")
249
249
250 # message signature related traits:
250 # message signature related traits:
251 key = CBytes(b'', config=True,
251 key = CBytes(b'', config=True,
252 help="""execution key, for extra authentication.""")
252 help="""execution key, for extra authentication.""")
253 def _key_changed(self, name, old, new):
253 def _key_changed(self, name, old, new):
254 if new:
254 if new:
255 self.auth = hmac.HMAC(new)
255 self.auth = hmac.HMAC(new)
256 else:
256 else:
257 self.auth = None
257 self.auth = None
258 auth = Instance(hmac.HMAC)
258 auth = Instance(hmac.HMAC)
259 digest_history = Set()
259 digest_history = Set()
260
260
261 keyfile = Unicode('', config=True,
261 keyfile = Unicode('', config=True,
262 help="""path to file containing execution key.""")
262 help="""path to file containing execution key.""")
263 def _keyfile_changed(self, name, old, new):
263 def _keyfile_changed(self, name, old, new):
264 with open(new, 'rb') as f:
264 with open(new, 'rb') as f:
265 self.key = f.read().strip()
265 self.key = f.read().strip()
266
266
267 pack = Any(default_packer) # the actual packer function
267 pack = Any(default_packer) # the actual packer function
268 def _pack_changed(self, name, old, new):
268 def _pack_changed(self, name, old, new):
269 if not callable(new):
269 if not callable(new):
270 raise TypeError("packer must be callable, not %s"%type(new))
270 raise TypeError("packer must be callable, not %s"%type(new))
271
271
272 unpack = Any(default_unpacker) # the actual packer function
272 unpack = Any(default_unpacker) # the actual packer function
273 def _unpack_changed(self, name, old, new):
273 def _unpack_changed(self, name, old, new):
274 # unpacker is not checked - it is assumed to be
274 # unpacker is not checked - it is assumed to be
275 if not callable(new):
275 if not callable(new):
276 raise TypeError("unpacker must be callable, not %s"%type(new))
276 raise TypeError("unpacker must be callable, not %s"%type(new))
277
277
278 def __init__(self, **kwargs):
278 def __init__(self, **kwargs):
279 """create a Session object
279 """create a Session object
280
280
281 Parameters
281 Parameters
282 ----------
282 ----------
283
283
284 debug : bool
284 debug : bool
285 whether to trigger extra debugging statements
285 whether to trigger extra debugging statements
286 packer/unpacker : str : 'json', 'pickle' or import_string
286 packer/unpacker : str : 'json', 'pickle' or import_string
287 importstrings for methods to serialize message parts. If just
287 importstrings for methods to serialize message parts. If just
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
289 Otherwise, the entire importstring must be used.
289 Otherwise, the entire importstring must be used.
290
290
291 The functions must accept at least valid JSON input, and output
291 The functions must accept at least valid JSON input, and output
292 *bytes*.
292 *bytes*.
293
293
294 For example, to use msgpack:
294 For example, to use msgpack:
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
296 pack/unpack : callables
296 pack/unpack : callables
297 You can also set the pack/unpack callables for serialization
297 You can also set the pack/unpack callables for serialization
298 directly.
298 directly.
299 session : bytes
299 session : bytes
300 the ID of this Session object. The default is to generate a new
300 the ID of this Session object. The default is to generate a new
301 UUID.
301 UUID.
302 username : unicode
302 username : unicode
303 username added to message headers. The default is to ask the OS.
303 username added to message headers. The default is to ask the OS.
304 key : bytes
304 key : bytes
305 The key used to initialize an HMAC signature. If unset, messages
305 The key used to initialize an HMAC signature. If unset, messages
306 will not be signed or checked.
306 will not be signed or checked.
307 keyfile : filepath
307 keyfile : filepath
308 The file containing a key. If this is set, `key` will be
308 The file containing a key. If this is set, `key` will be
309 initialized to the contents of the file.
309 initialized to the contents of the file.
310 """
310 """
311 super(Session, self).__init__(**kwargs)
311 super(Session, self).__init__(**kwargs)
312 self._check_packers()
312 self._check_packers()
313 self.none = self.pack({})
313 self.none = self.pack({})
314
314
315 @property
315 @property
316 def msg_id(self):
316 def msg_id(self):
317 """always return new uuid"""
317 """always return new uuid"""
318 return str(uuid.uuid4())
318 return str(uuid.uuid4())
319
319
320 def _check_packers(self):
320 def _check_packers(self):
321 """check packers for binary data and datetime support."""
321 """check packers for binary data and datetime support."""
322 pack = self.pack
322 pack = self.pack
323 unpack = self.unpack
323 unpack = self.unpack
324
324
325 # check simple serialization
325 # check simple serialization
326 msg = dict(a=[1,'hi'])
326 msg = dict(a=[1,'hi'])
327 try:
327 try:
328 packed = pack(msg)
328 packed = pack(msg)
329 except Exception:
329 except Exception:
330 raise ValueError("packer could not serialize a simple message")
330 raise ValueError("packer could not serialize a simple message")
331
331
332 # ensure packed message is bytes
332 # ensure packed message is bytes
333 if not isinstance(packed, bytes):
333 if not isinstance(packed, bytes):
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
335
335
336 # check that unpack is pack's inverse
336 # check that unpack is pack's inverse
337 try:
337 try:
338 unpacked = unpack(packed)
338 unpacked = unpack(packed)
339 except Exception:
339 except Exception:
340 raise ValueError("unpacker could not handle the packer's output")
340 raise ValueError("unpacker could not handle the packer's output")
341
341
342 # check datetime support
342 # check datetime support
343 msg = dict(t=datetime.now())
343 msg = dict(t=datetime.now())
344 try:
344 try:
345 unpacked = unpack(pack(msg))
345 unpacked = unpack(pack(msg))
346 except Exception:
346 except Exception:
347 self.pack = lambda o: pack(squash_dates(o))
347 self.pack = lambda o: pack(squash_dates(o))
348 self.unpack = lambda s: extract_dates(unpack(s))
348 self.unpack = lambda s: extract_dates(unpack(s))
349
349
350 def msg_header(self, msg_type):
350 def msg_header(self, msg_type):
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352
352
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
354 """Return the nested message dict.
355
356 This format is different from what is sent over the wire. The
357 self.serialize method converts this nested message dict to the wire
358 format, which uses a message list.
359 """
354 msg = {}
360 msg = {}
355 msg['header'] = self.msg_header(msg_type)
361 msg['header'] = self.msg_header(msg_type)
356 msg['msg_id'] = msg['header']['msg_id']
362 msg['msg_id'] = msg['header']['msg_id']
357 msg['parent_header'] = {} if parent is None else extract_header(parent)
363 msg['parent_header'] = {} if parent is None else extract_header(parent)
358 msg['msg_type'] = msg_type
364 msg['msg_type'] = msg_type
359 msg['content'] = {} if content is None else content
365 msg['content'] = {} if content is None else content
360 sub = {} if subheader is None else subheader
366 sub = {} if subheader is None else subheader
361 msg['header'].update(sub)
367 msg['header'].update(sub)
362 return msg
368 return msg
363
369
364 def sign(self, msg):
370 def sign(self, msg_list):
365 """Sign a message with HMAC digest. If no auth, return b''."""
371 """Sign a message with HMAC digest. If no auth, return b''.
372
373 Parameters
374 ----------
375 msg_list : list
376 The [p_header,p_parent,p_content] part of the message list.
377 """
366 if self.auth is None:
378 if self.auth is None:
367 return b''
379 return b''
368 h = self.auth.copy()
380 h = self.auth.copy()
369 for m in msg:
381 for m in msg_list:
370 h.update(m)
382 h.update(m)
371 return h.hexdigest()
383 return h.hexdigest()
372
384
373 def serialize(self, msg, ident=None):
385 def serialize(self, msg, ident=None):
374 """Serialize the message components to bytes.
386 """Serialize the message components to bytes.
375
387
388 Parameters
389 ----------
390 msg : dict or Message
391 The nexted message dict as returned by the self.msg method.
392
376 Returns
393 Returns
377 -------
394 -------
378
395 msg_list : list
379 list of bytes objects
396 The list of bytes objects to be sent with the format:
380
397 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
398 buffer1,buffer2,...]. In this list, the p_* entities are
399 the packed or serialized versions, so if JSON is used, these
400 are uft8 encoded JSON strings.
381 """
401 """
382 content = msg.get('content', {})
402 content = msg.get('content', {})
383 if content is None:
403 if content is None:
384 content = self.none
404 content = self.none
385 elif isinstance(content, dict):
405 elif isinstance(content, dict):
386 content = self.pack(content)
406 content = self.pack(content)
387 elif isinstance(content, bytes):
407 elif isinstance(content, bytes):
388 # content is already packed, as in a relayed message
408 # content is already packed, as in a relayed message
389 pass
409 pass
390 elif isinstance(content, unicode):
410 elif isinstance(content, unicode):
391 # should be bytes, but JSON often spits out unicode
411 # should be bytes, but JSON often spits out unicode
392 content = content.encode('utf8')
412 content = content.encode('utf8')
393 else:
413 else:
394 raise TypeError("Content incorrect type: %s"%type(content))
414 raise TypeError("Content incorrect type: %s"%type(content))
395
415
396 real_message = [self.pack(msg['header']),
416 real_message = [self.pack(msg['header']),
397 self.pack(msg['parent_header']),
417 self.pack(msg['parent_header']),
398 content
418 content
399 ]
419 ]
400
420
401 to_send = []
421 to_send = []
402
422
403 if isinstance(ident, list):
423 if isinstance(ident, list):
404 # accept list of idents
424 # accept list of idents
405 to_send.extend(ident)
425 to_send.extend(ident)
406 elif ident is not None:
426 elif ident is not None:
407 to_send.append(ident)
427 to_send.append(ident)
408 to_send.append(DELIM)
428 to_send.append(DELIM)
409
429
410 signature = self.sign(real_message)
430 signature = self.sign(real_message)
411 to_send.append(signature)
431 to_send.append(signature)
412
432
413 to_send.extend(real_message)
433 to_send.extend(real_message)
414
434
415 return to_send
435 return to_send
416
436
417 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
437 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
418 buffers=None, subheader=None, track=False):
438 buffers=None, subheader=None, track=False):
419 """Build and send a message via stream or socket.
439 """Build and send a message via stream or socket.
420
440
441 The message format used by this function internally is as follows:
442
443 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
444 buffer1,buffer2,...]
445
446 The self.serialize method converts the nested message dict into this
447 format.
448
421 Parameters
449 Parameters
422 ----------
450 ----------
423
451
424 stream : zmq.Socket or ZMQStream
452 stream : zmq.Socket or ZMQStream
425 the socket-like object used to send the data
453 the socket-like object used to send the data
426 msg_or_type : str or Message/dict
454 msg_or_type : str or Message/dict
427 Normally, msg_or_type will be a msg_type unless a message is being
455 Normally, msg_or_type will be a msg_type unless a message is being
428 sent more than once.
456 sent more than once.
429
457
430 content : dict or None
458 content : dict or None
431 the content of the message (ignored if msg_or_type is a message)
459 the content of the message (ignored if msg_or_type is a message)
432 parent : Message or dict or None
460 parent : Message or dict or None
433 the parent or parent header describing the parent of this message
461 the parent or parent header describing the parent of this message
434 ident : bytes or list of bytes
462 ident : bytes or list of bytes
435 the zmq.IDENTITY routing path
463 the zmq.IDENTITY routing path
436 subheader : dict or None
464 subheader : dict or None
437 extra header keys for this message's header
465 extra header keys for this message's header
438 buffers : list or None
466 buffers : list or None
439 the already-serialized buffers to be appended to the message
467 the already-serialized buffers to be appended to the message
440 track : bool
468 track : bool
441 whether to track. Only for use with Sockets,
469 whether to track. Only for use with Sockets,
442 because ZMQStream objects cannot track messages.
470 because ZMQStream objects cannot track messages.
443
471
444 Returns
472 Returns
445 -------
473 -------
446 msg : message dict
474 msg : message dict
447 the constructed message
475 the constructed message
448 (msg,tracker) : (message dict, MessageTracker)
476 (msg,tracker) : (message dict, MessageTracker)
449 if track=True, then a 2-tuple will be returned,
477 if track=True, then a 2-tuple will be returned,
450 the first element being the constructed
478 the first element being the constructed
451 message, and the second being the MessageTracker
479 message, and the second being the MessageTracker
452
480
453 """
481 """
454
482
455 if not isinstance(stream, (zmq.Socket, ZMQStream)):
483 if not isinstance(stream, (zmq.Socket, ZMQStream)):
456 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
484 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
457 elif track and isinstance(stream, ZMQStream):
485 elif track and isinstance(stream, ZMQStream):
458 raise TypeError("ZMQStream cannot track messages")
486 raise TypeError("ZMQStream cannot track messages")
459
487
460 if isinstance(msg_or_type, (Message, dict)):
488 if isinstance(msg_or_type, (Message, dict)):
461 # we got a Message, not a msg_type
489 # we got a Message, not a msg_type
462 # don't build a new Message
490 # don't build a new Message
463 msg = msg_or_type
491 msg = msg_or_type
464 else:
492 else:
465 msg = self.msg(msg_or_type, content, parent, subheader)
493 msg = self.msg(msg_or_type, content, parent, subheader)
466
494
467 buffers = [] if buffers is None else buffers
495 buffers = [] if buffers is None else buffers
468 to_send = self.serialize(msg, ident)
496 to_send = self.serialize(msg, ident)
469 flag = 0
497 flag = 0
470 if buffers:
498 if buffers:
471 flag = zmq.SNDMORE
499 flag = zmq.SNDMORE
472 _track = False
500 _track = False
473 else:
501 else:
474 _track=track
502 _track=track
475 if track:
503 if track:
476 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
504 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
477 else:
505 else:
478 tracker = stream.send_multipart(to_send, flag, copy=False)
506 tracker = stream.send_multipart(to_send, flag, copy=False)
479 for b in buffers[:-1]:
507 for b in buffers[:-1]:
480 stream.send(b, flag, copy=False)
508 stream.send(b, flag, copy=False)
481 if buffers:
509 if buffers:
482 if track:
510 if track:
483 tracker = stream.send(buffers[-1], copy=False, track=track)
511 tracker = stream.send(buffers[-1], copy=False, track=track)
484 else:
512 else:
485 tracker = stream.send(buffers[-1], copy=False)
513 tracker = stream.send(buffers[-1], copy=False)
486
514
487 # omsg = Message(msg)
515 # omsg = Message(msg)
488 if self.debug:
516 if self.debug:
489 pprint.pprint(msg)
517 pprint.pprint(msg)
490 pprint.pprint(to_send)
518 pprint.pprint(to_send)
491 pprint.pprint(buffers)
519 pprint.pprint(buffers)
492
520
493 msg['tracker'] = tracker
521 msg['tracker'] = tracker
494
522
495 return msg
523 return msg
496
524
497 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
525 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
498 """Send a raw message via ident path.
526 """Send a raw message via ident path.
499
527
528 This method is used to send a already serialized message.
529
500 Parameters
530 Parameters
501 ----------
531 ----------
502 msg : list of sendable buffers"""
532 stream : ZMQStream or Socket
533 The ZMQ stream or socket to use for sending the message.
534 msg_list : list
535 The serialized list of messages to send. This only includes the
536 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
537 the message.
538 ident : ident or list
539 A single ident or a list of idents to use in sending.
540 """
503 to_send = []
541 to_send = []
504 if isinstance(ident, bytes):
542 if isinstance(ident, bytes):
505 ident = [ident]
543 ident = [ident]
506 if ident is not None:
544 if ident is not None:
507 to_send.extend(ident)
545 to_send.extend(ident)
508
546
509 to_send.append(DELIM)
547 to_send.append(DELIM)
510 to_send.append(self.sign(msg))
548 to_send.append(self.sign(msg_list))
511 to_send.extend(msg)
549 to_send.extend(msg_list)
512 stream.send_multipart(msg, flags, copy=copy)
550 stream.send_multipart(msg_list, flags, copy=copy)
513
551
514 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
552 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
515 """receives and unpacks a message
553 """Receive and unpack a message.
516 returns [idents], msg"""
554
555 Parameters
556 ----------
557 socket : ZMQStream or Socket
558 The socket or stream to use in receiving.
559
560 Returns
561 -------
562 [idents], msg
563 [idents] is a list of idents and msg is a nested message dict of
564 same format as self.msg returns.
565 """
517 if isinstance(socket, ZMQStream):
566 if isinstance(socket, ZMQStream):
518 socket = socket.socket
567 socket = socket.socket
519 try:
568 try:
520 msg = socket.recv_multipart(mode)
569 msg_list = socket.recv_multipart(mode)
521 except zmq.ZMQError as e:
570 except zmq.ZMQError as e:
522 if e.errno == zmq.EAGAIN:
571 if e.errno == zmq.EAGAIN:
523 # We can convert EAGAIN to None as we know in this case
572 # We can convert EAGAIN to None as we know in this case
524 # recv_multipart won't return None.
573 # recv_multipart won't return None.
525 return None,None
574 return None,None
526 else:
575 else:
527 raise
576 raise
528 # split multipart message into identity list and message dict
577 # split multipart message into identity list and message dict
529 # invalid large messages can cause very expensive string comparisons
578 # invalid large messages can cause very expensive string comparisons
530 idents, msg = self.feed_identities(msg, copy)
579 idents, msg_list = self.feed_identities(msg_list, copy)
531 try:
580 try:
532 return idents, self.unpack_message(msg, content=content, copy=copy)
581 return idents, self.unpack_message(msg_list, content=content, copy=copy)
533 except Exception as e:
582 except Exception as e:
534 print (idents, msg)
583 print (idents, msg_list)
535 # TODO: handle it
584 # TODO: handle it
536 raise e
585 raise e
537
586
538 def feed_identities(self, msg, copy=True):
587 def feed_identities(self, msg_list, copy=True):
539 """feed until DELIM is reached, then return the prefix as idents and
588 """Split the identities from the rest of the message.
540 remainder as msg. This is easily broken by setting an IDENT to DELIM,
589
590 Feed until DELIM is reached, then return the prefix as idents and
591 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
541 but that would be silly.
592 but that would be silly.
542
593
543 Parameters
594 Parameters
544 ----------
595 ----------
545 msg : a list of Message or bytes objects
596 msg_list : a list of Message or bytes objects
546 the message to be split
597 The message to be split.
547 copy : bool
598 copy : bool
548 flag determining whether the arguments are bytes or Messages
599 flag determining whether the arguments are bytes or Messages
549
600
550 Returns
601 Returns
551 -------
602 -------
552 (idents,msg) : two lists
603 (idents,msg_list) : two lists
553 idents will always be a list of bytes - the indentity prefix
604 idents will always be a list of bytes - the indentity prefix
554 msg will be a list of bytes or Messages, unchanged from input
605 msg_list will be a list of bytes or Messages, unchanged from input
555 msg should be unpackable via self.unpack_message at this point.
606 msg_list should be unpackable via self.unpack_message at this point.
556 """
607 """
557 if copy:
608 if copy:
558 idx = msg.index(DELIM)
609 idx = msg_list.index(DELIM)
559 return msg[:idx], msg[idx+1:]
610 return msg_list[:idx], msg_list[idx+1:]
560 else:
611 else:
561 failed = True
612 failed = True
562 for idx,m in enumerate(msg):
613 for idx,m in enumerate(msg_list):
563 if m.bytes == DELIM:
614 if m.bytes == DELIM:
564 failed = False
615 failed = False
565 break
616 break
566 if failed:
617 if failed:
567 raise ValueError("DELIM not in msg")
618 raise ValueError("DELIM not in msg_list")
568 idents, msg = msg[:idx], msg[idx+1:]
619 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
569 return [m.bytes for m in idents], msg
620 return [m.bytes for m in idents], msg_list
570
621
571 def unpack_message(self, msg, content=True, copy=True):
622 def unpack_message(self, msg_list, content=True, copy=True):
572 """Return a message object from the format
623 """Return a message object from the format
573 sent by self.send.
624 sent by self.send.
574
625
575 Parameters:
626 Parameters:
576 -----------
627 -----------
577
628
578 content : bool (True)
629 content : bool (True)
579 whether to unpack the content dict (True),
630 whether to unpack the content dict (True),
580 or leave it serialized (False)
631 or leave it serialized (False)
581
632
582 copy : bool (True)
633 copy : bool (True)
583 whether to return the bytes (True),
634 whether to return the bytes (True),
584 or the non-copying Message object in each place (False)
635 or the non-copying Message object in each place (False)
585
636
586 """
637 """
587 minlen = 4
638 minlen = 4
588 message = {}
639 message = {}
589 if not copy:
640 if not copy:
590 for i in range(minlen):
641 for i in range(minlen):
591 msg[i] = msg[i].bytes
642 msg_list[i] = msg_list[i].bytes
592 if self.auth is not None:
643 if self.auth is not None:
593 signature = msg[0]
644 signature = msg_list[0]
594 if signature in self.digest_history:
645 if signature in self.digest_history:
595 raise ValueError("Duplicate Signature: %r"%signature)
646 raise ValueError("Duplicate Signature: %r"%signature)
596 self.digest_history.add(signature)
647 self.digest_history.add(signature)
597 check = self.sign(msg[1:4])
648 check = self.sign(msg_list[1:4])
598 if not signature == check:
649 if not signature == check:
599 raise ValueError("Invalid Signature: %r"%signature)
650 raise ValueError("Invalid Signature: %r"%signature)
600 if not len(msg) >= minlen:
651 if not len(msg_list) >= minlen:
601 raise TypeError("malformed message, must have at least %i elements"%minlen)
652 raise TypeError("malformed message, must have at least %i elements"%minlen)
602 message['header'] = self.unpack(msg[1])
653 message['header'] = self.unpack(msg_list[1])
603 message['msg_type'] = message['header']['msg_type']
654 message['msg_type'] = message['header']['msg_type']
604 message['parent_header'] = self.unpack(msg[2])
655 message['parent_header'] = self.unpack(msg_list[2])
605 if content:
656 if content:
606 message['content'] = self.unpack(msg[3])
657 message['content'] = self.unpack(msg_list[3])
607 else:
658 else:
608 message['content'] = msg[3]
659 message['content'] = msg_list[3]
609
660
610 message['buffers'] = msg[4:]
661 message['buffers'] = msg_list[4:]
611 return message
662 return message
612
663
613 def test_msg2obj():
664 def test_msg2obj():
614 am = dict(x=1)
665 am = dict(x=1)
615 ao = Message(am)
666 ao = Message(am)
616 assert ao.x == am['x']
667 assert ao.x == am['x']
617
668
618 am['y'] = dict(z=1)
669 am['y'] = dict(z=1)
619 ao = Message(am)
670 ao = Message(am)
620 assert ao.y.z == am['y']['z']
671 assert ao.y.z == am['y']['z']
621
672
622 k1, k2 = 'y', 'z'
673 k1, k2 = 'y', 'z'
623 assert ao[k1][k2] == am[k1][k2]
674 assert ao[k1][k2] == am[k1][k2]
624
675
625 am2 = dict(ao)
676 am2 = dict(ao)
626 assert am['x'] == am2['x']
677 assert am['x'] == am2['x']
627 assert am['y']['z'] == am2['y']['z']
678 assert am['y']['z'] == am2['y']['z']
628
679
General Comments 0
You need to be logged in to leave comments. Login now