##// END OF EJS Templates
Spelling correction.
Thomas Kluyver -
Show More
@@ -1,703 +1,703 b''
1 1 """Session object for building, serializing, sending, and receiving messages in
2 2 IPython. The Session object supports serialization, HMAC signatures, and
3 3 metadata on messages.
4 4
5 5 Also defined here are utilities for working with Sessions:
6 6 * A SessionFactory to be used as a base class for configurables that work with
7 7 Sessions.
8 8 * A Message object for convenience that allows attribute-access to the msg dict.
9 9
10 10 Authors:
11 11
12 12 * Min RK
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 """
16 16 #-----------------------------------------------------------------------------
17 17 # Copyright (C) 2010-2011 The IPython Development Team
18 18 #
19 19 # Distributed under the terms of the BSD License. The full license is in
20 20 # the file COPYING, distributed as part of this software.
21 21 #-----------------------------------------------------------------------------
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Imports
25 25 #-----------------------------------------------------------------------------
26 26
27 27 import hmac
28 28 import logging
29 29 import os
30 30 import pprint
31 31 import uuid
32 32 from datetime import datetime
33 33
34 34 try:
35 35 import cPickle
36 36 pickle = cPickle
37 37 except:
38 38 cPickle = None
39 39 import pickle
40 40
41 41 import zmq
42 42 from zmq.utils import jsonapi
43 43 from zmq.eventloop.ioloop import IOLoop
44 44 from zmq.eventloop.zmqstream import ZMQStream
45 45
46 46 from IPython.config.configurable import Configurable, LoggingConfigurable
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 49 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 50 DottedObjectName)
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # utility functions
54 54 #-----------------------------------------------------------------------------
55 55
56 56 def squash_unicode(obj):
57 57 """coerce unicode back to bytestrings."""
58 58 if isinstance(obj,dict):
59 59 for key in obj.keys():
60 60 obj[key] = squash_unicode(obj[key])
61 61 if isinstance(key, unicode):
62 62 obj[squash_unicode(key)] = obj.pop(key)
63 63 elif isinstance(obj, list):
64 64 for i,v in enumerate(obj):
65 65 obj[i] = squash_unicode(v)
66 66 elif isinstance(obj, unicode):
67 67 obj = obj.encode('utf8')
68 68 return obj
69 69
70 70 #-----------------------------------------------------------------------------
71 71 # globals and defaults
72 72 #-----------------------------------------------------------------------------
73 73 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 74 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 75 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76 76
77 77 pickle_packer = lambda o: pickle.dumps(o,-1)
78 78 pickle_unpacker = pickle.loads
79 79
80 80 default_packer = json_packer
81 81 default_unpacker = json_unpacker
82 82
83 83
84 84 DELIM=b"<IDS|MSG>"
85 85
86 86 #-----------------------------------------------------------------------------
87 87 # Classes
88 88 #-----------------------------------------------------------------------------
89 89
90 90 class SessionFactory(LoggingConfigurable):
91 91 """The Base class for configurables that have a Session, Context, logger,
92 92 and IOLoop.
93 93 """
94 94
95 95 logname = Unicode('')
96 96 def _logname_changed(self, name, old, new):
97 97 self.log = logging.getLogger(new)
98 98
99 99 # not configurable:
100 100 context = Instance('zmq.Context')
101 101 def _context_default(self):
102 102 return zmq.Context.instance()
103 103
104 104 session = Instance('IPython.zmq.session.Session')
105 105
106 106 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 107 def _loop_default(self):
108 108 return IOLoop.instance()
109 109
110 110 def __init__(self, **kwargs):
111 111 super(SessionFactory, self).__init__(**kwargs)
112 112
113 113 if self.session is None:
114 114 # construct the session
115 115 self.session = Session(**kwargs)
116 116
117 117
118 118 class Message(object):
119 119 """A simple message object that maps dict keys to attributes.
120 120
121 121 A Message can be created from a dict and a dict from a Message instance
122 122 simply by calling dict(msg_obj)."""
123 123
124 124 def __init__(self, msg_dict):
125 125 dct = self.__dict__
126 126 for k, v in dict(msg_dict).iteritems():
127 127 if isinstance(v, dict):
128 128 v = Message(v)
129 129 dct[k] = v
130 130
131 131 # Having this iterator lets dict(msg_obj) work out of the box.
132 132 def __iter__(self):
133 133 return iter(self.__dict__.iteritems())
134 134
135 135 def __repr__(self):
136 136 return repr(self.__dict__)
137 137
138 138 def __str__(self):
139 139 return pprint.pformat(self.__dict__)
140 140
141 141 def __contains__(self, k):
142 142 return k in self.__dict__
143 143
144 144 def __getitem__(self, k):
145 145 return self.__dict__[k]
146 146
147 147
148 148 def msg_header(msg_id, msg_type, username, session):
149 149 date = datetime.now()
150 150 return locals()
151 151
152 152 def extract_header(msg_or_header):
153 153 """Given a message or header, return the header."""
154 154 if not msg_or_header:
155 155 return {}
156 156 try:
157 157 # See if msg_or_header is the entire message.
158 158 h = msg_or_header['header']
159 159 except KeyError:
160 160 try:
161 161 # See if msg_or_header is just the header
162 162 h = msg_or_header['msg_id']
163 163 except KeyError:
164 164 raise
165 165 else:
166 166 h = msg_or_header
167 167 if not isinstance(h, dict):
168 168 h = dict(h)
169 169 return h
170 170
171 171 class Session(Configurable):
172 172 """Object for handling serialization and sending of messages.
173 173
174 174 The Session object handles building messages and sending them
175 175 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 176 other over the network via Session objects, and only need to work with the
177 177 dict-based IPython message spec. The Session will handle
178 178 serialization/deserialization, security, and metadata.
179 179
180 180 Sessions support configurable serialiization via packer/unpacker traits,
181 181 and signing with HMAC digests via the key/keyfile traits.
182 182
183 183 Parameters
184 184 ----------
185 185
186 186 debug : bool
187 187 whether to trigger extra debugging statements
188 188 packer/unpacker : str : 'json', 'pickle' or import_string
189 189 importstrings for methods to serialize message parts. If just
190 190 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 191 Otherwise, the entire importstring must be used.
192 192
193 193 The functions must accept at least valid JSON input, and output *bytes*.
194 194
195 195 For example, to use msgpack:
196 196 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 197 pack/unpack : callables
198 198 You can also set the pack/unpack callables for serialization directly.
199 199 session : bytes
200 200 the ID of this Session object. The default is to generate a new UUID.
201 201 username : unicode
202 202 username added to message headers. The default is to ask the OS.
203 203 key : bytes
204 204 The key used to initialize an HMAC signature. If unset, messages
205 205 will not be signed or checked.
206 206 keyfile : filepath
207 207 The file containing a key. If this is set, `key` will be initialized
208 208 to the contents of the file.
209 209
210 210 """
211 211
212 212 debug=Bool(False, config=True, help="""Debug output in the Session""")
213 213
214 214 packer = DottedObjectName('json',config=True,
215 215 help="""The name of the packer for serializing messages.
216 216 Should be one of 'json', 'pickle', or an import name
217 217 for a custom callable serializer.""")
218 218 def _packer_changed(self, name, old, new):
219 219 if new.lower() == 'json':
220 220 self.pack = json_packer
221 221 self.unpack = json_unpacker
222 222 elif new.lower() == 'pickle':
223 223 self.pack = pickle_packer
224 224 self.unpack = pickle_unpacker
225 225 else:
226 226 self.pack = import_item(str(new))
227 227
228 228 unpacker = DottedObjectName('json', config=True,
229 229 help="""The name of the unpacker for unserializing messages.
230 230 Only used with custom functions for `packer`.""")
231 231 def _unpacker_changed(self, name, old, new):
232 232 if new.lower() == 'json':
233 233 self.pack = json_packer
234 234 self.unpack = json_unpacker
235 235 elif new.lower() == 'pickle':
236 236 self.pack = pickle_packer
237 237 self.unpack = pickle_unpacker
238 238 else:
239 239 self.unpack = import_item(str(new))
240 240
241 241 session = CBytes(b'', config=True,
242 242 help="""The UUID identifying this session.""")
243 243 def _session_default(self):
244 244 return bytes(uuid.uuid4())
245 245
246 246 username = Unicode(os.environ.get('USER',u'username'), config=True,
247 247 help="""Username for the Session. Default is your system username.""")
248 248
249 249 # message signature related traits:
250 250 key = CBytes(b'', config=True,
251 251 help="""execution key, for extra authentication.""")
252 252 def _key_changed(self, name, old, new):
253 253 if new:
254 254 self.auth = hmac.HMAC(new)
255 255 else:
256 256 self.auth = None
257 257 auth = Instance(hmac.HMAC)
258 258 digest_history = Set()
259 259
260 260 keyfile = Unicode('', config=True,
261 261 help="""path to file containing execution key.""")
262 262 def _keyfile_changed(self, name, old, new):
263 263 with open(new, 'rb') as f:
264 264 self.key = f.read().strip()
265 265
266 266 pack = Any(default_packer) # the actual packer function
267 267 def _pack_changed(self, name, old, new):
268 268 if not callable(new):
269 269 raise TypeError("packer must be callable, not %s"%type(new))
270 270
271 271 unpack = Any(default_unpacker) # the actual packer function
272 272 def _unpack_changed(self, name, old, new):
273 273 # unpacker is not checked - it is assumed to be
274 274 if not callable(new):
275 275 raise TypeError("unpacker must be callable, not %s"%type(new))
276 276
277 277 def __init__(self, **kwargs):
278 278 """create a Session object
279 279
280 280 Parameters
281 281 ----------
282 282
283 283 debug : bool
284 284 whether to trigger extra debugging statements
285 285 packer/unpacker : str : 'json', 'pickle' or import_string
286 286 importstrings for methods to serialize message parts. If just
287 287 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 288 Otherwise, the entire importstring must be used.
289 289
290 290 The functions must accept at least valid JSON input, and output
291 291 *bytes*.
292 292
293 293 For example, to use msgpack:
294 294 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 295 pack/unpack : callables
296 296 You can also set the pack/unpack callables for serialization
297 297 directly.
298 298 session : bytes
299 299 the ID of this Session object. The default is to generate a new
300 300 UUID.
301 301 username : unicode
302 302 username added to message headers. The default is to ask the OS.
303 303 key : bytes
304 304 The key used to initialize an HMAC signature. If unset, messages
305 305 will not be signed or checked.
306 306 keyfile : filepath
307 307 The file containing a key. If this is set, `key` will be
308 308 initialized to the contents of the file.
309 309 """
310 310 super(Session, self).__init__(**kwargs)
311 311 self._check_packers()
312 312 self.none = self.pack({})
313 313
314 314 @property
315 315 def msg_id(self):
316 316 """always return new uuid"""
317 317 return str(uuid.uuid4())
318 318
319 319 def _check_packers(self):
320 320 """check packers for binary data and datetime support."""
321 321 pack = self.pack
322 322 unpack = self.unpack
323 323
324 324 # check simple serialization
325 325 msg = dict(a=[1,'hi'])
326 326 try:
327 327 packed = pack(msg)
328 328 except Exception:
329 329 raise ValueError("packer could not serialize a simple message")
330 330
331 331 # ensure packed message is bytes
332 332 if not isinstance(packed, bytes):
333 333 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334 334
335 335 # check that unpack is pack's inverse
336 336 try:
337 337 unpacked = unpack(packed)
338 338 except Exception:
339 339 raise ValueError("unpacker could not handle the packer's output")
340 340
341 341 # check datetime support
342 342 msg = dict(t=datetime.now())
343 343 try:
344 344 unpacked = unpack(pack(msg))
345 345 except Exception:
346 346 self.pack = lambda o: pack(squash_dates(o))
347 347 self.unpack = lambda s: extract_dates(unpack(s))
348 348
349 349 def msg_header(self, msg_type):
350 350 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 351
352 352 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
353 353 """Return the nested message dict.
354 354
355 355 This format is different from what is sent over the wire. The
356 356 serialize/unserialize methods converts this nested message dict to the wire
357 357 format, which is a list of message parts.
358 358 """
359 359 msg = {}
360 360 header = self.msg_header(msg_type) if header is None else header
361 361 msg['header'] = header
362 362 msg['msg_id'] = header['msg_id']
363 363 msg['msg_type'] = header['msg_type']
364 364 msg['parent_header'] = {} if parent is None else extract_header(parent)
365 365 msg['content'] = {} if content is None else content
366 366 sub = {} if subheader is None else subheader
367 367 msg['header'].update(sub)
368 368 return msg
369 369
370 370 def sign(self, msg_list):
371 371 """Sign a message with HMAC digest. If no auth, return b''.
372 372
373 373 Parameters
374 374 ----------
375 375 msg_list : list
376 376 The [p_header,p_parent,p_content] part of the message list.
377 377 """
378 378 if self.auth is None:
379 379 return b''
380 380 h = self.auth.copy()
381 381 for m in msg_list:
382 382 h.update(m)
383 383 return str_to_bytes(h.hexdigest())
384 384
385 385 def serialize(self, msg, ident=None):
386 386 """Serialize the message components to bytes.
387 387
388 388 This is roughly the inverse of unserialize. The serialize/unserialize
389 389 methods work with full message lists, whereas pack/unpack work with
390 390 the individual message parts in the message list.
391 391
392 392 Parameters
393 393 ----------
394 394 msg : dict or Message
395 395 The nexted message dict as returned by the self.msg method.
396 396
397 397 Returns
398 398 -------
399 399 msg_list : list
400 400 The list of bytes objects to be sent with the format:
401 401 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
402 402 buffer1,buffer2,...]. In this list, the p_* entities are
403 403 the packed or serialized versions, so if JSON is used, these
404 are uft8 encoded JSON strings.
404 are utf8 encoded JSON strings.
405 405 """
406 406 content = msg.get('content', {})
407 407 if content is None:
408 408 content = self.none
409 409 elif isinstance(content, dict):
410 410 content = self.pack(content)
411 411 elif isinstance(content, bytes):
412 412 # content is already packed, as in a relayed message
413 413 pass
414 414 elif isinstance(content, unicode):
415 415 # should be bytes, but JSON often spits out unicode
416 416 content = content.encode('utf8')
417 417 else:
418 418 raise TypeError("Content incorrect type: %s"%type(content))
419 419
420 420 real_message = [self.pack(msg['header']),
421 421 self.pack(msg['parent_header']),
422 422 content
423 423 ]
424 424
425 425 to_send = []
426 426
427 427 if isinstance(ident, list):
428 428 # accept list of idents
429 429 to_send.extend(ident)
430 430 elif ident is not None:
431 431 to_send.append(ident)
432 432 to_send.append(DELIM)
433 433
434 434 signature = self.sign(real_message)
435 435 to_send.append(signature)
436 436
437 437 to_send.extend(real_message)
438 438
439 439 return to_send
440 440
441 441 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
442 442 buffers=None, subheader=None, track=False, header=None):
443 443 """Build and send a message via stream or socket.
444 444
445 445 The message format used by this function internally is as follows:
446 446
447 447 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
448 448 buffer1,buffer2,...]
449 449
450 450 The serialize/unserialize methods convert the nested message dict into this
451 451 format.
452 452
453 453 Parameters
454 454 ----------
455 455
456 456 stream : zmq.Socket or ZMQStream
457 457 The socket-like object used to send the data.
458 458 msg_or_type : str or Message/dict
459 459 Normally, msg_or_type will be a msg_type unless a message is being
460 460 sent more than once. If a header is supplied, this can be set to
461 461 None and the msg_type will be pulled from the header.
462 462
463 463 content : dict or None
464 464 The content of the message (ignored if msg_or_type is a message).
465 465 header : dict or None
466 466 The header dict for the message (ignores if msg_to_type is a message).
467 467 parent : Message or dict or None
468 468 The parent or parent header describing the parent of this message
469 469 (ignored if msg_or_type is a message).
470 470 ident : bytes or list of bytes
471 471 The zmq.IDENTITY routing path.
472 472 subheader : dict or None
473 473 Extra header keys for this message's header (ignored if msg_or_type
474 474 is a message).
475 475 buffers : list or None
476 476 The already-serialized buffers to be appended to the message.
477 477 track : bool
478 478 Whether to track. Only for use with Sockets, because ZMQStream
479 479 objects cannot track messages.
480 480
481 481 Returns
482 482 -------
483 483 msg : dict
484 484 The constructed message.
485 485 (msg,tracker) : (dict, MessageTracker)
486 486 if track=True, then a 2-tuple will be returned,
487 487 the first element being the constructed
488 488 message, and the second being the MessageTracker
489 489
490 490 """
491 491
492 492 if not isinstance(stream, (zmq.Socket, ZMQStream)):
493 493 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
494 494 elif track and isinstance(stream, ZMQStream):
495 495 raise TypeError("ZMQStream cannot track messages")
496 496
497 497 if isinstance(msg_or_type, (Message, dict)):
498 498 # We got a Message or message dict, not a msg_type so don't
499 499 # build a new Message.
500 500 msg = msg_or_type
501 501 else:
502 502 msg = self.msg(msg_or_type, content=content, parent=parent,
503 503 subheader=subheader, header=header)
504 504
505 505 buffers = [] if buffers is None else buffers
506 506 to_send = self.serialize(msg, ident)
507 507 flag = 0
508 508 if buffers:
509 509 flag = zmq.SNDMORE
510 510 _track = False
511 511 else:
512 512 _track=track
513 513 if track:
514 514 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
515 515 else:
516 516 tracker = stream.send_multipart(to_send, flag, copy=False)
517 517 for b in buffers[:-1]:
518 518 stream.send(b, flag, copy=False)
519 519 if buffers:
520 520 if track:
521 521 tracker = stream.send(buffers[-1], copy=False, track=track)
522 522 else:
523 523 tracker = stream.send(buffers[-1], copy=False)
524 524
525 525 # omsg = Message(msg)
526 526 if self.debug:
527 527 pprint.pprint(msg)
528 528 pprint.pprint(to_send)
529 529 pprint.pprint(buffers)
530 530
531 531 msg['tracker'] = tracker
532 532
533 533 return msg
534 534
535 535 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
536 536 """Send a raw message via ident path.
537 537
538 538 This method is used to send a already serialized message.
539 539
540 540 Parameters
541 541 ----------
542 542 stream : ZMQStream or Socket
543 543 The ZMQ stream or socket to use for sending the message.
544 544 msg_list : list
545 545 The serialized list of messages to send. This only includes the
546 546 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
547 547 the message.
548 548 ident : ident or list
549 549 A single ident or a list of idents to use in sending.
550 550 """
551 551 to_send = []
552 552 if isinstance(ident, bytes):
553 553 ident = [ident]
554 554 if ident is not None:
555 555 to_send.extend(ident)
556 556
557 557 to_send.append(DELIM)
558 558 to_send.append(self.sign(msg_list))
559 559 to_send.extend(msg_list)
560 560 stream.send_multipart(msg_list, flags, copy=copy)
561 561
562 562 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
563 563 """Receive and unpack a message.
564 564
565 565 Parameters
566 566 ----------
567 567 socket : ZMQStream or Socket
568 568 The socket or stream to use in receiving.
569 569
570 570 Returns
571 571 -------
572 572 [idents], msg
573 573 [idents] is a list of idents and msg is a nested message dict of
574 574 same format as self.msg returns.
575 575 """
576 576 if isinstance(socket, ZMQStream):
577 577 socket = socket.socket
578 578 try:
579 579 msg_list = socket.recv_multipart(mode)
580 580 except zmq.ZMQError as e:
581 581 if e.errno == zmq.EAGAIN:
582 582 # We can convert EAGAIN to None as we know in this case
583 583 # recv_multipart won't return None.
584 584 return None,None
585 585 else:
586 586 raise
587 587 # split multipart message into identity list and message dict
588 588 # invalid large messages can cause very expensive string comparisons
589 589 idents, msg_list = self.feed_identities(msg_list, copy)
590 590 try:
591 591 return idents, self.unserialize(msg_list, content=content, copy=copy)
592 592 except Exception as e:
593 593 # TODO: handle it
594 594 raise e
595 595
596 596 def feed_identities(self, msg_list, copy=True):
597 597 """Split the identities from the rest of the message.
598 598
599 599 Feed until DELIM is reached, then return the prefix as idents and
600 600 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
601 601 but that would be silly.
602 602
603 603 Parameters
604 604 ----------
605 605 msg_list : a list of Message or bytes objects
606 606 The message to be split.
607 607 copy : bool
608 608 flag determining whether the arguments are bytes or Messages
609 609
610 610 Returns
611 611 -------
612 612 (idents, msg_list) : two lists
613 613 idents will always be a list of bytes, each of which is a ZMQ
614 614 identity. msg_list will be a list of bytes or zmq.Messages of the
615 615 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
616 616 should be unpackable/unserializable via self.unserialize at this
617 617 point.
618 618 """
619 619 if copy:
620 620 idx = msg_list.index(DELIM)
621 621 return msg_list[:idx], msg_list[idx+1:]
622 622 else:
623 623 failed = True
624 624 for idx,m in enumerate(msg_list):
625 625 if m.bytes == DELIM:
626 626 failed = False
627 627 break
628 628 if failed:
629 629 raise ValueError("DELIM not in msg_list")
630 630 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
631 631 return [m.bytes for m in idents], msg_list
632 632
633 633 def unserialize(self, msg_list, content=True, copy=True):
634 634 """Unserialize a msg_list to a nested message dict.
635 635
636 636 This is roughly the inverse of serialize. The serialize/unserialize
637 637 methods work with full message lists, whereas pack/unpack work with
638 638 the individual message parts in the message list.
639 639
640 640 Parameters:
641 641 -----------
642 642 msg_list : list of bytes or Message objects
643 643 The list of message parts of the form [HMAC,p_header,p_parent,
644 644 p_content,buffer1,buffer2,...].
645 645 content : bool (True)
646 646 Whether to unpack the content dict (True), or leave it packed
647 647 (False).
648 648 copy : bool (True)
649 649 Whether to return the bytes (True), or the non-copying Message
650 650 object in each place (False).
651 651
652 652 Returns
653 653 -------
654 654 msg : dict
655 655 The nested message dict with top-level keys [header, parent_header,
656 656 content, buffers].
657 657 """
658 658 minlen = 4
659 659 message = {}
660 660 if not copy:
661 661 for i in range(minlen):
662 662 msg_list[i] = msg_list[i].bytes
663 663 if self.auth is not None:
664 664 signature = msg_list[0]
665 665 if not signature:
666 666 raise ValueError("Unsigned Message")
667 667 if signature in self.digest_history:
668 668 raise ValueError("Duplicate Signature: %r"%signature)
669 669 self.digest_history.add(signature)
670 670 check = self.sign(msg_list[1:4])
671 671 if not signature == check:
672 672 raise ValueError("Invalid Signature: %r"%signature)
673 673 if not len(msg_list) >= minlen:
674 674 raise TypeError("malformed message, must have at least %i elements"%minlen)
675 675 header = self.unpack(msg_list[1])
676 676 message['header'] = header
677 677 message['msg_id'] = header['msg_id']
678 678 message['msg_type'] = header['msg_type']
679 679 message['parent_header'] = self.unpack(msg_list[2])
680 680 if content:
681 681 message['content'] = self.unpack(msg_list[3])
682 682 else:
683 683 message['content'] = msg_list[3]
684 684
685 685 message['buffers'] = msg_list[4:]
686 686 return message
687 687
688 688 def test_msg2obj():
689 689 am = dict(x=1)
690 690 ao = Message(am)
691 691 assert ao.x == am['x']
692 692
693 693 am['y'] = dict(z=1)
694 694 ao = Message(am)
695 695 assert ao.y.z == am['y']['z']
696 696
697 697 k1, k2 = 'y', 'z'
698 698 assert ao[k1][k2] == am[k1][k2]
699 699
700 700 am2 = dict(ao)
701 701 assert am['x'] == am2['x']
702 702 assert am['y']['z'] == am2['y']['z']
703 703
General Comments 0
You need to be logged in to leave comments. Login now