##// END OF EJS Templates
Adding tests for zmq.session.
Brian E. Granger -
Show More
@@ -1,696 +1,696 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Session object for building, serializing, sending, and receiving messages in
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
4 metadata on messages.
5
5
6 Also defined here are utilities for working with Sessions:
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
9 * A Message object for convenience that allows attribute-access to the msg dict.
10
10
11 Authors:
11 Authors:
12
12
13 * Min RK
13 * Min RK
14 * Brian Granger
14 * Brian Granger
15 * Fernando Perez
15 * Fernando Perez
16 """
16 """
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Copyright (C) 2010-2011 The IPython Development Team
18 # Copyright (C) 2010-2011 The IPython Development Team
19 #
19 #
20 # Distributed under the terms of the BSD License. The full license is in
20 # Distributed under the terms of the BSD License. The full license is in
21 # the file COPYING, distributed as part of this software.
21 # the file COPYING, distributed as part of this software.
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Imports
25 # Imports
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 import hmac
28 import hmac
29 import logging
29 import logging
30 import os
30 import os
31 import pprint
31 import pprint
32 import uuid
32 import uuid
33 from datetime import datetime
33 from datetime import datetime
34
34
35 try:
35 try:
36 import cPickle
36 import cPickle
37 pickle = cPickle
37 pickle = cPickle
38 except:
38 except:
39 cPickle = None
39 cPickle = None
40 import pickle
40 import pickle
41
41
42 import zmq
42 import zmq
43 from zmq.utils import jsonapi
43 from zmq.utils import jsonapi
44 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.ioloop import IOLoop
45 from zmq.eventloop.zmqstream import ZMQStream
45 from zmq.eventloop.zmqstream import ZMQStream
46
46
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 DottedObjectName)
51 DottedObjectName)
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # utility functions
54 # utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 def squash_unicode(obj):
57 def squash_unicode(obj):
58 """coerce unicode back to bytestrings."""
58 """coerce unicode back to bytestrings."""
59 if isinstance(obj,dict):
59 if isinstance(obj,dict):
60 for key in obj.keys():
60 for key in obj.keys():
61 obj[key] = squash_unicode(obj[key])
61 obj[key] = squash_unicode(obj[key])
62 if isinstance(key, unicode):
62 if isinstance(key, unicode):
63 obj[squash_unicode(key)] = obj.pop(key)
63 obj[squash_unicode(key)] = obj.pop(key)
64 elif isinstance(obj, list):
64 elif isinstance(obj, list):
65 for i,v in enumerate(obj):
65 for i,v in enumerate(obj):
66 obj[i] = squash_unicode(v)
66 obj[i] = squash_unicode(v)
67 elif isinstance(obj, unicode):
67 elif isinstance(obj, unicode):
68 obj = obj.encode('utf8')
68 obj = obj.encode('utf8')
69 return obj
69 return obj
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # globals and defaults
72 # globals and defaults
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
77
77
78 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_packer = lambda o: pickle.dumps(o,-1)
79 pickle_unpacker = pickle.loads
79 pickle_unpacker = pickle.loads
80
80
81 default_packer = json_packer
81 default_packer = json_packer
82 default_unpacker = json_unpacker
82 default_unpacker = json_unpacker
83
83
84
84
85 DELIM=b"<IDS|MSG>"
85 DELIM=b"<IDS|MSG>"
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Classes
88 # Classes
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91 class SessionFactory(LoggingConfigurable):
91 class SessionFactory(LoggingConfigurable):
92 """The Base class for configurables that have a Session, Context, logger,
92 """The Base class for configurables that have a Session, Context, logger,
93 and IOLoop.
93 and IOLoop.
94 """
94 """
95
95
96 logname = Unicode('')
96 logname = Unicode('')
97 def _logname_changed(self, name, old, new):
97 def _logname_changed(self, name, old, new):
98 self.log = logging.getLogger(new)
98 self.log = logging.getLogger(new)
99
99
100 # not configurable:
100 # not configurable:
101 context = Instance('zmq.Context')
101 context = Instance('zmq.Context')
102 def _context_default(self):
102 def _context_default(self):
103 return zmq.Context.instance()
103 return zmq.Context.instance()
104
104
105 session = Instance('IPython.zmq.session.Session')
105 session = Instance('IPython.zmq.session.Session')
106
106
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
108 def _loop_default(self):
108 def _loop_default(self):
109 return IOLoop.instance()
109 return IOLoop.instance()
110
110
111 def __init__(self, **kwargs):
111 def __init__(self, **kwargs):
112 super(SessionFactory, self).__init__(**kwargs)
112 super(SessionFactory, self).__init__(**kwargs)
113
113
114 if self.session is None:
114 if self.session is None:
115 # construct the session
115 # construct the session
116 self.session = Session(**kwargs)
116 self.session = Session(**kwargs)
117
117
118
118
119 class Message(object):
119 class Message(object):
120 """A simple message object that maps dict keys to attributes.
120 """A simple message object that maps dict keys to attributes.
121
121
122 A Message can be created from a dict and a dict from a Message instance
122 A Message can be created from a dict and a dict from a Message instance
123 simply by calling dict(msg_obj)."""
123 simply by calling dict(msg_obj)."""
124
124
125 def __init__(self, msg_dict):
125 def __init__(self, msg_dict):
126 dct = self.__dict__
126 dct = self.__dict__
127 for k, v in dict(msg_dict).iteritems():
127 for k, v in dict(msg_dict).iteritems():
128 if isinstance(v, dict):
128 if isinstance(v, dict):
129 v = Message(v)
129 v = Message(v)
130 dct[k] = v
130 dct[k] = v
131
131
132 # Having this iterator lets dict(msg_obj) work out of the box.
132 # Having this iterator lets dict(msg_obj) work out of the box.
133 def __iter__(self):
133 def __iter__(self):
134 return iter(self.__dict__.iteritems())
134 return iter(self.__dict__.iteritems())
135
135
136 def __repr__(self):
136 def __repr__(self):
137 return repr(self.__dict__)
137 return repr(self.__dict__)
138
138
139 def __str__(self):
139 def __str__(self):
140 return pprint.pformat(self.__dict__)
140 return pprint.pformat(self.__dict__)
141
141
142 def __contains__(self, k):
142 def __contains__(self, k):
143 return k in self.__dict__
143 return k in self.__dict__
144
144
145 def __getitem__(self, k):
145 def __getitem__(self, k):
146 return self.__dict__[k]
146 return self.__dict__[k]
147
147
148
148
149 def msg_header(msg_id, msg_type, username, session):
149 def msg_header(msg_id, msg_type, username, session):
150 date = datetime.now()
150 date = datetime.now()
151 return locals()
151 return locals()
152
152
153 def extract_header(msg_or_header):
153 def extract_header(msg_or_header):
154 """Given a message or header, return the header."""
154 """Given a message or header, return the header."""
155 if not msg_or_header:
155 if not msg_or_header:
156 return {}
156 return {}
157 try:
157 try:
158 # See if msg_or_header is the entire message.
158 # See if msg_or_header is the entire message.
159 h = msg_or_header['header']
159 h = msg_or_header['header']
160 except KeyError:
160 except KeyError:
161 try:
161 try:
162 # See if msg_or_header is just the header
162 # See if msg_or_header is just the header
163 h = msg_or_header['msg_id']
163 h = msg_or_header['msg_id']
164 except KeyError:
164 except KeyError:
165 raise
165 raise
166 else:
166 else:
167 h = msg_or_header
167 h = msg_or_header
168 if not isinstance(h, dict):
168 if not isinstance(h, dict):
169 h = dict(h)
169 h = dict(h)
170 return h
170 return h
171
171
172 class Session(Configurable):
172 class Session(Configurable):
173 """Object for handling serialization and sending of messages.
173 """Object for handling serialization and sending of messages.
174
174
175 The Session object handles building messages and sending them
175 The Session object handles building messages and sending them
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
177 other over the network via Session objects, and only need to work with the
177 other over the network via Session objects, and only need to work with the
178 dict-based IPython message spec. The Session will handle
178 dict-based IPython message spec. The Session will handle
179 serialization/deserialization, security, and metadata.
179 serialization/deserialization, security, and metadata.
180
180
181 Sessions support configurable serialiization via packer/unpacker traits,
181 Sessions support configurable serialiization via packer/unpacker traits,
182 and signing with HMAC digests via the key/keyfile traits.
182 and signing with HMAC digests via the key/keyfile traits.
183
183
184 Parameters
184 Parameters
185 ----------
185 ----------
186
186
187 debug : bool
187 debug : bool
188 whether to trigger extra debugging statements
188 whether to trigger extra debugging statements
189 packer/unpacker : str : 'json', 'pickle' or import_string
189 packer/unpacker : str : 'json', 'pickle' or import_string
190 importstrings for methods to serialize message parts. If just
190 importstrings for methods to serialize message parts. If just
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
192 Otherwise, the entire importstring must be used.
192 Otherwise, the entire importstring must be used.
193
193
194 The functions must accept at least valid JSON input, and output *bytes*.
194 The functions must accept at least valid JSON input, and output *bytes*.
195
195
196 For example, to use msgpack:
196 For example, to use msgpack:
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
198 pack/unpack : callables
198 pack/unpack : callables
199 You can also set the pack/unpack callables for serialization directly.
199 You can also set the pack/unpack callables for serialization directly.
200 session : bytes
200 session : bytes
201 the ID of this Session object. The default is to generate a new UUID.
201 the ID of this Session object. The default is to generate a new UUID.
202 username : unicode
202 username : unicode
203 username added to message headers. The default is to ask the OS.
203 username added to message headers. The default is to ask the OS.
204 key : bytes
204 key : bytes
205 The key used to initialize an HMAC signature. If unset, messages
205 The key used to initialize an HMAC signature. If unset, messages
206 will not be signed or checked.
206 will not be signed or checked.
207 keyfile : filepath
207 keyfile : filepath
208 The file containing a key. If this is set, `key` will be initialized
208 The file containing a key. If this is set, `key` will be initialized
209 to the contents of the file.
209 to the contents of the file.
210
210
211 """
211 """
212
212
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
214
214
215 packer = DottedObjectName('json',config=True,
215 packer = DottedObjectName('json',config=True,
216 help="""The name of the packer for serializing messages.
216 help="""The name of the packer for serializing messages.
217 Should be one of 'json', 'pickle', or an import name
217 Should be one of 'json', 'pickle', or an import name
218 for a custom callable serializer.""")
218 for a custom callable serializer.""")
219 def _packer_changed(self, name, old, new):
219 def _packer_changed(self, name, old, new):
220 if new.lower() == 'json':
220 if new.lower() == 'json':
221 self.pack = json_packer
221 self.pack = json_packer
222 self.unpack = json_unpacker
222 self.unpack = json_unpacker
223 elif new.lower() == 'pickle':
223 elif new.lower() == 'pickle':
224 self.pack = pickle_packer
224 self.pack = pickle_packer
225 self.unpack = pickle_unpacker
225 self.unpack = pickle_unpacker
226 else:
226 else:
227 self.pack = import_item(str(new))
227 self.pack = import_item(str(new))
228
228
229 unpacker = DottedObjectName('json', config=True,
229 unpacker = DottedObjectName('json', config=True,
230 help="""The name of the unpacker for unserializing messages.
230 help="""The name of the unpacker for unserializing messages.
231 Only used with custom functions for `packer`.""")
231 Only used with custom functions for `packer`.""")
232 def _unpacker_changed(self, name, old, new):
232 def _unpacker_changed(self, name, old, new):
233 if new.lower() == 'json':
233 if new.lower() == 'json':
234 self.pack = json_packer
234 self.pack = json_packer
235 self.unpack = json_unpacker
235 self.unpack = json_unpacker
236 elif new.lower() == 'pickle':
236 elif new.lower() == 'pickle':
237 self.pack = pickle_packer
237 self.pack = pickle_packer
238 self.unpack = pickle_unpacker
238 self.unpack = pickle_unpacker
239 else:
239 else:
240 self.unpack = import_item(str(new))
240 self.unpack = import_item(str(new))
241
241
242 session = CBytes(b'', config=True,
242 session = CBytes(b'', config=True,
243 help="""The UUID identifying this session.""")
243 help="""The UUID identifying this session.""")
244 def _session_default(self):
244 def _session_default(self):
245 return bytes(uuid.uuid4())
245 return bytes(uuid.uuid4())
246
246
247 username = Unicode(os.environ.get('USER','username'), config=True,
247 username = Unicode(os.environ.get('USER','username'), config=True,
248 help="""Username for the Session. Default is your system username.""")
248 help="""Username for the Session. Default is your system username.""")
249
249
250 # message signature related traits:
250 # message signature related traits:
251 key = CBytes(b'', config=True,
251 key = CBytes(b'', config=True,
252 help="""execution key, for extra authentication.""")
252 help="""execution key, for extra authentication.""")
253 def _key_changed(self, name, old, new):
253 def _key_changed(self, name, old, new):
254 if new:
254 if new:
255 self.auth = hmac.HMAC(new)
255 self.auth = hmac.HMAC(new)
256 else:
256 else:
257 self.auth = None
257 self.auth = None
258 auth = Instance(hmac.HMAC)
258 auth = Instance(hmac.HMAC)
259 digest_history = Set()
259 digest_history = Set()
260
260
261 keyfile = Unicode('', config=True,
261 keyfile = Unicode('', config=True,
262 help="""path to file containing execution key.""")
262 help="""path to file containing execution key.""")
263 def _keyfile_changed(self, name, old, new):
263 def _keyfile_changed(self, name, old, new):
264 with open(new, 'rb') as f:
264 with open(new, 'rb') as f:
265 self.key = f.read().strip()
265 self.key = f.read().strip()
266
266
267 pack = Any(default_packer) # the actual packer function
267 pack = Any(default_packer) # the actual packer function
268 def _pack_changed(self, name, old, new):
268 def _pack_changed(self, name, old, new):
269 if not callable(new):
269 if not callable(new):
270 raise TypeError("packer must be callable, not %s"%type(new))
270 raise TypeError("packer must be callable, not %s"%type(new))
271
271
272 unpack = Any(default_unpacker) # the actual packer function
272 unpack = Any(default_unpacker) # the actual packer function
273 def _unpack_changed(self, name, old, new):
273 def _unpack_changed(self, name, old, new):
274 # unpacker is not checked - it is assumed to be
274 # unpacker is not checked - it is assumed to be
275 if not callable(new):
275 if not callable(new):
276 raise TypeError("unpacker must be callable, not %s"%type(new))
276 raise TypeError("unpacker must be callable, not %s"%type(new))
277
277
278 def __init__(self, **kwargs):
278 def __init__(self, **kwargs):
279 """create a Session object
279 """create a Session object
280
280
281 Parameters
281 Parameters
282 ----------
282 ----------
283
283
284 debug : bool
284 debug : bool
285 whether to trigger extra debugging statements
285 whether to trigger extra debugging statements
286 packer/unpacker : str : 'json', 'pickle' or import_string
286 packer/unpacker : str : 'json', 'pickle' or import_string
287 importstrings for methods to serialize message parts. If just
287 importstrings for methods to serialize message parts. If just
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
289 Otherwise, the entire importstring must be used.
289 Otherwise, the entire importstring must be used.
290
290
291 The functions must accept at least valid JSON input, and output
291 The functions must accept at least valid JSON input, and output
292 *bytes*.
292 *bytes*.
293
293
294 For example, to use msgpack:
294 For example, to use msgpack:
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
296 pack/unpack : callables
296 pack/unpack : callables
297 You can also set the pack/unpack callables for serialization
297 You can also set the pack/unpack callables for serialization
298 directly.
298 directly.
299 session : bytes
299 session : bytes
300 the ID of this Session object. The default is to generate a new
300 the ID of this Session object. The default is to generate a new
301 UUID.
301 UUID.
302 username : unicode
302 username : unicode
303 username added to message headers. The default is to ask the OS.
303 username added to message headers. The default is to ask the OS.
304 key : bytes
304 key : bytes
305 The key used to initialize an HMAC signature. If unset, messages
305 The key used to initialize an HMAC signature. If unset, messages
306 will not be signed or checked.
306 will not be signed or checked.
307 keyfile : filepath
307 keyfile : filepath
308 The file containing a key. If this is set, `key` will be
308 The file containing a key. If this is set, `key` will be
309 initialized to the contents of the file.
309 initialized to the contents of the file.
310 """
310 """
311 super(Session, self).__init__(**kwargs)
311 super(Session, self).__init__(**kwargs)
312 self._check_packers()
312 self._check_packers()
313 self.none = self.pack({})
313 self.none = self.pack({})
314
314
315 @property
315 @property
316 def msg_id(self):
316 def msg_id(self):
317 """always return new uuid"""
317 """always return new uuid"""
318 return str(uuid.uuid4())
318 return str(uuid.uuid4())
319
319
320 def _check_packers(self):
320 def _check_packers(self):
321 """check packers for binary data and datetime support."""
321 """check packers for binary data and datetime support."""
322 pack = self.pack
322 pack = self.pack
323 unpack = self.unpack
323 unpack = self.unpack
324
324
325 # check simple serialization
325 # check simple serialization
326 msg = dict(a=[1,'hi'])
326 msg = dict(a=[1,'hi'])
327 try:
327 try:
328 packed = pack(msg)
328 packed = pack(msg)
329 except Exception:
329 except Exception:
330 raise ValueError("packer could not serialize a simple message")
330 raise ValueError("packer could not serialize a simple message")
331
331
332 # ensure packed message is bytes
332 # ensure packed message is bytes
333 if not isinstance(packed, bytes):
333 if not isinstance(packed, bytes):
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
335
335
336 # check that unpack is pack's inverse
336 # check that unpack is pack's inverse
337 try:
337 try:
338 unpacked = unpack(packed)
338 unpacked = unpack(packed)
339 except Exception:
339 except Exception:
340 raise ValueError("unpacker could not handle the packer's output")
340 raise ValueError("unpacker could not handle the packer's output")
341
341
342 # check datetime support
342 # check datetime support
343 msg = dict(t=datetime.now())
343 msg = dict(t=datetime.now())
344 try:
344 try:
345 unpacked = unpack(pack(msg))
345 unpacked = unpack(pack(msg))
346 except Exception:
346 except Exception:
347 self.pack = lambda o: pack(squash_dates(o))
347 self.pack = lambda o: pack(squash_dates(o))
348 self.unpack = lambda s: extract_dates(unpack(s))
348 self.unpack = lambda s: extract_dates(unpack(s))
349
349
350 def msg_header(self, msg_type):
350 def msg_header(self, msg_type):
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352
352
353 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
353 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
354 """Return the nested message dict.
354 """Return the nested message dict.
355
355
356 This format is different from what is sent over the wire. The
356 This format is different from what is sent over the wire. The
357 serialize/unserialize methods converts this nested message dict to the wire
357 serialize/unserialize methods converts this nested message dict to the wire
358 format, which is a list of message parts.
358 format, which is a list of message parts.
359 """
359 """
360 msg = {}
360 msg = {}
361 msg['header'] = self.msg_header(msg_type) if header is None else header
361 msg['header'] = self.msg_header(msg_type) if header is None else header
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
363 msg['content'] = {} if content is None else content
363 msg['content'] = {} if content is None else content
364 sub = {} if subheader is None else subheader
364 sub = {} if subheader is None else subheader
365 msg['header'].update(sub)
365 msg['header'].update(sub)
366 return msg
366 return msg
367
367
368 def sign(self, msg_list):
368 def sign(self, msg_list):
369 """Sign a message with HMAC digest. If no auth, return b''.
369 """Sign a message with HMAC digest. If no auth, return b''.
370
370
371 Parameters
371 Parameters
372 ----------
372 ----------
373 msg_list : list
373 msg_list : list
374 The [p_header,p_parent,p_content] part of the message list.
374 The [p_header,p_parent,p_content] part of the message list.
375 """
375 """
376 if self.auth is None:
376 if self.auth is None:
377 return b''
377 return b''
378 h = self.auth.copy()
378 h = self.auth.copy()
379 for m in msg_list:
379 for m in msg_list:
380 h.update(m)
380 h.update(m)
381 return h.hexdigest()
381 return h.hexdigest()
382
382
383 def serialize(self, msg, ident=None):
383 def serialize(self, msg, ident=None):
384 """Serialize the message components to bytes.
384 """Serialize the message components to bytes.
385
385
386 This is roughly the inverse of unserialize. The serialize/unserialize
386 This is roughly the inverse of unserialize. The serialize/unserialize
387 methods work with full message lists, whereas pack/unpack work with
387 methods work with full message lists, whereas pack/unpack work with
388 the individual message parts in the message list.
388 the individual message parts in the message list.
389
389
390 Parameters
390 Parameters
391 ----------
391 ----------
392 msg : dict or Message
392 msg : dict or Message
393 The nexted message dict as returned by the self.msg method.
393 The nexted message dict as returned by the self.msg method.
394
394
395 Returns
395 Returns
396 -------
396 -------
397 msg_list : list
397 msg_list : list
398 The list of bytes objects to be sent with the format:
398 The list of bytes objects to be sent with the format:
399 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
399 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
400 buffer1,buffer2,...]. In this list, the p_* entities are
400 buffer1,buffer2,...]. In this list, the p_* entities are
401 the packed or serialized versions, so if JSON is used, these
401 the packed or serialized versions, so if JSON is used, these
402 are uft8 encoded JSON strings.
402 are uft8 encoded JSON strings.
403 """
403 """
404 content = msg.get('content', {})
404 content = msg.get('content', {})
405 if content is None:
405 if content is None:
406 content = self.none
406 content = self.none
407 elif isinstance(content, dict):
407 elif isinstance(content, dict):
408 content = self.pack(content)
408 content = self.pack(content)
409 elif isinstance(content, bytes):
409 elif isinstance(content, bytes):
410 # content is already packed, as in a relayed message
410 # content is already packed, as in a relayed message
411 pass
411 pass
412 elif isinstance(content, unicode):
412 elif isinstance(content, unicode):
413 # should be bytes, but JSON often spits out unicode
413 # should be bytes, but JSON often spits out unicode
414 content = content.encode('utf8')
414 content = content.encode('utf8')
415 else:
415 else:
416 raise TypeError("Content incorrect type: %s"%type(content))
416 raise TypeError("Content incorrect type: %s"%type(content))
417
417
418 real_message = [self.pack(msg['header']),
418 real_message = [self.pack(msg['header']),
419 self.pack(msg['parent_header']),
419 self.pack(msg['parent_header']),
420 content
420 content
421 ]
421 ]
422
422
423 to_send = []
423 to_send = []
424
424
425 if isinstance(ident, list):
425 if isinstance(ident, list):
426 # accept list of idents
426 # accept list of idents
427 to_send.extend(ident)
427 to_send.extend(ident)
428 elif ident is not None:
428 elif ident is not None:
429 to_send.append(ident)
429 to_send.append(ident)
430 to_send.append(DELIM)
430 to_send.append(DELIM)
431
431
432 signature = self.sign(real_message)
432 signature = self.sign(real_message)
433 to_send.append(signature)
433 to_send.append(signature)
434
434
435 to_send.extend(real_message)
435 to_send.extend(real_message)
436
436
437 return to_send
437 return to_send
438
438
439 def send(self, stream, msg_or_type, content=None, parent=None, ident=None
439 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
440 buffers=None, subheader=None, track=False, header=None):
440 buffers=None, subheader=None, track=False, header=None):
441 """Build and send a message via stream or socket.
441 """Build and send a message via stream or socket.
442
442
443 The message format used by this function internally is as follows:
443 The message format used by this function internally is as follows:
444
444
445 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
445 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
446 buffer1,buffer2,...]
446 buffer1,buffer2,...]
447
447
448 The serialize/unserialize methods convert the nested message dict into this
448 The serialize/unserialize methods convert the nested message dict into this
449 format.
449 format.
450
450
451 Parameters
451 Parameters
452 ----------
452 ----------
453
453
454 stream : zmq.Socket or ZMQStream
454 stream : zmq.Socket or ZMQStream
455 The socket-like object used to send the data.
455 The socket-like object used to send the data.
456 msg_or_type : str or Message/dict
456 msg_or_type : str or Message/dict
457 Normally, msg_or_type will be a msg_type unless a message is being
457 Normally, msg_or_type will be a msg_type unless a message is being
458 sent more than once.
458 sent more than once.
459
459
460 content : dict or None
460 content : dict or None
461 The content of the message (ignored if msg_or_type is a message).
461 The content of the message (ignored if msg_or_type is a message).
462 header : dict or None
462 header : dict or None
463 The header dict for the message (ignores if msg_to_type is a message).
463 The header dict for the message (ignores if msg_to_type is a message).
464 parent : Message or dict or None
464 parent : Message or dict or None
465 The parent or parent header describing the parent of this message
465 The parent or parent header describing the parent of this message
466 (ignored if msg_or_type is a message).
466 (ignored if msg_or_type is a message).
467 ident : bytes or list of bytes
467 ident : bytes or list of bytes
468 The zmq.IDENTITY routing path.
468 The zmq.IDENTITY routing path.
469 subheader : dict or None
469 subheader : dict or None
470 Extra header keys for this message's header (ignored if msg_or_type
470 Extra header keys for this message's header (ignored if msg_or_type
471 is a message).
471 is a message).
472 buffers : list or None
472 buffers : list or None
473 The already-serialized buffers to be appended to the message.
473 The already-serialized buffers to be appended to the message.
474 track : bool
474 track : bool
475 Whether to track. Only for use with Sockets, because ZMQStream
475 Whether to track. Only for use with Sockets, because ZMQStream
476 objects cannot track messages.
476 objects cannot track messages.
477
477
478 Returns
478 Returns
479 -------
479 -------
480 msg : dict
480 msg : dict
481 The constructed message.
481 The constructed message.
482 (msg,tracker) : (dict, MessageTracker)
482 (msg,tracker) : (dict, MessageTracker)
483 if track=True, then a 2-tuple will be returned,
483 if track=True, then a 2-tuple will be returned,
484 the first element being the constructed
484 the first element being the constructed
485 message, and the second being the MessageTracker
485 message, and the second being the MessageTracker
486
486
487 """
487 """
488
488
489 if not isinstance(stream, (zmq.Socket, ZMQStream)):
489 if not isinstance(stream, (zmq.Socket, ZMQStream)):
490 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
490 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
491 elif track and isinstance(stream, ZMQStream):
491 elif track and isinstance(stream, ZMQStream):
492 raise TypeError("ZMQStream cannot track messages")
492 raise TypeError("ZMQStream cannot track messages")
493
493
494 if isinstance(msg_or_type, (Message, dict)):
494 if isinstance(msg_or_type, (Message, dict)):
495 # We got a Message or message dict, not a msg_type so don't
495 # We got a Message or message dict, not a msg_type so don't
496 # build a new Message.
496 # build a new Message.
497 msg = msg_or_type
497 msg = msg_or_type
498 else:
498 else:
499 msg = self.msg(msg_or_type, content=content, parent=parent,
499 msg = self.msg(msg_or_type, content=content, parent=parent,
500 subheader=subheader, header=header)
500 subheader=subheader, header=header)
501
501
502 buffers = [] if buffers is None else buffers
502 buffers = [] if buffers is None else buffers
503 to_send = self.serialize(msg, ident)
503 to_send = self.serialize(msg, ident)
504 flag = 0
504 flag = 0
505 if buffers:
505 if buffers:
506 flag = zmq.SNDMORE
506 flag = zmq.SNDMORE
507 _track = False
507 _track = False
508 else:
508 else:
509 _track=track
509 _track=track
510 if track:
510 if track:
511 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
511 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
512 else:
512 else:
513 tracker = stream.send_multipart(to_send, flag, copy=False)
513 tracker = stream.send_multipart(to_send, flag, copy=False)
514 for b in buffers[:-1]:
514 for b in buffers[:-1]:
515 stream.send(b, flag, copy=False)
515 stream.send(b, flag, copy=False)
516 if buffers:
516 if buffers:
517 if track:
517 if track:
518 tracker = stream.send(buffers[-1], copy=False, track=track)
518 tracker = stream.send(buffers[-1], copy=False, track=track)
519 else:
519 else:
520 tracker = stream.send(buffers[-1], copy=False)
520 tracker = stream.send(buffers[-1], copy=False)
521
521
522 # omsg = Message(msg)
522 # omsg = Message(msg)
523 if self.debug:
523 if self.debug:
524 pprint.pprint(msg)
524 pprint.pprint(msg)
525 pprint.pprint(to_send)
525 pprint.pprint(to_send)
526 pprint.pprint(buffers)
526 pprint.pprint(buffers)
527
527
528 msg['tracker'] = tracker
528 msg['tracker'] = tracker
529
529
530 return msg
530 return msg
531
531
532 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
532 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
533 """Send a raw message via ident path.
533 """Send a raw message via ident path.
534
534
535 This method is used to send a already serialized message.
535 This method is used to send a already serialized message.
536
536
537 Parameters
537 Parameters
538 ----------
538 ----------
539 stream : ZMQStream or Socket
539 stream : ZMQStream or Socket
540 The ZMQ stream or socket to use for sending the message.
540 The ZMQ stream or socket to use for sending the message.
541 msg_list : list
541 msg_list : list
542 The serialized list of messages to send. This only includes the
542 The serialized list of messages to send. This only includes the
543 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
543 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
544 the message.
544 the message.
545 ident : ident or list
545 ident : ident or list
546 A single ident or a list of idents to use in sending.
546 A single ident or a list of idents to use in sending.
547 """
547 """
548 to_send = []
548 to_send = []
549 if isinstance(ident, bytes):
549 if isinstance(ident, bytes):
550 ident = [ident]
550 ident = [ident]
551 if ident is not None:
551 if ident is not None:
552 to_send.extend(ident)
552 to_send.extend(ident)
553
553
554 to_send.append(DELIM)
554 to_send.append(DELIM)
555 to_send.append(self.sign(msg_list))
555 to_send.append(self.sign(msg_list))
556 to_send.extend(msg_list)
556 to_send.extend(msg_list)
557 stream.send_multipart(msg_list, flags, copy=copy)
557 stream.send_multipart(msg_list, flags, copy=copy)
558
558
559 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
559 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
560 """Receive and unpack a message.
560 """Receive and unpack a message.
561
561
562 Parameters
562 Parameters
563 ----------
563 ----------
564 socket : ZMQStream or Socket
564 socket : ZMQStream or Socket
565 The socket or stream to use in receiving.
565 The socket or stream to use in receiving.
566
566
567 Returns
567 Returns
568 -------
568 -------
569 [idents], msg
569 [idents], msg
570 [idents] is a list of idents and msg is a nested message dict of
570 [idents] is a list of idents and msg is a nested message dict of
571 same format as self.msg returns.
571 same format as self.msg returns.
572 """
572 """
573 if isinstance(socket, ZMQStream):
573 if isinstance(socket, ZMQStream):
574 socket = socket.socket
574 socket = socket.socket
575 try:
575 try:
576 msg_list = socket.recv_multipart(mode)
576 msg_list = socket.recv_multipart(mode)
577 except zmq.ZMQError as e:
577 except zmq.ZMQError as e:
578 if e.errno == zmq.EAGAIN:
578 if e.errno == zmq.EAGAIN:
579 # We can convert EAGAIN to None as we know in this case
579 # We can convert EAGAIN to None as we know in this case
580 # recv_multipart won't return None.
580 # recv_multipart won't return None.
581 return None,None
581 return None,None
582 else:
582 else:
583 raise
583 raise
584 # split multipart message into identity list and message dict
584 # split multipart message into identity list and message dict
585 # invalid large messages can cause very expensive string comparisons
585 # invalid large messages can cause very expensive string comparisons
586 idents, msg_list = self.feed_identities(msg_list, copy)
586 idents, msg_list = self.feed_identities(msg_list, copy)
587 try:
587 try:
588 return idents, self.unserialize(msg_list, content=content, copy=copy)
588 return idents, self.unserialize(msg_list, content=content, copy=copy)
589 except Exception as e:
589 except Exception as e:
590 print (idents, msg_list)
590 print (idents, msg_list)
591 # TODO: handle it
591 # TODO: handle it
592 raise e
592 raise e
593
593
594 def feed_identities(self, msg_list, copy=True):
594 def feed_identities(self, msg_list, copy=True):
595 """Split the identities from the rest of the message.
595 """Split the identities from the rest of the message.
596
596
597 Feed until DELIM is reached, then return the prefix as idents and
597 Feed until DELIM is reached, then return the prefix as idents and
598 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
598 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
599 but that would be silly.
599 but that would be silly.
600
600
601 Parameters
601 Parameters
602 ----------
602 ----------
603 msg_list : a list of Message or bytes objects
603 msg_list : a list of Message or bytes objects
604 The message to be split.
604 The message to be split.
605 copy : bool
605 copy : bool
606 flag determining whether the arguments are bytes or Messages
606 flag determining whether the arguments are bytes or Messages
607
607
608 Returns
608 Returns
609 -------
609 -------
610 (idents, msg_list) : two lists
610 (idents, msg_list) : two lists
611 idents will always be a list of bytes, each of which is a ZMQ
611 idents will always be a list of bytes, each of which is a ZMQ
612 identity. msg_list will be a list of bytes or zmq.Messages of the
612 identity. msg_list will be a list of bytes or zmq.Messages of the
613 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
613 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
614 should be unpackable/unserializable via self.unserialize at this
614 should be unpackable/unserializable via self.unserialize at this
615 point.
615 point.
616 """
616 """
617 if copy:
617 if copy:
618 idx = msg_list.index(DELIM)
618 idx = msg_list.index(DELIM)
619 return msg_list[:idx], msg_list[idx+1:]
619 return msg_list[:idx], msg_list[idx+1:]
620 else:
620 else:
621 failed = True
621 failed = True
622 for idx,m in enumerate(msg_list):
622 for idx,m in enumerate(msg_list):
623 if m.bytes == DELIM:
623 if m.bytes == DELIM:
624 failed = False
624 failed = False
625 break
625 break
626 if failed:
626 if failed:
627 raise ValueError("DELIM not in msg_list")
627 raise ValueError("DELIM not in msg_list")
628 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
628 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
629 return [m.bytes for m in idents], msg_list
629 return [m.bytes for m in idents], msg_list
630
630
631 def unserialize(self, msg_list, content=True, copy=True):
631 def unserialize(self, msg_list, content=True, copy=True):
632 """Unserialize a msg_list to a nested message dict.
632 """Unserialize a msg_list to a nested message dict.
633
633
634 This is roughly the inverse of serialize. The serialize/unserialize
634 This is roughly the inverse of serialize. The serialize/unserialize
635 methods work with full message lists, whereas pack/unpack work with
635 methods work with full message lists, whereas pack/unpack work with
636 the individual message parts in the message list.
636 the individual message parts in the message list.
637
637
638 Parameters:
638 Parameters:
639 -----------
639 -----------
640 msg_list : list of bytes or Message objects
640 msg_list : list of bytes or Message objects
641 The list of message parts of the form [HMAC,p_header,p_parent,
641 The list of message parts of the form [HMAC,p_header,p_parent,
642 p_content,buffer1,buffer2,...].
642 p_content,buffer1,buffer2,...].
643 content : bool (True)
643 content : bool (True)
644 Whether to unpack the content dict (True), or leave it packed
644 Whether to unpack the content dict (True), or leave it packed
645 (False).
645 (False).
646 copy : bool (True)
646 copy : bool (True)
647 Whether to return the bytes (True), or the non-copying Message
647 Whether to return the bytes (True), or the non-copying Message
648 object in each place (False).
648 object in each place (False).
649
649
650 Returns
650 Returns
651 -------
651 -------
652 msg : dict
652 msg : dict
653 The nested message dict with top-level keys [header, parent_header,
653 The nested message dict with top-level keys [header, parent_header,
654 content, buffers].
654 content, buffers].
655 """
655 """
656 minlen = 4
656 minlen = 4
657 message = {}
657 message = {}
658 if not copy:
658 if not copy:
659 for i in range(minlen):
659 for i in range(minlen):
660 msg_list[i] = msg_list[i].bytes
660 msg_list[i] = msg_list[i].bytes
661 if self.auth is not None:
661 if self.auth is not None:
662 signature = msg_list[0]
662 signature = msg_list[0]
663 if signature in self.digest_history:
663 if signature in self.digest_history:
664 raise ValueError("Duplicate Signature: %r"%signature)
664 raise ValueError("Duplicate Signature: %r"%signature)
665 self.digest_history.add(signature)
665 self.digest_history.add(signature)
666 check = self.sign(msg_list[1:4])
666 check = self.sign(msg_list[1:4])
667 if not signature == check:
667 if not signature == check:
668 raise ValueError("Invalid Signature: %r"%signature)
668 raise ValueError("Invalid Signature: %r"%signature)
669 if not len(msg_list) >= minlen:
669 if not len(msg_list) >= minlen:
670 raise TypeError("malformed message, must have at least %i elements"%minlen)
670 raise TypeError("malformed message, must have at least %i elements"%minlen)
671 message['header'] = self.unpack(msg_list[1])
671 message['header'] = self.unpack(msg_list[1])
672 message['parent_header'] = self.unpack(msg_list[2])
672 message['parent_header'] = self.unpack(msg_list[2])
673 if content:
673 if content:
674 message['content'] = self.unpack(msg_list[3])
674 message['content'] = self.unpack(msg_list[3])
675 else:
675 else:
676 message['content'] = msg_list[3]
676 message['content'] = msg_list[3]
677
677
678 message['buffers'] = msg_list[4:]
678 message['buffers'] = msg_list[4:]
679 return message
679 return message
680
680
681 def test_msg2obj():
681 def test_msg2obj():
682 am = dict(x=1)
682 am = dict(x=1)
683 ao = Message(am)
683 ao = Message(am)
684 assert ao.x == am['x']
684 assert ao.x == am['x']
685
685
686 am['y'] = dict(z=1)
686 am['y'] = dict(z=1)
687 ao = Message(am)
687 ao = Message(am)
688 assert ao.y.z == am['y']['z']
688 assert ao.y.z == am['y']['z']
689
689
690 k1, k2 = 'y', 'z'
690 k1, k2 = 'y', 'z'
691 assert ao[k1][k2] == am[k1][k2]
691 assert ao[k1][k2] == am[k1][k2]
692
692
693 am2 = dict(ao)
693 am2 = dict(ao)
694 assert am['x'] == am2['x']
694 assert am['x'] == am2['x']
695 assert am['y']['z'] == am2['y']['z']
695 assert am['y']['z'] == am2['y']['z']
696
696
@@ -1,109 +1,120 b''
1 """test building messages with streamsession"""
1 """test building messages with streamsession"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import uuid
15 import uuid
16 import zmq
16 import zmq
17
17
18 from zmq.tests import BaseZMQTestCase
18 from zmq.tests import BaseZMQTestCase
19 from zmq.eventloop.zmqstream import ZMQStream
19 from zmq.eventloop.zmqstream import ZMQStream
20
20
21 from IPython.zmq import session as ss
21 from IPython.zmq import session as ss
22
22
23 class SessionTestCase(BaseZMQTestCase):
23 class SessionTestCase(BaseZMQTestCase):
24
24
25 def setUp(self):
25 def setUp(self):
26 BaseZMQTestCase.setUp(self)
26 BaseZMQTestCase.setUp(self)
27 self.session = ss.Session()
27 self.session = ss.Session()
28
28
29 class TestSession(SessionTestCase):
29 class TestSession(SessionTestCase):
30
30
31 def test_msg(self):
31 def test_msg(self):
32 """message format"""
32 """message format"""
33 msg = self.session.msg('execute')
33 msg = self.session.msg('execute')
34 thekeys = set('header msg_id parent_header msg_type content'.split())
34 thekeys = set('header parent_header content'.split())
35 s = set(msg.keys())
35 s = set(msg.keys())
36 self.assertEquals(s, thekeys)
36 self.assertEquals(s, thekeys)
37 self.assertTrue(isinstance(msg['content'],dict))
37 self.assertTrue(isinstance(msg['content'],dict))
38 self.assertTrue(isinstance(msg['header'],dict))
38 self.assertTrue(isinstance(msg['header'],dict))
39 self.assertTrue(isinstance(msg['parent_header'],dict))
39 self.assertTrue(isinstance(msg['parent_header'],dict))
40 self.assertEquals(msg['header']['msg_type'], 'execute')
40 self.assertEquals(msg['header']['msg_type'], 'execute')
41
41
42 def test_serialize(self):
43 msg = self.session.msg('execute')
44 msg_list = self.session.serialize(msg, ident=b'foo')
45 ident, msg_list = self.session.feed_identities(msg_list)
46 new_msg = self.session.unserialize(msg_list)
47 self.assertEquals(ident[0], b'foo')
48 self.assertEquals(new_msg['header'],msg['header'])
49 self.assertEquals(new_msg['content'],msg['content'])
50 self.assertEquals(new_msg['parent_header'],msg['parent_header'])
51
42 def test_args(self):
52 def test_args(self):
43 """initialization arguments for Session"""
53 """initialization arguments for Session"""
44 s = self.session
54 s = self.session
45 self.assertTrue(s.pack is ss.default_packer)
55 self.assertTrue(s.pack is ss.default_packer)
46 self.assertTrue(s.unpack is ss.default_unpacker)
56 self.assertTrue(s.unpack is ss.default_unpacker)
47 self.assertEquals(s.username, os.environ.get('USER', 'username'))
57 self.assertEquals(s.username, os.environ.get('USER', 'username'))
48
58
49 s = ss.Session()
59 s = ss.Session()
50 self.assertEquals(s.username, os.environ.get('USER', 'username'))
60 self.assertEquals(s.username, os.environ.get('USER', 'username'))
51
61
52 self.assertRaises(TypeError, ss.Session, pack='hi')
62 self.assertRaises(TypeError, ss.Session, pack='hi')
53 self.assertRaises(TypeError, ss.Session, unpack='hi')
63 self.assertRaises(TypeError, ss.Session, unpack='hi')
54 u = str(uuid.uuid4())
64 u = str(uuid.uuid4())
55 s = ss.Session(username='carrot', session=u)
65 s = ss.Session(username='carrot', session=u)
56 self.assertEquals(s.session, u)
66 self.assertEquals(s.session, u)
57 self.assertEquals(s.username, 'carrot')
67 self.assertEquals(s.username, 'carrot')
58
68
59 def test_tracking(self):
69 def test_tracking(self):
60 """test tracking messages"""
70 """test tracking messages"""
61 a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
71 a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
62 s = self.session
72 s = self.session
63 stream = ZMQStream(a)
73 stream = ZMQStream(a)
64 msg = s.send(a, 'hello', track=False)
74 msg = s.send(a, 'hello', track=False)
65 self.assertTrue(msg['tracker'] is None)
75 self.assertTrue(msg['tracker'] is None)
66 msg = s.send(a, 'hello', track=True)
76 msg = s.send(a, 'hello', track=True)
67 self.assertTrue(isinstance(msg['tracker'], zmq.MessageTracker))
77 self.assertTrue(isinstance(msg['tracker'], zmq.MessageTracker))
68 M = zmq.Message(b'hi there', track=True)
78 M = zmq.Message(b'hi there', track=True)
69 msg = s.send(a, 'hello', buffers=[M], track=True)
79 msg = s.send(a, 'hello', buffers=[M], track=True)
70 t = msg['tracker']
80 t = msg['tracker']
71 self.assertTrue(isinstance(t, zmq.MessageTracker))
81 self.assertTrue(isinstance(t, zmq.MessageTracker))
72 self.assertRaises(zmq.NotDone, t.wait, .1)
82 self.assertRaises(zmq.NotDone, t.wait, .1)
73 del M
83 del M
74 t.wait(1) # this will raise
84 t.wait(1) # this will raise
75
85
76
86
77 # def test_rekey(self):
87 # def test_rekey(self):
78 # """rekeying dict around json str keys"""
88 # """rekeying dict around json str keys"""
79 # d = {'0': uuid.uuid4(), 0:uuid.uuid4()}
89 # d = {'0': uuid.uuid4(), 0:uuid.uuid4()}
80 # self.assertRaises(KeyError, ss.rekey, d)
90 # self.assertRaises(KeyError, ss.rekey, d)
81 #
91 #
82 # d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()}
92 # d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()}
83 # d2 = {0:d['0'],1:d[1],'asdf':d['asdf']}
93 # d2 = {0:d['0'],1:d[1],'asdf':d['asdf']}
84 # rd = ss.rekey(d)
94 # rd = ss.rekey(d)
85 # self.assertEquals(d2,rd)
95 # self.assertEquals(d2,rd)
86 #
96 #
87 # d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()}
97 # d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()}
88 # d2 = {1.5:d['1.5'],1:d['1']}
98 # d2 = {1.5:d['1.5'],1:d['1']}
89 # rd = ss.rekey(d)
99 # rd = ss.rekey(d)
90 # self.assertEquals(d2,rd)
100 # self.assertEquals(d2,rd)
91 #
101 #
92 # d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()}
102 # d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()}
93 # self.assertRaises(KeyError, ss.rekey, d)
103 # self.assertRaises(KeyError, ss.rekey, d)
94 #
104 #
95 def test_unique_msg_ids(self):
105 def test_unique_msg_ids(self):
96 """test that messages receive unique ids"""
106 """test that messages receive unique ids"""
97 ids = set()
107 ids = set()
98 for i in range(2**12):
108 for i in range(2**12):
99 h = self.session.msg_header('test')
109 h = self.session.msg_header('test')
100 msg_id = h['msg_id']
110 msg_id = h['msg_id']
101 self.assertTrue(msg_id not in ids)
111 self.assertTrue(msg_id not in ids)
102 ids.add(msg_id)
112 ids.add(msg_id)
103
113
104 def test_feed_identities(self):
114 def test_feed_identities(self):
105 """scrub the front for zmq IDENTITIES"""
115 """scrub the front for zmq IDENTITIES"""
106 theids = "engine client other".split()
116 theids = "engine client other".split()
107 content = dict(code='whoda',stuff=object())
117 content = dict(code='whoda',stuff=object())
108 themsg = self.session.msg('execute',content=content)
118 themsg = self.session.msg('execute',content=content)
109 pmsg = theids
119 pmsg = theids
120
General Comments 0
You need to be logged in to leave comments. Login now