##// END OF EJS Templates
Make session subheader default configurable
Jason Grout -
Show More
@@ -1,756 +1,760 b''
1 1 """Session object for building, serializing, sending, and receiving messages in
2 2 IPython. The Session object supports serialization, HMAC signatures, and
3 3 metadata on messages.
4 4
5 5 Also defined here are utilities for working with Sessions:
6 6 * A SessionFactory to be used as a base class for configurables that work with
7 7 Sessions.
8 8 * A Message object for convenience that allows attribute-access to the msg dict.
9 9
10 10 Authors:
11 11
12 12 * Min RK
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 """
16 16 #-----------------------------------------------------------------------------
17 17 # Copyright (C) 2010-2011 The IPython Development Team
18 18 #
19 19 # Distributed under the terms of the BSD License. The full license is in
20 20 # the file COPYING, distributed as part of this software.
21 21 #-----------------------------------------------------------------------------
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Imports
25 25 #-----------------------------------------------------------------------------
26 26
27 27 import hmac
28 28 import logging
29 29 import os
30 30 import pprint
31 31 import uuid
32 32 from datetime import datetime
33 33
34 34 try:
35 35 import cPickle
36 36 pickle = cPickle
37 37 except:
38 38 cPickle = None
39 39 import pickle
40 40
41 41 import zmq
42 42 from zmq.utils import jsonapi
43 43 from zmq.eventloop.ioloop import IOLoop
44 44 from zmq.eventloop.zmqstream import ZMQStream
45 45
46 46 from IPython.config.application import Application, boolean_flag
47 47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 48 from IPython.utils.importstring import import_item
49 49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 50 from IPython.utils.py3compat import str_to_bytes
51 51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 DottedObjectName, CUnicode)
52 DottedObjectName, CUnicode, Dict)
53 53
54 54 #-----------------------------------------------------------------------------
55 55 # utility functions
56 56 #-----------------------------------------------------------------------------
57 57
58 58 def squash_unicode(obj):
59 59 """coerce unicode back to bytestrings."""
60 60 if isinstance(obj,dict):
61 61 for key in obj.keys():
62 62 obj[key] = squash_unicode(obj[key])
63 63 if isinstance(key, unicode):
64 64 obj[squash_unicode(key)] = obj.pop(key)
65 65 elif isinstance(obj, list):
66 66 for i,v in enumerate(obj):
67 67 obj[i] = squash_unicode(v)
68 68 elif isinstance(obj, unicode):
69 69 obj = obj.encode('utf8')
70 70 return obj
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # globals and defaults
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 # ISO8601-ify datetime objects
78 78 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
79 79 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
80 80
81 81 pickle_packer = lambda o: pickle.dumps(o,-1)
82 82 pickle_unpacker = pickle.loads
83 83
84 84 default_packer = json_packer
85 85 default_unpacker = json_unpacker
86 86
87 87 DELIM=b"<IDS|MSG>"
88 88
89 89
90 90 #-----------------------------------------------------------------------------
91 91 # Mixin tools for apps that use Sessions
92 92 #-----------------------------------------------------------------------------
93 93
94 94 session_aliases = dict(
95 95 ident = 'Session.session',
96 96 user = 'Session.username',
97 97 keyfile = 'Session.keyfile',
98 98 )
99 99
100 100 session_flags = {
101 101 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
102 102 'keyfile' : '' }},
103 103 """Use HMAC digests for authentication of messages.
104 104 Setting this flag will generate a new UUID to use as the HMAC key.
105 105 """),
106 106 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
107 107 """Don't authenticate messages."""),
108 108 }
109 109
110 110 def default_secure(cfg):
111 111 """Set the default behavior for a config environment to be secure.
112 112
113 113 If Session.key/keyfile have not been set, set Session.key to
114 114 a new random UUID.
115 115 """
116 116
117 117 if 'Session' in cfg:
118 118 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
119 119 return
120 120 # key/keyfile not specified, generate new UUID:
121 121 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
122 122
123 123
124 124 #-----------------------------------------------------------------------------
125 125 # Classes
126 126 #-----------------------------------------------------------------------------
127 127
128 128 class SessionFactory(LoggingConfigurable):
129 129 """The Base class for configurables that have a Session, Context, logger,
130 130 and IOLoop.
131 131 """
132 132
133 133 logname = Unicode('')
134 134 def _logname_changed(self, name, old, new):
135 135 self.log = logging.getLogger(new)
136 136
137 137 # not configurable:
138 138 context = Instance('zmq.Context')
139 139 def _context_default(self):
140 140 return zmq.Context.instance()
141 141
142 142 session = Instance('IPython.zmq.session.Session')
143 143
144 144 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
145 145 def _loop_default(self):
146 146 return IOLoop.instance()
147 147
148 148 def __init__(self, **kwargs):
149 149 super(SessionFactory, self).__init__(**kwargs)
150 150
151 151 if self.session is None:
152 152 # construct the session
153 153 self.session = Session(**kwargs)
154 154
155 155
156 156 class Message(object):
157 157 """A simple message object that maps dict keys to attributes.
158 158
159 159 A Message can be created from a dict and a dict from a Message instance
160 160 simply by calling dict(msg_obj)."""
161 161
162 162 def __init__(self, msg_dict):
163 163 dct = self.__dict__
164 164 for k, v in dict(msg_dict).iteritems():
165 165 if isinstance(v, dict):
166 166 v = Message(v)
167 167 dct[k] = v
168 168
169 169 # Having this iterator lets dict(msg_obj) work out of the box.
170 170 def __iter__(self):
171 171 return iter(self.__dict__.iteritems())
172 172
173 173 def __repr__(self):
174 174 return repr(self.__dict__)
175 175
176 176 def __str__(self):
177 177 return pprint.pformat(self.__dict__)
178 178
179 179 def __contains__(self, k):
180 180 return k in self.__dict__
181 181
182 182 def __getitem__(self, k):
183 183 return self.__dict__[k]
184 184
185 185
186 186 def msg_header(msg_id, msg_type, username, session):
187 187 date = datetime.now()
188 188 return locals()
189 189
190 190 def extract_header(msg_or_header):
191 191 """Given a message or header, return the header."""
192 192 if not msg_or_header:
193 193 return {}
194 194 try:
195 195 # See if msg_or_header is the entire message.
196 196 h = msg_or_header['header']
197 197 except KeyError:
198 198 try:
199 199 # See if msg_or_header is just the header
200 200 h = msg_or_header['msg_id']
201 201 except KeyError:
202 202 raise
203 203 else:
204 204 h = msg_or_header
205 205 if not isinstance(h, dict):
206 206 h = dict(h)
207 207 return h
208 208
209 209 class Session(Configurable):
210 210 """Object for handling serialization and sending of messages.
211 211
212 212 The Session object handles building messages and sending them
213 213 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
214 214 other over the network via Session objects, and only need to work with the
215 215 dict-based IPython message spec. The Session will handle
216 216 serialization/deserialization, security, and metadata.
217 217
218 218 Sessions support configurable serialiization via packer/unpacker traits,
219 219 and signing with HMAC digests via the key/keyfile traits.
220 220
221 221 Parameters
222 222 ----------
223 223
224 224 debug : bool
225 225 whether to trigger extra debugging statements
226 226 packer/unpacker : str : 'json', 'pickle' or import_string
227 227 importstrings for methods to serialize message parts. If just
228 228 'json' or 'pickle', predefined JSON and pickle packers will be used.
229 229 Otherwise, the entire importstring must be used.
230 230
231 231 The functions must accept at least valid JSON input, and output *bytes*.
232 232
233 233 For example, to use msgpack:
234 234 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
235 235 pack/unpack : callables
236 236 You can also set the pack/unpack callables for serialization directly.
237 237 session : bytes
238 238 the ID of this Session object. The default is to generate a new UUID.
239 239 username : unicode
240 240 username added to message headers. The default is to ask the OS.
241 241 key : bytes
242 242 The key used to initialize an HMAC signature. If unset, messages
243 243 will not be signed or checked.
244 244 keyfile : filepath
245 245 The file containing a key. If this is set, `key` will be initialized
246 246 to the contents of the file.
247 247
248 248 """
249 249
250 250 debug=Bool(False, config=True, help="""Debug output in the Session""")
251 251
252 252 packer = DottedObjectName('json',config=True,
253 253 help="""The name of the packer for serializing messages.
254 254 Should be one of 'json', 'pickle', or an import name
255 255 for a custom callable serializer.""")
256 256 def _packer_changed(self, name, old, new):
257 257 if new.lower() == 'json':
258 258 self.pack = json_packer
259 259 self.unpack = json_unpacker
260 260 elif new.lower() == 'pickle':
261 261 self.pack = pickle_packer
262 262 self.unpack = pickle_unpacker
263 263 else:
264 264 self.pack = import_item(str(new))
265 265
266 266 unpacker = DottedObjectName('json', config=True,
267 267 help="""The name of the unpacker for unserializing messages.
268 268 Only used with custom functions for `packer`.""")
269 269 def _unpacker_changed(self, name, old, new):
270 270 if new.lower() == 'json':
271 271 self.pack = json_packer
272 272 self.unpack = json_unpacker
273 273 elif new.lower() == 'pickle':
274 274 self.pack = pickle_packer
275 275 self.unpack = pickle_unpacker
276 276 else:
277 277 self.unpack = import_item(str(new))
278 278
279 279 session = CUnicode(u'', config=True,
280 280 help="""The UUID identifying this session.""")
281 281 def _session_default(self):
282 282 u = unicode(uuid.uuid4())
283 283 self.bsession = u.encode('ascii')
284 284 return u
285 285
286 286 def _session_changed(self, name, old, new):
287 287 self.bsession = self.session.encode('ascii')
288 288
289 289 # bsession is the session as bytes
290 290 bsession = CBytes(b'')
291 291
292 292 username = Unicode(os.environ.get('USER',u'username'), config=True,
293 293 help="""Username for the Session. Default is your system username.""")
294 294
295 subheader = Dict({}, config=True,
296 help="""Subheader dictionary, which serves as the default subheader fields for each message.""")
297
295 298 # message signature related traits:
296 299
297 300 key = CBytes(b'', config=True,
298 301 help="""execution key, for extra authentication.""")
299 302 def _key_changed(self, name, old, new):
300 303 if new:
301 304 self.auth = hmac.HMAC(new)
302 305 else:
303 306 self.auth = None
304 307 auth = Instance(hmac.HMAC)
305 308 digest_history = Set()
306 309
307 310 keyfile = Unicode('', config=True,
308 311 help="""path to file containing execution key.""")
309 312 def _keyfile_changed(self, name, old, new):
310 313 with open(new, 'rb') as f:
311 314 self.key = f.read().strip()
312 315
313 316 # serialization traits:
314 317
315 318 pack = Any(default_packer) # the actual packer function
316 319 def _pack_changed(self, name, old, new):
317 320 if not callable(new):
318 321 raise TypeError("packer must be callable, not %s"%type(new))
319 322
320 323 unpack = Any(default_unpacker) # the actual packer function
321 324 def _unpack_changed(self, name, old, new):
322 325 # unpacker is not checked - it is assumed to be
323 326 if not callable(new):
324 327 raise TypeError("unpacker must be callable, not %s"%type(new))
325 328
326 329 def __init__(self, **kwargs):
327 330 """create a Session object
328 331
329 332 Parameters
330 333 ----------
331 334
332 335 debug : bool
333 336 whether to trigger extra debugging statements
334 337 packer/unpacker : str : 'json', 'pickle' or import_string
335 338 importstrings for methods to serialize message parts. If just
336 339 'json' or 'pickle', predefined JSON and pickle packers will be used.
337 340 Otherwise, the entire importstring must be used.
338 341
339 342 The functions must accept at least valid JSON input, and output
340 343 *bytes*.
341 344
342 345 For example, to use msgpack:
343 346 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
344 347 pack/unpack : callables
345 348 You can also set the pack/unpack callables for serialization
346 349 directly.
347 350 session : unicode (must be ascii)
348 351 the ID of this Session object. The default is to generate a new
349 352 UUID.
350 353 bsession : bytes
351 354 The session as bytes
352 355 username : unicode
353 356 username added to message headers. The default is to ask the OS.
354 357 key : bytes
355 358 The key used to initialize an HMAC signature. If unset, messages
356 359 will not be signed or checked.
357 360 keyfile : filepath
358 361 The file containing a key. If this is set, `key` will be
359 362 initialized to the contents of the file.
360 363 """
361 364 super(Session, self).__init__(**kwargs)
362 365 self._check_packers()
363 366 self.none = self.pack({})
364 367 # ensure self._session_default() if necessary, so bsession is defined:
365 368 self.session
366 369
367 370 @property
368 371 def msg_id(self):
369 372 """always return new uuid"""
370 373 return str(uuid.uuid4())
371 374
372 375 def _check_packers(self):
373 376 """check packers for binary data and datetime support."""
374 377 pack = self.pack
375 378 unpack = self.unpack
376 379
377 380 # check simple serialization
378 381 msg = dict(a=[1,'hi'])
379 382 try:
380 383 packed = pack(msg)
381 384 except Exception:
382 385 raise ValueError("packer could not serialize a simple message")
383 386
384 387 # ensure packed message is bytes
385 388 if not isinstance(packed, bytes):
386 389 raise ValueError("message packed to %r, but bytes are required"%type(packed))
387 390
388 391 # check that unpack is pack's inverse
389 392 try:
390 393 unpacked = unpack(packed)
391 394 except Exception:
392 395 raise ValueError("unpacker could not handle the packer's output")
393 396
394 397 # check datetime support
395 398 msg = dict(t=datetime.now())
396 399 try:
397 400 unpacked = unpack(pack(msg))
398 401 except Exception:
399 402 self.pack = lambda o: pack(squash_dates(o))
400 403 self.unpack = lambda s: extract_dates(unpack(s))
401 404
402 405 def msg_header(self, msg_type):
403 406 return msg_header(self.msg_id, msg_type, self.username, self.session)
404 407
405 408 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
406 409 """Return the nested message dict.
407 410
408 411 This format is different from what is sent over the wire. The
409 412 serialize/unserialize methods converts this nested message dict to the wire
410 413 format, which is a list of message parts.
411 414 """
412 415 msg = {}
413 416 header = self.msg_header(msg_type) if header is None else header
414 417 msg['header'] = header
415 418 msg['msg_id'] = header['msg_id']
416 419 msg['msg_type'] = header['msg_type']
417 420 msg['parent_header'] = {} if parent is None else extract_header(parent)
418 421 msg['content'] = {} if content is None else content
419 sub = {} if subheader is None else subheader
420 msg['header'].update(sub)
422 msg['header'].update(self.subheader)
423 if subheader is not None:
424 msg['header'].update(subheader)
421 425 return msg
422 426
423 427 def sign(self, msg_list):
424 428 """Sign a message with HMAC digest. If no auth, return b''.
425 429
426 430 Parameters
427 431 ----------
428 432 msg_list : list
429 433 The [p_header,p_parent,p_content] part of the message list.
430 434 """
431 435 if self.auth is None:
432 436 return b''
433 437 h = self.auth.copy()
434 438 for m in msg_list:
435 439 h.update(m)
436 440 return str_to_bytes(h.hexdigest())
437 441
438 442 def serialize(self, msg, ident=None):
439 443 """Serialize the message components to bytes.
440 444
441 445 This is roughly the inverse of unserialize. The serialize/unserialize
442 446 methods work with full message lists, whereas pack/unpack work with
443 447 the individual message parts in the message list.
444 448
445 449 Parameters
446 450 ----------
447 451 msg : dict or Message
448 452 The nexted message dict as returned by the self.msg method.
449 453
450 454 Returns
451 455 -------
452 456 msg_list : list
453 457 The list of bytes objects to be sent with the format:
454 458 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
455 459 buffer1,buffer2,...]. In this list, the p_* entities are
456 460 the packed or serialized versions, so if JSON is used, these
457 461 are utf8 encoded JSON strings.
458 462 """
459 463 content = msg.get('content', {})
460 464 if content is None:
461 465 content = self.none
462 466 elif isinstance(content, dict):
463 467 content = self.pack(content)
464 468 elif isinstance(content, bytes):
465 469 # content is already packed, as in a relayed message
466 470 pass
467 471 elif isinstance(content, unicode):
468 472 # should be bytes, but JSON often spits out unicode
469 473 content = content.encode('utf8')
470 474 else:
471 475 raise TypeError("Content incorrect type: %s"%type(content))
472 476
473 477 real_message = [self.pack(msg['header']),
474 478 self.pack(msg['parent_header']),
475 479 content
476 480 ]
477 481
478 482 to_send = []
479 483
480 484 if isinstance(ident, list):
481 485 # accept list of idents
482 486 to_send.extend(ident)
483 487 elif ident is not None:
484 488 to_send.append(ident)
485 489 to_send.append(DELIM)
486 490
487 491 signature = self.sign(real_message)
488 492 to_send.append(signature)
489 493
490 494 to_send.extend(real_message)
491 495
492 496 return to_send
493 497
494 498 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
495 499 buffers=None, subheader=None, track=False, header=None):
496 500 """Build and send a message via stream or socket.
497 501
498 502 The message format used by this function internally is as follows:
499 503
500 504 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
501 505 buffer1,buffer2,...]
502 506
503 507 The serialize/unserialize methods convert the nested message dict into this
504 508 format.
505 509
506 510 Parameters
507 511 ----------
508 512
509 513 stream : zmq.Socket or ZMQStream
510 514 The socket-like object used to send the data.
511 515 msg_or_type : str or Message/dict
512 516 Normally, msg_or_type will be a msg_type unless a message is being
513 517 sent more than once. If a header is supplied, this can be set to
514 518 None and the msg_type will be pulled from the header.
515 519
516 520 content : dict or None
517 521 The content of the message (ignored if msg_or_type is a message).
518 522 header : dict or None
519 523 The header dict for the message (ignores if msg_to_type is a message).
520 524 parent : Message or dict or None
521 525 The parent or parent header describing the parent of this message
522 526 (ignored if msg_or_type is a message).
523 527 ident : bytes or list of bytes
524 528 The zmq.IDENTITY routing path.
525 529 subheader : dict or None
526 530 Extra header keys for this message's header (ignored if msg_or_type
527 531 is a message).
528 532 buffers : list or None
529 533 The already-serialized buffers to be appended to the message.
530 534 track : bool
531 535 Whether to track. Only for use with Sockets, because ZMQStream
532 536 objects cannot track messages.
533 537
534 538 Returns
535 539 -------
536 540 msg : dict
537 541 The constructed message.
538 542 (msg,tracker) : (dict, MessageTracker)
539 543 if track=True, then a 2-tuple will be returned,
540 544 the first element being the constructed
541 545 message, and the second being the MessageTracker
542 546
543 547 """
544 548
545 549 if not isinstance(stream, (zmq.Socket, ZMQStream)):
546 550 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
547 551 elif track and isinstance(stream, ZMQStream):
548 552 raise TypeError("ZMQStream cannot track messages")
549 553
550 554 if isinstance(msg_or_type, (Message, dict)):
551 555 # We got a Message or message dict, not a msg_type so don't
552 556 # build a new Message.
553 557 msg = msg_or_type
554 558 else:
555 559 msg = self.msg(msg_or_type, content=content, parent=parent,
556 560 subheader=subheader, header=header)
557 561
558 562 buffers = [] if buffers is None else buffers
559 563 to_send = self.serialize(msg, ident)
560 564 flag = 0
561 565 if buffers:
562 566 flag = zmq.SNDMORE
563 567 _track = False
564 568 else:
565 569 _track=track
566 570 if track:
567 571 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
568 572 else:
569 573 tracker = stream.send_multipart(to_send, flag, copy=False)
570 574 for b in buffers[:-1]:
571 575 stream.send(b, flag, copy=False)
572 576 if buffers:
573 577 if track:
574 578 tracker = stream.send(buffers[-1], copy=False, track=track)
575 579 else:
576 580 tracker = stream.send(buffers[-1], copy=False)
577 581
578 582 # omsg = Message(msg)
579 583 if self.debug:
580 584 pprint.pprint(msg)
581 585 pprint.pprint(to_send)
582 586 pprint.pprint(buffers)
583 587
584 588 msg['tracker'] = tracker
585 589
586 590 return msg
587 591
588 592 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
589 593 """Send a raw message via ident path.
590 594
591 595 This method is used to send a already serialized message.
592 596
593 597 Parameters
594 598 ----------
595 599 stream : ZMQStream or Socket
596 600 The ZMQ stream or socket to use for sending the message.
597 601 msg_list : list
598 602 The serialized list of messages to send. This only includes the
599 603 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
600 604 the message.
601 605 ident : ident or list
602 606 A single ident or a list of idents to use in sending.
603 607 """
604 608 to_send = []
605 609 if isinstance(ident, bytes):
606 610 ident = [ident]
607 611 if ident is not None:
608 612 to_send.extend(ident)
609 613
610 614 to_send.append(DELIM)
611 615 to_send.append(self.sign(msg_list))
612 616 to_send.extend(msg_list)
613 617 stream.send_multipart(msg_list, flags, copy=copy)
614 618
615 619 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
616 620 """Receive and unpack a message.
617 621
618 622 Parameters
619 623 ----------
620 624 socket : ZMQStream or Socket
621 625 The socket or stream to use in receiving.
622 626
623 627 Returns
624 628 -------
625 629 [idents], msg
626 630 [idents] is a list of idents and msg is a nested message dict of
627 631 same format as self.msg returns.
628 632 """
629 633 if isinstance(socket, ZMQStream):
630 634 socket = socket.socket
631 635 try:
632 636 msg_list = socket.recv_multipart(mode, copy=copy)
633 637 except zmq.ZMQError as e:
634 638 if e.errno == zmq.EAGAIN:
635 639 # We can convert EAGAIN to None as we know in this case
636 640 # recv_multipart won't return None.
637 641 return None,None
638 642 else:
639 643 raise
640 644 # split multipart message into identity list and message dict
641 645 # invalid large messages can cause very expensive string comparisons
642 646 idents, msg_list = self.feed_identities(msg_list, copy)
643 647 try:
644 648 return idents, self.unserialize(msg_list, content=content, copy=copy)
645 649 except Exception as e:
646 650 # TODO: handle it
647 651 raise e
648 652
649 653 def feed_identities(self, msg_list, copy=True):
650 654 """Split the identities from the rest of the message.
651 655
652 656 Feed until DELIM is reached, then return the prefix as idents and
653 657 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
654 658 but that would be silly.
655 659
656 660 Parameters
657 661 ----------
658 662 msg_list : a list of Message or bytes objects
659 663 The message to be split.
660 664 copy : bool
661 665 flag determining whether the arguments are bytes or Messages
662 666
663 667 Returns
664 668 -------
665 669 (idents, msg_list) : two lists
666 670 idents will always be a list of bytes, each of which is a ZMQ
667 671 identity. msg_list will be a list of bytes or zmq.Messages of the
668 672 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
669 673 should be unpackable/unserializable via self.unserialize at this
670 674 point.
671 675 """
672 676 if copy:
673 677 idx = msg_list.index(DELIM)
674 678 return msg_list[:idx], msg_list[idx+1:]
675 679 else:
676 680 failed = True
677 681 for idx,m in enumerate(msg_list):
678 682 if m.bytes == DELIM:
679 683 failed = False
680 684 break
681 685 if failed:
682 686 raise ValueError("DELIM not in msg_list")
683 687 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
684 688 return [m.bytes for m in idents], msg_list
685 689
686 690 def unserialize(self, msg_list, content=True, copy=True):
687 691 """Unserialize a msg_list to a nested message dict.
688 692
689 693 This is roughly the inverse of serialize. The serialize/unserialize
690 694 methods work with full message lists, whereas pack/unpack work with
691 695 the individual message parts in the message list.
692 696
693 697 Parameters:
694 698 -----------
695 699 msg_list : list of bytes or Message objects
696 700 The list of message parts of the form [HMAC,p_header,p_parent,
697 701 p_content,buffer1,buffer2,...].
698 702 content : bool (True)
699 703 Whether to unpack the content dict (True), or leave it packed
700 704 (False).
701 705 copy : bool (True)
702 706 Whether to return the bytes (True), or the non-copying Message
703 707 object in each place (False).
704 708
705 709 Returns
706 710 -------
707 711 msg : dict
708 712 The nested message dict with top-level keys [header, parent_header,
709 713 content, buffers].
710 714 """
711 715 minlen = 4
712 716 message = {}
713 717 if not copy:
714 718 for i in range(minlen):
715 719 msg_list[i] = msg_list[i].bytes
716 720 if self.auth is not None:
717 721 signature = msg_list[0]
718 722 if not signature:
719 723 raise ValueError("Unsigned Message")
720 724 if signature in self.digest_history:
721 725 raise ValueError("Duplicate Signature: %r"%signature)
722 726 self.digest_history.add(signature)
723 727 check = self.sign(msg_list[1:4])
724 728 if not signature == check:
725 729 raise ValueError("Invalid Signature: %r"%signature)
726 730 if not len(msg_list) >= minlen:
727 731 raise TypeError("malformed message, must have at least %i elements"%minlen)
728 732 header = self.unpack(msg_list[1])
729 733 message['header'] = header
730 734 message['msg_id'] = header['msg_id']
731 735 message['msg_type'] = header['msg_type']
732 736 message['parent_header'] = self.unpack(msg_list[2])
733 737 if content:
734 738 message['content'] = self.unpack(msg_list[3])
735 739 else:
736 740 message['content'] = msg_list[3]
737 741
738 742 message['buffers'] = msg_list[4:]
739 743 return message
740 744
741 745 def test_msg2obj():
742 746 am = dict(x=1)
743 747 ao = Message(am)
744 748 assert ao.x == am['x']
745 749
746 750 am['y'] = dict(z=1)
747 751 ao = Message(am)
748 752 assert ao.y.z == am['y']['z']
749 753
750 754 k1, k2 = 'y', 'z'
751 755 assert ao[k1][k2] == am[k1][k2]
752 756
753 757 am2 = dict(ao)
754 758 assert am['x'] == am2['x']
755 759 assert am['y']['z'] == am2['y']['z']
756 760
General Comments 0
You need to be logged in to leave comments. Login now