##// END OF EJS Templates
Added more docstrings to IPython.zmq.session....
Brian E. Granger -
Show More
@@ -1,628 +1,679 b''
1 1 #!/usr/bin/env python
2 2 """Session object for building, serializing, sending, and receiving messages in
3 3 IPython. The Session object supports serialization, HMAC signatures, and
4 4 metadata on messages.
5 5
6 6 Also defined here are utilities for working with Sessions:
7 7 * A SessionFactory to be used as a base class for configurables that work with
8 8 Sessions.
9 9 * A Message object for convenience that allows attribute-access to the msg dict.
10 10
11 11 Authors:
12 12
13 13 * Min RK
14 14 * Brian Granger
15 15 * Fernando Perez
16 16 """
17 17 #-----------------------------------------------------------------------------
18 18 # Copyright (C) 2010-2011 The IPython Development Team
19 19 #
20 20 # Distributed under the terms of the BSD License. The full license is in
21 21 # the file COPYING, distributed as part of this software.
22 22 #-----------------------------------------------------------------------------
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Imports
26 26 #-----------------------------------------------------------------------------
27 27
28 28 import hmac
29 29 import logging
30 30 import os
31 31 import pprint
32 32 import uuid
33 33 from datetime import datetime
34 34
35 35 try:
36 36 import cPickle
37 37 pickle = cPickle
38 38 except:
39 39 cPickle = None
40 40 import pickle
41 41
42 42 import zmq
43 43 from zmq.utils import jsonapi
44 44 from zmq.eventloop.ioloop import IOLoop
45 45 from zmq.eventloop.zmqstream import ZMQStream
46 46
47 47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 48 from IPython.utils.importstring import import_item
49 49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 51 DottedObjectName)
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 def squash_unicode(obj):
58 58 """coerce unicode back to bytestrings."""
59 59 if isinstance(obj,dict):
60 60 for key in obj.keys():
61 61 obj[key] = squash_unicode(obj[key])
62 62 if isinstance(key, unicode):
63 63 obj[squash_unicode(key)] = obj.pop(key)
64 64 elif isinstance(obj, list):
65 65 for i,v in enumerate(obj):
66 66 obj[i] = squash_unicode(v)
67 67 elif isinstance(obj, unicode):
68 68 obj = obj.encode('utf8')
69 69 return obj
70 70
71 71 #-----------------------------------------------------------------------------
72 72 # globals and defaults
73 73 #-----------------------------------------------------------------------------
74 74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
75 75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
76 76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
77 77
78 78 pickle_packer = lambda o: pickle.dumps(o,-1)
79 79 pickle_unpacker = pickle.loads
80 80
81 81 default_packer = json_packer
82 82 default_unpacker = json_unpacker
83 83
84 84
85 85 DELIM=b"<IDS|MSG>"
86 86
87 87 #-----------------------------------------------------------------------------
88 88 # Classes
89 89 #-----------------------------------------------------------------------------
90 90
91 91 class SessionFactory(LoggingConfigurable):
92 92 """The Base class for configurables that have a Session, Context, logger,
93 93 and IOLoop.
94 94 """
95 95
96 96 logname = Unicode('')
97 97 def _logname_changed(self, name, old, new):
98 98 self.log = logging.getLogger(new)
99 99
100 100 # not configurable:
101 101 context = Instance('zmq.Context')
102 102 def _context_default(self):
103 103 return zmq.Context.instance()
104 104
105 105 session = Instance('IPython.zmq.session.Session')
106 106
107 107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
108 108 def _loop_default(self):
109 109 return IOLoop.instance()
110 110
111 111 def __init__(self, **kwargs):
112 112 super(SessionFactory, self).__init__(**kwargs)
113 113
114 114 if self.session is None:
115 115 # construct the session
116 116 self.session = Session(**kwargs)
117 117
118 118
119 119 class Message(object):
120 120 """A simple message object that maps dict keys to attributes.
121 121
122 122 A Message can be created from a dict and a dict from a Message instance
123 123 simply by calling dict(msg_obj)."""
124 124
125 125 def __init__(self, msg_dict):
126 126 dct = self.__dict__
127 127 for k, v in dict(msg_dict).iteritems():
128 128 if isinstance(v, dict):
129 129 v = Message(v)
130 130 dct[k] = v
131 131
132 132 # Having this iterator lets dict(msg_obj) work out of the box.
133 133 def __iter__(self):
134 134 return iter(self.__dict__.iteritems())
135 135
136 136 def __repr__(self):
137 137 return repr(self.__dict__)
138 138
139 139 def __str__(self):
140 140 return pprint.pformat(self.__dict__)
141 141
142 142 def __contains__(self, k):
143 143 return k in self.__dict__
144 144
145 145 def __getitem__(self, k):
146 146 return self.__dict__[k]
147 147
148 148
149 149 def msg_header(msg_id, msg_type, username, session):
150 150 date = datetime.now()
151 151 return locals()
152 152
153 153 def extract_header(msg_or_header):
154 154 """Given a message or header, return the header."""
155 155 if not msg_or_header:
156 156 return {}
157 157 try:
158 158 # See if msg_or_header is the entire message.
159 159 h = msg_or_header['header']
160 160 except KeyError:
161 161 try:
162 162 # See if msg_or_header is just the header
163 163 h = msg_or_header['msg_id']
164 164 except KeyError:
165 165 raise
166 166 else:
167 167 h = msg_or_header
168 168 if not isinstance(h, dict):
169 169 h = dict(h)
170 170 return h
171 171
172 172 class Session(Configurable):
173 173 """Object for handling serialization and sending of messages.
174 174
175 175 The Session object handles building messages and sending them
176 176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
177 177 other over the network via Session objects, and only need to work with the
178 178 dict-based IPython message spec. The Session will handle
179 179 serialization/deserialization, security, and metadata.
180 180
181 181 Sessions support configurable serialiization via packer/unpacker traits,
182 182 and signing with HMAC digests via the key/keyfile traits.
183 183
184 184 Parameters
185 185 ----------
186 186
187 187 debug : bool
188 188 whether to trigger extra debugging statements
189 189 packer/unpacker : str : 'json', 'pickle' or import_string
190 190 importstrings for methods to serialize message parts. If just
191 191 'json' or 'pickle', predefined JSON and pickle packers will be used.
192 192 Otherwise, the entire importstring must be used.
193 193
194 194 The functions must accept at least valid JSON input, and output *bytes*.
195 195
196 196 For example, to use msgpack:
197 197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
198 198 pack/unpack : callables
199 199 You can also set the pack/unpack callables for serialization directly.
200 200 session : bytes
201 201 the ID of this Session object. The default is to generate a new UUID.
202 202 username : unicode
203 203 username added to message headers. The default is to ask the OS.
204 204 key : bytes
205 205 The key used to initialize an HMAC signature. If unset, messages
206 206 will not be signed or checked.
207 207 keyfile : filepath
208 208 The file containing a key. If this is set, `key` will be initialized
209 209 to the contents of the file.
210 210
211 211 """
212 212
213 213 debug=Bool(False, config=True, help="""Debug output in the Session""")
214 214
215 215 packer = DottedObjectName('json',config=True,
216 216 help="""The name of the packer for serializing messages.
217 217 Should be one of 'json', 'pickle', or an import name
218 218 for a custom callable serializer.""")
219 219 def _packer_changed(self, name, old, new):
220 220 if new.lower() == 'json':
221 221 self.pack = json_packer
222 222 self.unpack = json_unpacker
223 223 elif new.lower() == 'pickle':
224 224 self.pack = pickle_packer
225 225 self.unpack = pickle_unpacker
226 226 else:
227 227 self.pack = import_item(str(new))
228 228
229 229 unpacker = DottedObjectName('json', config=True,
230 230 help="""The name of the unpacker for unserializing messages.
231 231 Only used with custom functions for `packer`.""")
232 232 def _unpacker_changed(self, name, old, new):
233 233 if new.lower() == 'json':
234 234 self.pack = json_packer
235 235 self.unpack = json_unpacker
236 236 elif new.lower() == 'pickle':
237 237 self.pack = pickle_packer
238 238 self.unpack = pickle_unpacker
239 239 else:
240 240 self.unpack = import_item(str(new))
241 241
242 242 session = CBytes(b'', config=True,
243 243 help="""The UUID identifying this session.""")
244 244 def _session_default(self):
245 245 return bytes(uuid.uuid4())
246 246
247 247 username = Unicode(os.environ.get('USER','username'), config=True,
248 248 help="""Username for the Session. Default is your system username.""")
249 249
250 250 # message signature related traits:
251 251 key = CBytes(b'', config=True,
252 252 help="""execution key, for extra authentication.""")
253 253 def _key_changed(self, name, old, new):
254 254 if new:
255 255 self.auth = hmac.HMAC(new)
256 256 else:
257 257 self.auth = None
258 258 auth = Instance(hmac.HMAC)
259 259 digest_history = Set()
260 260
261 261 keyfile = Unicode('', config=True,
262 262 help="""path to file containing execution key.""")
263 263 def _keyfile_changed(self, name, old, new):
264 264 with open(new, 'rb') as f:
265 265 self.key = f.read().strip()
266 266
267 267 pack = Any(default_packer) # the actual packer function
268 268 def _pack_changed(self, name, old, new):
269 269 if not callable(new):
270 270 raise TypeError("packer must be callable, not %s"%type(new))
271 271
272 272 unpack = Any(default_unpacker) # the actual packer function
273 273 def _unpack_changed(self, name, old, new):
274 274 # unpacker is not checked - it is assumed to be
275 275 if not callable(new):
276 276 raise TypeError("unpacker must be callable, not %s"%type(new))
277 277
278 278 def __init__(self, **kwargs):
279 279 """create a Session object
280 280
281 281 Parameters
282 282 ----------
283 283
284 284 debug : bool
285 285 whether to trigger extra debugging statements
286 286 packer/unpacker : str : 'json', 'pickle' or import_string
287 287 importstrings for methods to serialize message parts. If just
288 288 'json' or 'pickle', predefined JSON and pickle packers will be used.
289 289 Otherwise, the entire importstring must be used.
290 290
291 291 The functions must accept at least valid JSON input, and output
292 292 *bytes*.
293 293
294 294 For example, to use msgpack:
295 295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
296 296 pack/unpack : callables
297 297 You can also set the pack/unpack callables for serialization
298 298 directly.
299 299 session : bytes
300 300 the ID of this Session object. The default is to generate a new
301 301 UUID.
302 302 username : unicode
303 303 username added to message headers. The default is to ask the OS.
304 304 key : bytes
305 305 The key used to initialize an HMAC signature. If unset, messages
306 306 will not be signed or checked.
307 307 keyfile : filepath
308 308 The file containing a key. If this is set, `key` will be
309 309 initialized to the contents of the file.
310 310 """
311 311 super(Session, self).__init__(**kwargs)
312 312 self._check_packers()
313 313 self.none = self.pack({})
314 314
315 315 @property
316 316 def msg_id(self):
317 317 """always return new uuid"""
318 318 return str(uuid.uuid4())
319 319
320 320 def _check_packers(self):
321 321 """check packers for binary data and datetime support."""
322 322 pack = self.pack
323 323 unpack = self.unpack
324 324
325 325 # check simple serialization
326 326 msg = dict(a=[1,'hi'])
327 327 try:
328 328 packed = pack(msg)
329 329 except Exception:
330 330 raise ValueError("packer could not serialize a simple message")
331 331
332 332 # ensure packed message is bytes
333 333 if not isinstance(packed, bytes):
334 334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
335 335
336 336 # check that unpack is pack's inverse
337 337 try:
338 338 unpacked = unpack(packed)
339 339 except Exception:
340 340 raise ValueError("unpacker could not handle the packer's output")
341 341
342 342 # check datetime support
343 343 msg = dict(t=datetime.now())
344 344 try:
345 345 unpacked = unpack(pack(msg))
346 346 except Exception:
347 347 self.pack = lambda o: pack(squash_dates(o))
348 348 self.unpack = lambda s: extract_dates(unpack(s))
349 349
350 350 def msg_header(self, msg_type):
351 351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352 352
353 353 def msg(self, msg_type, content=None, parent=None, subheader=None):
354 """Return the nested message dict.
355
356 This format is different from what is sent over the wire. The
357 self.serialize method converts this nested message dict to the wire
358 format, which uses a message list.
359 """
354 360 msg = {}
355 361 msg['header'] = self.msg_header(msg_type)
356 362 msg['msg_id'] = msg['header']['msg_id']
357 363 msg['parent_header'] = {} if parent is None else extract_header(parent)
358 364 msg['msg_type'] = msg_type
359 365 msg['content'] = {} if content is None else content
360 366 sub = {} if subheader is None else subheader
361 367 msg['header'].update(sub)
362 368 return msg
363 369
364 def sign(self, msg):
365 """Sign a message with HMAC digest. If no auth, return b''."""
370 def sign(self, msg_list):
371 """Sign a message with HMAC digest. If no auth, return b''.
372
373 Parameters
374 ----------
375 msg_list : list
376 The [p_header,p_parent,p_content] part of the message list.
377 """
366 378 if self.auth is None:
367 379 return b''
368 380 h = self.auth.copy()
369 for m in msg:
381 for m in msg_list:
370 382 h.update(m)
371 383 return h.hexdigest()
372 384
373 385 def serialize(self, msg, ident=None):
374 386 """Serialize the message components to bytes.
375
387
388 Parameters
389 ----------
390 msg : dict or Message
391 The nexted message dict as returned by the self.msg method.
392
376 393 Returns
377 394 -------
378
379 list of bytes objects
380
395 msg_list : list
396 The list of bytes objects to be sent with the format:
397 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
398 buffer1,buffer2,...]. In this list, the p_* entities are
399 the packed or serialized versions, so if JSON is used, these
400 are uft8 encoded JSON strings.
381 401 """
382 402 content = msg.get('content', {})
383 403 if content is None:
384 404 content = self.none
385 405 elif isinstance(content, dict):
386 406 content = self.pack(content)
387 407 elif isinstance(content, bytes):
388 408 # content is already packed, as in a relayed message
389 409 pass
390 410 elif isinstance(content, unicode):
391 411 # should be bytes, but JSON often spits out unicode
392 412 content = content.encode('utf8')
393 413 else:
394 414 raise TypeError("Content incorrect type: %s"%type(content))
395 415
396 416 real_message = [self.pack(msg['header']),
397 417 self.pack(msg['parent_header']),
398 418 content
399 419 ]
400 420
401 421 to_send = []
402 422
403 423 if isinstance(ident, list):
404 424 # accept list of idents
405 425 to_send.extend(ident)
406 426 elif ident is not None:
407 427 to_send.append(ident)
408 428 to_send.append(DELIM)
409 429
410 430 signature = self.sign(real_message)
411 431 to_send.append(signature)
412 432
413 433 to_send.extend(real_message)
414 434
415 435 return to_send
416 436
417 437 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
418 438 buffers=None, subheader=None, track=False):
419 439 """Build and send a message via stream or socket.
420
440
441 The message format used by this function internally is as follows:
442
443 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
444 buffer1,buffer2,...]
445
446 The self.serialize method converts the nested message dict into this
447 format.
448
421 449 Parameters
422 450 ----------
423 451
424 452 stream : zmq.Socket or ZMQStream
425 453 the socket-like object used to send the data
426 454 msg_or_type : str or Message/dict
427 455 Normally, msg_or_type will be a msg_type unless a message is being
428 456 sent more than once.
429 457
430 458 content : dict or None
431 459 the content of the message (ignored if msg_or_type is a message)
432 460 parent : Message or dict or None
433 461 the parent or parent header describing the parent of this message
434 462 ident : bytes or list of bytes
435 463 the zmq.IDENTITY routing path
436 464 subheader : dict or None
437 465 extra header keys for this message's header
438 466 buffers : list or None
439 467 the already-serialized buffers to be appended to the message
440 468 track : bool
441 469 whether to track. Only for use with Sockets,
442 470 because ZMQStream objects cannot track messages.
443 471
444 472 Returns
445 473 -------
446 474 msg : message dict
447 475 the constructed message
448 476 (msg,tracker) : (message dict, MessageTracker)
449 477 if track=True, then a 2-tuple will be returned,
450 478 the first element being the constructed
451 479 message, and the second being the MessageTracker
452 480
453 481 """
454 482
455 483 if not isinstance(stream, (zmq.Socket, ZMQStream)):
456 484 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
457 485 elif track and isinstance(stream, ZMQStream):
458 486 raise TypeError("ZMQStream cannot track messages")
459 487
460 488 if isinstance(msg_or_type, (Message, dict)):
461 489 # we got a Message, not a msg_type
462 490 # don't build a new Message
463 491 msg = msg_or_type
464 492 else:
465 493 msg = self.msg(msg_or_type, content, parent, subheader)
466 494
467 495 buffers = [] if buffers is None else buffers
468 496 to_send = self.serialize(msg, ident)
469 497 flag = 0
470 498 if buffers:
471 499 flag = zmq.SNDMORE
472 500 _track = False
473 501 else:
474 502 _track=track
475 503 if track:
476 504 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
477 505 else:
478 506 tracker = stream.send_multipart(to_send, flag, copy=False)
479 507 for b in buffers[:-1]:
480 508 stream.send(b, flag, copy=False)
481 509 if buffers:
482 510 if track:
483 511 tracker = stream.send(buffers[-1], copy=False, track=track)
484 512 else:
485 513 tracker = stream.send(buffers[-1], copy=False)
486
514
487 515 # omsg = Message(msg)
488 516 if self.debug:
489 517 pprint.pprint(msg)
490 518 pprint.pprint(to_send)
491 519 pprint.pprint(buffers)
492 520
493 521 msg['tracker'] = tracker
494 522
495 523 return msg
496 524
497 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
525 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
498 526 """Send a raw message via ident path.
499
527
528 This method is used to send a already serialized message.
529
500 530 Parameters
501 531 ----------
502 msg : list of sendable buffers"""
532 stream : ZMQStream or Socket
533 The ZMQ stream or socket to use for sending the message.
534 msg_list : list
535 The serialized list of messages to send. This only includes the
536 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
537 the message.
538 ident : ident or list
539 A single ident or a list of idents to use in sending.
540 """
503 541 to_send = []
504 542 if isinstance(ident, bytes):
505 543 ident = [ident]
506 544 if ident is not None:
507 545 to_send.extend(ident)
508 546
509 547 to_send.append(DELIM)
510 to_send.append(self.sign(msg))
511 to_send.extend(msg)
512 stream.send_multipart(msg, flags, copy=copy)
548 to_send.append(self.sign(msg_list))
549 to_send.extend(msg_list)
550 stream.send_multipart(msg_list, flags, copy=copy)
513 551
514 552 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
515 """receives and unpacks a message
516 returns [idents], msg"""
553 """Receive and unpack a message.
554
555 Parameters
556 ----------
557 socket : ZMQStream or Socket
558 The socket or stream to use in receiving.
559
560 Returns
561 -------
562 [idents], msg
563 [idents] is a list of idents and msg is a nested message dict of
564 same format as self.msg returns.
565 """
517 566 if isinstance(socket, ZMQStream):
518 567 socket = socket.socket
519 568 try:
520 msg = socket.recv_multipart(mode)
569 msg_list = socket.recv_multipart(mode)
521 570 except zmq.ZMQError as e:
522 571 if e.errno == zmq.EAGAIN:
523 572 # We can convert EAGAIN to None as we know in this case
524 573 # recv_multipart won't return None.
525 574 return None,None
526 575 else:
527 576 raise
528 577 # split multipart message into identity list and message dict
529 578 # invalid large messages can cause very expensive string comparisons
530 idents, msg = self.feed_identities(msg, copy)
579 idents, msg_list = self.feed_identities(msg_list, copy)
531 580 try:
532 return idents, self.unpack_message(msg, content=content, copy=copy)
581 return idents, self.unpack_message(msg_list, content=content, copy=copy)
533 582 except Exception as e:
534 print (idents, msg)
583 print (idents, msg_list)
535 584 # TODO: handle it
536 585 raise e
537 586
538 def feed_identities(self, msg, copy=True):
539 """feed until DELIM is reached, then return the prefix as idents and
540 remainder as msg. This is easily broken by setting an IDENT to DELIM,
587 def feed_identities(self, msg_list, copy=True):
588 """Split the identities from the rest of the message.
589
590 Feed until DELIM is reached, then return the prefix as idents and
591 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
541 592 but that would be silly.
542 593
543 594 Parameters
544 595 ----------
545 msg : a list of Message or bytes objects
546 the message to be split
596 msg_list : a list of Message or bytes objects
597 The message to be split.
547 598 copy : bool
548 599 flag determining whether the arguments are bytes or Messages
549 600
550 601 Returns
551 602 -------
552 (idents,msg) : two lists
603 (idents,msg_list) : two lists
553 604 idents will always be a list of bytes - the indentity prefix
554 msg will be a list of bytes or Messages, unchanged from input
555 msg should be unpackable via self.unpack_message at this point.
605 msg_list will be a list of bytes or Messages, unchanged from input
606 msg_list should be unpackable via self.unpack_message at this point.
556 607 """
557 608 if copy:
558 idx = msg.index(DELIM)
559 return msg[:idx], msg[idx+1:]
609 idx = msg_list.index(DELIM)
610 return msg_list[:idx], msg_list[idx+1:]
560 611 else:
561 612 failed = True
562 for idx,m in enumerate(msg):
613 for idx,m in enumerate(msg_list):
563 614 if m.bytes == DELIM:
564 615 failed = False
565 616 break
566 617 if failed:
567 raise ValueError("DELIM not in msg")
568 idents, msg = msg[:idx], msg[idx+1:]
569 return [m.bytes for m in idents], msg
618 raise ValueError("DELIM not in msg_list")
619 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
620 return [m.bytes for m in idents], msg_list
570 621
571 def unpack_message(self, msg, content=True, copy=True):
622 def unpack_message(self, msg_list, content=True, copy=True):
572 623 """Return a message object from the format
573 624 sent by self.send.
574 625
575 626 Parameters:
576 627 -----------
577 628
578 629 content : bool (True)
579 630 whether to unpack the content dict (True),
580 631 or leave it serialized (False)
581 632
582 633 copy : bool (True)
583 634 whether to return the bytes (True),
584 635 or the non-copying Message object in each place (False)
585 636
586 637 """
587 638 minlen = 4
588 639 message = {}
589 640 if not copy:
590 641 for i in range(minlen):
591 msg[i] = msg[i].bytes
642 msg_list[i] = msg_list[i].bytes
592 643 if self.auth is not None:
593 signature = msg[0]
644 signature = msg_list[0]
594 645 if signature in self.digest_history:
595 646 raise ValueError("Duplicate Signature: %r"%signature)
596 647 self.digest_history.add(signature)
597 check = self.sign(msg[1:4])
648 check = self.sign(msg_list[1:4])
598 649 if not signature == check:
599 650 raise ValueError("Invalid Signature: %r"%signature)
600 if not len(msg) >= minlen:
651 if not len(msg_list) >= minlen:
601 652 raise TypeError("malformed message, must have at least %i elements"%minlen)
602 message['header'] = self.unpack(msg[1])
653 message['header'] = self.unpack(msg_list[1])
603 654 message['msg_type'] = message['header']['msg_type']
604 message['parent_header'] = self.unpack(msg[2])
655 message['parent_header'] = self.unpack(msg_list[2])
605 656 if content:
606 message['content'] = self.unpack(msg[3])
657 message['content'] = self.unpack(msg_list[3])
607 658 else:
608 message['content'] = msg[3]
659 message['content'] = msg_list[3]
609 660
610 message['buffers'] = msg[4:]
661 message['buffers'] = msg_list[4:]
611 662 return message
612 663
613 664 def test_msg2obj():
614 665 am = dict(x=1)
615 666 ao = Message(am)
616 667 assert ao.x == am['x']
617 668
618 669 am['y'] = dict(z=1)
619 670 ao = Message(am)
620 671 assert ao.y.z == am['y']['z']
621 672
622 673 k1, k2 = 'y', 'z'
623 674 assert ao[k1][k2] == am[k1][k2]
624 675
625 676 am2 = dict(ao)
626 677 assert am['x'] == am2['x']
627 678 assert am['y']['z'] == am2['y']['z']
628 679
General Comments 0
You need to be logged in to leave comments. Login now