##// END OF EJS Templates
Start protocol version from 4.0
Takafumi Arakaki -
Show More
@@ -1,769 +1,769 b''
1 1 """Session object for building, serializing, sending, and receiving messages in
2 2 IPython. The Session object supports serialization, HMAC signatures, and
3 3 metadata on messages.
4 4
5 5 Also defined here are utilities for working with Sessions:
6 6 * A SessionFactory to be used as a base class for configurables that work with
7 7 Sessions.
8 8 * A Message object for convenience that allows attribute-access to the msg dict.
9 9
10 10 Authors:
11 11
12 12 * Min RK
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 """
16 16 #-----------------------------------------------------------------------------
17 17 # Copyright (C) 2010-2011 The IPython Development Team
18 18 #
19 19 # Distributed under the terms of the BSD License. The full license is in
20 20 # the file COPYING, distributed as part of this software.
21 21 #-----------------------------------------------------------------------------
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Imports
25 25 #-----------------------------------------------------------------------------
26 26
27 27 import hmac
28 28 import logging
29 29 import os
30 30 import pprint
31 31 import uuid
32 32 from datetime import datetime
33 33
34 34 try:
35 35 import cPickle
36 36 pickle = cPickle
37 37 except:
38 38 cPickle = None
39 39 import pickle
40 40
41 41 import zmq
42 42 from zmq.utils import jsonapi
43 43 from zmq.eventloop.ioloop import IOLoop
44 44 from zmq.eventloop.zmqstream import ZMQStream
45 45
46 46 from IPython.config.application import Application, boolean_flag
47 47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 48 from IPython.utils.importstring import import_item
49 49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 50 from IPython.utils.py3compat import str_to_bytes
51 51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 52 DottedObjectName, CUnicode, Dict, Integer)
53 53 from IPython.zmq.serialize import MAX_ITEMS, MAX_BYTES
54 54
55 55 #-----------------------------------------------------------------------------
56 56 # utility functions
57 57 #-----------------------------------------------------------------------------
58 58
59 59 def squash_unicode(obj):
60 60 """coerce unicode back to bytestrings."""
61 61 if isinstance(obj,dict):
62 62 for key in obj.keys():
63 63 obj[key] = squash_unicode(obj[key])
64 64 if isinstance(key, unicode):
65 65 obj[squash_unicode(key)] = obj.pop(key)
66 66 elif isinstance(obj, list):
67 67 for i,v in enumerate(obj):
68 68 obj[i] = squash_unicode(v)
69 69 elif isinstance(obj, unicode):
70 70 obj = obj.encode('utf8')
71 71 return obj
72 72
73 73 #-----------------------------------------------------------------------------
74 74 # globals and defaults
75 75 #-----------------------------------------------------------------------------
76 76
77 77 # Change this when incrementing the kernel protocol version
78 protocol_version = [1, 1]
78 protocol_version = [4, 0]
79 79
80 80 # ISO8601-ify datetime objects
81 81 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
82 82 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
83 83
84 84 pickle_packer = lambda o: pickle.dumps(o,-1)
85 85 pickle_unpacker = pickle.loads
86 86
87 87 default_packer = json_packer
88 88 default_unpacker = json_unpacker
89 89
90 90 DELIM = b"<IDS|MSG>"
91 91 # singleton dummy tracker, which will always report as done
92 92 DONE = zmq.MessageTracker()
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # Mixin tools for apps that use Sessions
96 96 #-----------------------------------------------------------------------------
97 97
98 98 session_aliases = dict(
99 99 ident = 'Session.session',
100 100 user = 'Session.username',
101 101 keyfile = 'Session.keyfile',
102 102 )
103 103
104 104 session_flags = {
105 105 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
106 106 'keyfile' : '' }},
107 107 """Use HMAC digests for authentication of messages.
108 108 Setting this flag will generate a new UUID to use as the HMAC key.
109 109 """),
110 110 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
111 111 """Don't authenticate messages."""),
112 112 }
113 113
114 114 def default_secure(cfg):
115 115 """Set the default behavior for a config environment to be secure.
116 116
117 117 If Session.key/keyfile have not been set, set Session.key to
118 118 a new random UUID.
119 119 """
120 120
121 121 if 'Session' in cfg:
122 122 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
123 123 return
124 124 # key/keyfile not specified, generate new UUID:
125 125 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
126 126
127 127
128 128 #-----------------------------------------------------------------------------
129 129 # Classes
130 130 #-----------------------------------------------------------------------------
131 131
132 132 class SessionFactory(LoggingConfigurable):
133 133 """The Base class for configurables that have a Session, Context, logger,
134 134 and IOLoop.
135 135 """
136 136
137 137 logname = Unicode('')
138 138 def _logname_changed(self, name, old, new):
139 139 self.log = logging.getLogger(new)
140 140
141 141 # not configurable:
142 142 context = Instance('zmq.Context')
143 143 def _context_default(self):
144 144 return zmq.Context.instance()
145 145
146 146 session = Instance('IPython.zmq.session.Session')
147 147
148 148 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
149 149 def _loop_default(self):
150 150 return IOLoop.instance()
151 151
152 152 def __init__(self, **kwargs):
153 153 super(SessionFactory, self).__init__(**kwargs)
154 154
155 155 if self.session is None:
156 156 # construct the session
157 157 self.session = Session(**kwargs)
158 158
159 159
160 160 class Message(object):
161 161 """A simple message object that maps dict keys to attributes.
162 162
163 163 A Message can be created from a dict and a dict from a Message instance
164 164 simply by calling dict(msg_obj)."""
165 165
166 166 def __init__(self, msg_dict):
167 167 dct = self.__dict__
168 168 for k, v in dict(msg_dict).iteritems():
169 169 if isinstance(v, dict):
170 170 v = Message(v)
171 171 dct[k] = v
172 172
173 173 # Having this iterator lets dict(msg_obj) work out of the box.
174 174 def __iter__(self):
175 175 return iter(self.__dict__.iteritems())
176 176
177 177 def __repr__(self):
178 178 return repr(self.__dict__)
179 179
180 180 def __str__(self):
181 181 return pprint.pformat(self.__dict__)
182 182
183 183 def __contains__(self, k):
184 184 return k in self.__dict__
185 185
186 186 def __getitem__(self, k):
187 187 return self.__dict__[k]
188 188
189 189
190 190 def msg_header(msg_id, msg_type, username, session):
191 191 date = datetime.now()
192 192 return locals()
193 193
194 194 def extract_header(msg_or_header):
195 195 """Given a message or header, return the header."""
196 196 if not msg_or_header:
197 197 return {}
198 198 try:
199 199 # See if msg_or_header is the entire message.
200 200 h = msg_or_header['header']
201 201 except KeyError:
202 202 try:
203 203 # See if msg_or_header is just the header
204 204 h = msg_or_header['msg_id']
205 205 except KeyError:
206 206 raise
207 207 else:
208 208 h = msg_or_header
209 209 if not isinstance(h, dict):
210 210 h = dict(h)
211 211 return h
212 212
213 213 class Session(Configurable):
214 214 """Object for handling serialization and sending of messages.
215 215
216 216 The Session object handles building messages and sending them
217 217 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
218 218 other over the network via Session objects, and only need to work with the
219 219 dict-based IPython message spec. The Session will handle
220 220 serialization/deserialization, security, and metadata.
221 221
222 222 Sessions support configurable serialiization via packer/unpacker traits,
223 223 and signing with HMAC digests via the key/keyfile traits.
224 224
225 225 Parameters
226 226 ----------
227 227
228 228 debug : bool
229 229 whether to trigger extra debugging statements
230 230 packer/unpacker : str : 'json', 'pickle' or import_string
231 231 importstrings for methods to serialize message parts. If just
232 232 'json' or 'pickle', predefined JSON and pickle packers will be used.
233 233 Otherwise, the entire importstring must be used.
234 234
235 235 The functions must accept at least valid JSON input, and output *bytes*.
236 236
237 237 For example, to use msgpack:
238 238 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
239 239 pack/unpack : callables
240 240 You can also set the pack/unpack callables for serialization directly.
241 241 session : bytes
242 242 the ID of this Session object. The default is to generate a new UUID.
243 243 username : unicode
244 244 username added to message headers. The default is to ask the OS.
245 245 key : bytes
246 246 The key used to initialize an HMAC signature. If unset, messages
247 247 will not be signed or checked.
248 248 keyfile : filepath
249 249 The file containing a key. If this is set, `key` will be initialized
250 250 to the contents of the file.
251 251
252 252 """
253 253
254 254 debug=Bool(False, config=True, help="""Debug output in the Session""")
255 255
256 256 packer = DottedObjectName('json',config=True,
257 257 help="""The name of the packer for serializing messages.
258 258 Should be one of 'json', 'pickle', or an import name
259 259 for a custom callable serializer.""")
260 260 def _packer_changed(self, name, old, new):
261 261 if new.lower() == 'json':
262 262 self.pack = json_packer
263 263 self.unpack = json_unpacker
264 264 self.unpacker = new
265 265 elif new.lower() == 'pickle':
266 266 self.pack = pickle_packer
267 267 self.unpack = pickle_unpacker
268 268 self.unpacker = new
269 269 else:
270 270 self.pack = import_item(str(new))
271 271
272 272 unpacker = DottedObjectName('json', config=True,
273 273 help="""The name of the unpacker for unserializing messages.
274 274 Only used with custom functions for `packer`.""")
275 275 def _unpacker_changed(self, name, old, new):
276 276 if new.lower() == 'json':
277 277 self.pack = json_packer
278 278 self.unpack = json_unpacker
279 279 self.packer = new
280 280 elif new.lower() == 'pickle':
281 281 self.pack = pickle_packer
282 282 self.unpack = pickle_unpacker
283 283 self.packer = new
284 284 else:
285 285 self.unpack = import_item(str(new))
286 286
287 287 session = CUnicode(u'', config=True,
288 288 help="""The UUID identifying this session.""")
289 289 def _session_default(self):
290 290 u = unicode(uuid.uuid4())
291 291 self.bsession = u.encode('ascii')
292 292 return u
293 293
294 294 def _session_changed(self, name, old, new):
295 295 self.bsession = self.session.encode('ascii')
296 296
297 297 # bsession is the session as bytes
298 298 bsession = CBytes(b'')
299 299
300 300 username = Unicode(os.environ.get('USER',u'username'), config=True,
301 301 help="""Username for the Session. Default is your system username.""")
302 302
303 303 metadata = Dict({}, config=True,
304 304 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
305 305
306 306 # message signature related traits:
307 307
308 308 key = CBytes(b'', config=True,
309 309 help="""execution key, for extra authentication.""")
310 310 def _key_changed(self, name, old, new):
311 311 if new:
312 312 self.auth = hmac.HMAC(new)
313 313 else:
314 314 self.auth = None
315 315 auth = Instance(hmac.HMAC)
316 316 digest_history = Set()
317 317
318 318 keyfile = Unicode('', config=True,
319 319 help="""path to file containing execution key.""")
320 320 def _keyfile_changed(self, name, old, new):
321 321 with open(new, 'rb') as f:
322 322 self.key = f.read().strip()
323 323
324 324 # serialization traits:
325 325
326 326 pack = Any(default_packer) # the actual packer function
327 327 def _pack_changed(self, name, old, new):
328 328 if not callable(new):
329 329 raise TypeError("packer must be callable, not %s"%type(new))
330 330
331 331 unpack = Any(default_unpacker) # the actual packer function
332 332 def _unpack_changed(self, name, old, new):
333 333 # unpacker is not checked - it is assumed to be
334 334 if not callable(new):
335 335 raise TypeError("unpacker must be callable, not %s"%type(new))
336 336
337 337 # thresholds:
338 338 copy_threshold = Integer(2**16, config=True,
339 339 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
340 340 buffer_threshold = Integer(MAX_BYTES, config=True,
341 341 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
342 342 item_threshold = Integer(MAX_ITEMS, config=True,
343 343 help="""The maximum number of items for a container to be introspected for custom serialization.
344 344 Containers larger than this are pickled outright.
345 345 """
346 346 )
347 347
348 348 def __init__(self, **kwargs):
349 349 """create a Session object
350 350
351 351 Parameters
352 352 ----------
353 353
354 354 debug : bool
355 355 whether to trigger extra debugging statements
356 356 packer/unpacker : str : 'json', 'pickle' or import_string
357 357 importstrings for methods to serialize message parts. If just
358 358 'json' or 'pickle', predefined JSON and pickle packers will be used.
359 359 Otherwise, the entire importstring must be used.
360 360
361 361 The functions must accept at least valid JSON input, and output
362 362 *bytes*.
363 363
364 364 For example, to use msgpack:
365 365 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
366 366 pack/unpack : callables
367 367 You can also set the pack/unpack callables for serialization
368 368 directly.
369 369 session : unicode (must be ascii)
370 370 the ID of this Session object. The default is to generate a new
371 371 UUID.
372 372 bsession : bytes
373 373 The session as bytes
374 374 username : unicode
375 375 username added to message headers. The default is to ask the OS.
376 376 key : bytes
377 377 The key used to initialize an HMAC signature. If unset, messages
378 378 will not be signed or checked.
379 379 keyfile : filepath
380 380 The file containing a key. If this is set, `key` will be
381 381 initialized to the contents of the file.
382 382 """
383 383 super(Session, self).__init__(**kwargs)
384 384 self._check_packers()
385 385 self.none = self.pack({})
386 386 # ensure self._session_default() if necessary, so bsession is defined:
387 387 self.session
388 388
389 389 @property
390 390 def msg_id(self):
391 391 """always return new uuid"""
392 392 return str(uuid.uuid4())
393 393
394 394 def _check_packers(self):
395 395 """check packers for binary data and datetime support."""
396 396 pack = self.pack
397 397 unpack = self.unpack
398 398
399 399 # check simple serialization
400 400 msg = dict(a=[1,'hi'])
401 401 try:
402 402 packed = pack(msg)
403 403 except Exception:
404 404 raise ValueError("packer could not serialize a simple message")
405 405
406 406 # ensure packed message is bytes
407 407 if not isinstance(packed, bytes):
408 408 raise ValueError("message packed to %r, but bytes are required"%type(packed))
409 409
410 410 # check that unpack is pack's inverse
411 411 try:
412 412 unpacked = unpack(packed)
413 413 except Exception:
414 414 raise ValueError("unpacker could not handle the packer's output")
415 415
416 416 # check datetime support
417 417 msg = dict(t=datetime.now())
418 418 try:
419 419 unpacked = unpack(pack(msg))
420 420 except Exception:
421 421 self.pack = lambda o: pack(squash_dates(o))
422 422 self.unpack = lambda s: extract_dates(unpack(s))
423 423
424 424 def msg_header(self, msg_type):
425 425 return msg_header(self.msg_id, msg_type, self.username, self.session)
426 426
427 427 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
428 428 """Return the nested message dict.
429 429
430 430 This format is different from what is sent over the wire. The
431 431 serialize/unserialize methods converts this nested message dict to the wire
432 432 format, which is a list of message parts.
433 433 """
434 434 msg = {}
435 435 header = self.msg_header(msg_type) if header is None else header
436 436 msg['header'] = header
437 437 msg['msg_id'] = header['msg_id']
438 438 msg['msg_type'] = header['msg_type']
439 439 msg['parent_header'] = {} if parent is None else extract_header(parent)
440 440 msg['content'] = {} if content is None else content
441 441 msg['metadata'] = self.metadata.copy()
442 442 if metadata is not None:
443 443 msg['metadata'].update(metadata)
444 444 return msg
445 445
446 446 def sign(self, msg_list):
447 447 """Sign a message with HMAC digest. If no auth, return b''.
448 448
449 449 Parameters
450 450 ----------
451 451 msg_list : list
452 452 The [p_header,p_parent,p_content] part of the message list.
453 453 """
454 454 if self.auth is None:
455 455 return b''
456 456 h = self.auth.copy()
457 457 for m in msg_list:
458 458 h.update(m)
459 459 return str_to_bytes(h.hexdigest())
460 460
461 461 def serialize(self, msg, ident=None):
462 462 """Serialize the message components to bytes.
463 463
464 464 This is roughly the inverse of unserialize. The serialize/unserialize
465 465 methods work with full message lists, whereas pack/unpack work with
466 466 the individual message parts in the message list.
467 467
468 468 Parameters
469 469 ----------
470 470 msg : dict or Message
471 471 The nexted message dict as returned by the self.msg method.
472 472
473 473 Returns
474 474 -------
475 475 msg_list : list
476 476 The list of bytes objects to be sent with the format:
477 477 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content,
478 478 buffer1,buffer2,...]. In this list, the p_* entities are
479 479 the packed or serialized versions, so if JSON is used, these
480 480 are utf8 encoded JSON strings.
481 481 """
482 482 content = msg.get('content', {})
483 483 if content is None:
484 484 content = self.none
485 485 elif isinstance(content, dict):
486 486 content = self.pack(content)
487 487 elif isinstance(content, bytes):
488 488 # content is already packed, as in a relayed message
489 489 pass
490 490 elif isinstance(content, unicode):
491 491 # should be bytes, but JSON often spits out unicode
492 492 content = content.encode('utf8')
493 493 else:
494 494 raise TypeError("Content incorrect type: %s"%type(content))
495 495
496 496 real_message = [self.pack(msg['header']),
497 497 self.pack(msg['parent_header']),
498 498 self.pack(msg['metadata']),
499 499 content,
500 500 ]
501 501
502 502 to_send = []
503 503
504 504 if isinstance(ident, list):
505 505 # accept list of idents
506 506 to_send.extend(ident)
507 507 elif ident is not None:
508 508 to_send.append(ident)
509 509 to_send.append(DELIM)
510 510
511 511 signature = self.sign(real_message)
512 512 to_send.append(signature)
513 513
514 514 to_send.extend(real_message)
515 515
516 516 return to_send
517 517
518 518 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
519 519 buffers=None, track=False, header=None, metadata=None):
520 520 """Build and send a message via stream or socket.
521 521
522 522 The message format used by this function internally is as follows:
523 523
524 524 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
525 525 buffer1,buffer2,...]
526 526
527 527 The serialize/unserialize methods convert the nested message dict into this
528 528 format.
529 529
530 530 Parameters
531 531 ----------
532 532
533 533 stream : zmq.Socket or ZMQStream
534 534 The socket-like object used to send the data.
535 535 msg_or_type : str or Message/dict
536 536 Normally, msg_or_type will be a msg_type unless a message is being
537 537 sent more than once. If a header is supplied, this can be set to
538 538 None and the msg_type will be pulled from the header.
539 539
540 540 content : dict or None
541 541 The content of the message (ignored if msg_or_type is a message).
542 542 header : dict or None
543 543 The header dict for the message (ignored if msg_to_type is a message).
544 544 parent : Message or dict or None
545 545 The parent or parent header describing the parent of this message
546 546 (ignored if msg_or_type is a message).
547 547 ident : bytes or list of bytes
548 548 The zmq.IDENTITY routing path.
549 549 metadata : dict or None
550 550 The metadata describing the message
551 551 buffers : list or None
552 552 The already-serialized buffers to be appended to the message.
553 553 track : bool
554 554 Whether to track. Only for use with Sockets, because ZMQStream
555 555 objects cannot track messages.
556 556
557 557
558 558 Returns
559 559 -------
560 560 msg : dict
561 561 The constructed message.
562 562 """
563 563
564 564 if not isinstance(stream, (zmq.Socket, ZMQStream)):
565 565 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
566 566 elif track and isinstance(stream, ZMQStream):
567 567 raise TypeError("ZMQStream cannot track messages")
568 568
569 569 if isinstance(msg_or_type, (Message, dict)):
570 570 # We got a Message or message dict, not a msg_type so don't
571 571 # build a new Message.
572 572 msg = msg_or_type
573 573 else:
574 574 msg = self.msg(msg_or_type, content=content, parent=parent,
575 575 header=header, metadata=metadata)
576 576
577 577 buffers = [] if buffers is None else buffers
578 578 to_send = self.serialize(msg, ident)
579 579 to_send.extend(buffers)
580 580 longest = max([ len(s) for s in to_send ])
581 581 copy = (longest < self.copy_threshold)
582 582
583 583 if buffers and track and not copy:
584 584 # only really track when we are doing zero-copy buffers
585 585 tracker = stream.send_multipart(to_send, copy=False, track=True)
586 586 else:
587 587 # use dummy tracker, which will be done immediately
588 588 tracker = DONE
589 589 stream.send_multipart(to_send, copy=copy)
590 590
591 591 if self.debug:
592 592 pprint.pprint(msg)
593 593 pprint.pprint(to_send)
594 594 pprint.pprint(buffers)
595 595
596 596 msg['tracker'] = tracker
597 597
598 598 return msg
599 599
600 600 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
601 601 """Send a raw message via ident path.
602 602
603 603 This method is used to send a already serialized message.
604 604
605 605 Parameters
606 606 ----------
607 607 stream : ZMQStream or Socket
608 608 The ZMQ stream or socket to use for sending the message.
609 609 msg_list : list
610 610 The serialized list of messages to send. This only includes the
611 611 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
612 612 the message.
613 613 ident : ident or list
614 614 A single ident or a list of idents to use in sending.
615 615 """
616 616 to_send = []
617 617 if isinstance(ident, bytes):
618 618 ident = [ident]
619 619 if ident is not None:
620 620 to_send.extend(ident)
621 621
622 622 to_send.append(DELIM)
623 623 to_send.append(self.sign(msg_list))
624 624 to_send.extend(msg_list)
625 625 stream.send_multipart(msg_list, flags, copy=copy)
626 626
627 627 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
628 628 """Receive and unpack a message.
629 629
630 630 Parameters
631 631 ----------
632 632 socket : ZMQStream or Socket
633 633 The socket or stream to use in receiving.
634 634
635 635 Returns
636 636 -------
637 637 [idents], msg
638 638 [idents] is a list of idents and msg is a nested message dict of
639 639 same format as self.msg returns.
640 640 """
641 641 if isinstance(socket, ZMQStream):
642 642 socket = socket.socket
643 643 try:
644 644 msg_list = socket.recv_multipart(mode, copy=copy)
645 645 except zmq.ZMQError as e:
646 646 if e.errno == zmq.EAGAIN:
647 647 # We can convert EAGAIN to None as we know in this case
648 648 # recv_multipart won't return None.
649 649 return None,None
650 650 else:
651 651 raise
652 652 # split multipart message into identity list and message dict
653 653 # invalid large messages can cause very expensive string comparisons
654 654 idents, msg_list = self.feed_identities(msg_list, copy)
655 655 try:
656 656 return idents, self.unserialize(msg_list, content=content, copy=copy)
657 657 except Exception as e:
658 658 # TODO: handle it
659 659 raise e
660 660
661 661 def feed_identities(self, msg_list, copy=True):
662 662 """Split the identities from the rest of the message.
663 663
664 664 Feed until DELIM is reached, then return the prefix as idents and
665 665 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
666 666 but that would be silly.
667 667
668 668 Parameters
669 669 ----------
670 670 msg_list : a list of Message or bytes objects
671 671 The message to be split.
672 672 copy : bool
673 673 flag determining whether the arguments are bytes or Messages
674 674
675 675 Returns
676 676 -------
677 677 (idents, msg_list) : two lists
678 678 idents will always be a list of bytes, each of which is a ZMQ
679 679 identity. msg_list will be a list of bytes or zmq.Messages of the
680 680 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
681 681 should be unpackable/unserializable via self.unserialize at this
682 682 point.
683 683 """
684 684 if copy:
685 685 idx = msg_list.index(DELIM)
686 686 return msg_list[:idx], msg_list[idx+1:]
687 687 else:
688 688 failed = True
689 689 for idx,m in enumerate(msg_list):
690 690 if m.bytes == DELIM:
691 691 failed = False
692 692 break
693 693 if failed:
694 694 raise ValueError("DELIM not in msg_list")
695 695 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
696 696 return [m.bytes for m in idents], msg_list
697 697
698 698 def unserialize(self, msg_list, content=True, copy=True):
699 699 """Unserialize a msg_list to a nested message dict.
700 700
701 701 This is roughly the inverse of serialize. The serialize/unserialize
702 702 methods work with full message lists, whereas pack/unpack work with
703 703 the individual message parts in the message list.
704 704
705 705 Parameters:
706 706 -----------
707 707 msg_list : list of bytes or Message objects
708 708 The list of message parts of the form [HMAC,p_header,p_parent,
709 709 p_metadata,p_content,buffer1,buffer2,...].
710 710 content : bool (True)
711 711 Whether to unpack the content dict (True), or leave it packed
712 712 (False).
713 713 copy : bool (True)
714 714 Whether to return the bytes (True), or the non-copying Message
715 715 object in each place (False).
716 716
717 717 Returns
718 718 -------
719 719 msg : dict
720 720 The nested message dict with top-level keys [header, parent_header,
721 721 content, buffers].
722 722 """
723 723 minlen = 5
724 724 message = {}
725 725 if not copy:
726 726 for i in range(minlen):
727 727 msg_list[i] = msg_list[i].bytes
728 728 if self.auth is not None:
729 729 signature = msg_list[0]
730 730 if not signature:
731 731 raise ValueError("Unsigned Message")
732 732 if signature in self.digest_history:
733 733 raise ValueError("Duplicate Signature: %r"%signature)
734 734 self.digest_history.add(signature)
735 735 check = self.sign(msg_list[1:5])
736 736 if not signature == check:
737 737 raise ValueError("Invalid Signature: %r" % signature)
738 738 if not len(msg_list) >= minlen:
739 739 raise TypeError("malformed message, must have at least %i elements"%minlen)
740 740 header = self.unpack(msg_list[1])
741 741 message['header'] = header
742 742 message['msg_id'] = header['msg_id']
743 743 message['msg_type'] = header['msg_type']
744 744 message['parent_header'] = self.unpack(msg_list[2])
745 745 message['metadata'] = self.unpack(msg_list[3])
746 746 if content:
747 747 message['content'] = self.unpack(msg_list[4])
748 748 else:
749 749 message['content'] = msg_list[4]
750 750
751 751 message['buffers'] = msg_list[5:]
752 752 return message
753 753
754 754 def test_msg2obj():
755 755 am = dict(x=1)
756 756 ao = Message(am)
757 757 assert ao.x == am['x']
758 758
759 759 am['y'] = dict(z=1)
760 760 ao = Message(am)
761 761 assert ao.y.z == am['y']['z']
762 762
763 763 k1, k2 = 'y', 'z'
764 764 assert ao[k1][k2] == am[k1][k2]
765 765
766 766 am2 = dict(ao)
767 767 assert am['x'] == am2['x']
768 768 assert am['y']['z'] == am2['y']['z']
769 769
General Comments 0
You need to be logged in to leave comments. Login now