##// END OF EJS Templates
add IPython version to session header...
MinRK -
Show More
@@ -1,767 +1,769 b''
1 1 """Session object for building, serializing, sending, and receiving messages in
2 2 IPython. The Session object supports serialization, HMAC signatures, and
3 3 metadata on messages.
4 4
5 5 Also defined here are utilities for working with Sessions:
6 6 * A SessionFactory to be used as a base class for configurables that work with
7 7 Sessions.
8 8 * A Message object for convenience that allows attribute-access to the msg dict.
9 9
10 10 Authors:
11 11
12 12 * Min RK
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 """
16 16 #-----------------------------------------------------------------------------
17 17 # Copyright (C) 2010-2011 The IPython Development Team
18 18 #
19 19 # Distributed under the terms of the BSD License. The full license is in
20 20 # the file COPYING, distributed as part of this software.
21 21 #-----------------------------------------------------------------------------
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Imports
25 25 #-----------------------------------------------------------------------------
26 26
27 27 import hmac
28 28 import logging
29 29 import os
30 30 import pprint
31 31 import uuid
32 32 from datetime import datetime
33 33
34 34 try:
35 35 import cPickle
36 36 pickle = cPickle
37 37 except:
38 38 cPickle = None
39 39 import pickle
40 40
41 41 import zmq
42 42 from zmq.utils import jsonapi
43 43 from zmq.eventloop.ioloop import IOLoop
44 44 from zmq.eventloop.zmqstream import ZMQStream
45 45
46 import IPython
46 47 from IPython.config.application import Application, boolean_flag
47 48 from IPython.config.configurable import Configurable, LoggingConfigurable
48 49 from IPython.utils.importstring import import_item
49 50 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 51 from IPython.utils.py3compat import str_to_bytes
51 52 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 53 DottedObjectName, CUnicode, Dict, Integer)
53 54 from IPython.zmq.serialize import MAX_ITEMS, MAX_BYTES
54 55
55 56 #-----------------------------------------------------------------------------
56 57 # utility functions
57 58 #-----------------------------------------------------------------------------
58 59
59 60 def squash_unicode(obj):
60 61 """coerce unicode back to bytestrings."""
61 62 if isinstance(obj,dict):
62 63 for key in obj.keys():
63 64 obj[key] = squash_unicode(obj[key])
64 65 if isinstance(key, unicode):
65 66 obj[squash_unicode(key)] = obj.pop(key)
66 67 elif isinstance(obj, list):
67 68 for i,v in enumerate(obj):
68 69 obj[i] = squash_unicode(v)
69 70 elif isinstance(obj, unicode):
70 71 obj = obj.encode('utf8')
71 72 return obj
72 73
73 74 #-----------------------------------------------------------------------------
74 75 # globals and defaults
75 76 #-----------------------------------------------------------------------------
76 77
77
78 _version_info_list = list(IPython.version_info)
78 79 # ISO8601-ify datetime objects
79 80 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
80 81 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
81 82
82 83 pickle_packer = lambda o: pickle.dumps(o,-1)
83 84 pickle_unpacker = pickle.loads
84 85
85 86 default_packer = json_packer
86 87 default_unpacker = json_unpacker
87 88
88 89 DELIM = b"<IDS|MSG>"
89 90 # singleton dummy tracker, which will always report as done
90 91 DONE = zmq.MessageTracker()
91 92
92 93 #-----------------------------------------------------------------------------
93 94 # Mixin tools for apps that use Sessions
94 95 #-----------------------------------------------------------------------------
95 96
96 97 session_aliases = dict(
97 98 ident = 'Session.session',
98 99 user = 'Session.username',
99 100 keyfile = 'Session.keyfile',
100 101 )
101 102
102 103 session_flags = {
103 104 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
104 105 'keyfile' : '' }},
105 106 """Use HMAC digests for authentication of messages.
106 107 Setting this flag will generate a new UUID to use as the HMAC key.
107 108 """),
108 109 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
109 110 """Don't authenticate messages."""),
110 111 }
111 112
112 113 def default_secure(cfg):
113 114 """Set the default behavior for a config environment to be secure.
114 115
115 116 If Session.key/keyfile have not been set, set Session.key to
116 117 a new random UUID.
117 118 """
118 119
119 120 if 'Session' in cfg:
120 121 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
121 122 return
122 123 # key/keyfile not specified, generate new UUID:
123 124 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
124 125
125 126
126 127 #-----------------------------------------------------------------------------
127 128 # Classes
128 129 #-----------------------------------------------------------------------------
129 130
130 131 class SessionFactory(LoggingConfigurable):
131 132 """The Base class for configurables that have a Session, Context, logger,
132 133 and IOLoop.
133 134 """
134 135
135 136 logname = Unicode('')
136 137 def _logname_changed(self, name, old, new):
137 138 self.log = logging.getLogger(new)
138 139
139 140 # not configurable:
140 141 context = Instance('zmq.Context')
141 142 def _context_default(self):
142 143 return zmq.Context.instance()
143 144
144 145 session = Instance('IPython.zmq.session.Session')
145 146
146 147 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
147 148 def _loop_default(self):
148 149 return IOLoop.instance()
149 150
150 151 def __init__(self, **kwargs):
151 152 super(SessionFactory, self).__init__(**kwargs)
152 153
153 154 if self.session is None:
154 155 # construct the session
155 156 self.session = Session(**kwargs)
156 157
157 158
158 159 class Message(object):
159 160 """A simple message object that maps dict keys to attributes.
160 161
161 162 A Message can be created from a dict and a dict from a Message instance
162 163 simply by calling dict(msg_obj)."""
163 164
164 165 def __init__(self, msg_dict):
165 166 dct = self.__dict__
166 167 for k, v in dict(msg_dict).iteritems():
167 168 if isinstance(v, dict):
168 169 v = Message(v)
169 170 dct[k] = v
170 171
171 172 # Having this iterator lets dict(msg_obj) work out of the box.
172 173 def __iter__(self):
173 174 return iter(self.__dict__.iteritems())
174 175
175 176 def __repr__(self):
176 177 return repr(self.__dict__)
177 178
178 179 def __str__(self):
179 180 return pprint.pformat(self.__dict__)
180 181
181 182 def __contains__(self, k):
182 183 return k in self.__dict__
183 184
184 185 def __getitem__(self, k):
185 186 return self.__dict__[k]
186 187
187 188
188 189 def msg_header(msg_id, msg_type, username, session):
189 190 date = datetime.now()
191 version = _version_info_list
190 192 return locals()
191 193
192 194 def extract_header(msg_or_header):
193 195 """Given a message or header, return the header."""
194 196 if not msg_or_header:
195 197 return {}
196 198 try:
197 199 # See if msg_or_header is the entire message.
198 200 h = msg_or_header['header']
199 201 except KeyError:
200 202 try:
201 203 # See if msg_or_header is just the header
202 204 h = msg_or_header['msg_id']
203 205 except KeyError:
204 206 raise
205 207 else:
206 208 h = msg_or_header
207 209 if not isinstance(h, dict):
208 210 h = dict(h)
209 211 return h
210 212
211 213 class Session(Configurable):
212 214 """Object for handling serialization and sending of messages.
213 215
214 216 The Session object handles building messages and sending them
215 217 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
216 218 other over the network via Session objects, and only need to work with the
217 219 dict-based IPython message spec. The Session will handle
218 220 serialization/deserialization, security, and metadata.
219 221
220 222 Sessions support configurable serialiization via packer/unpacker traits,
221 223 and signing with HMAC digests via the key/keyfile traits.
222 224
223 225 Parameters
224 226 ----------
225 227
226 228 debug : bool
227 229 whether to trigger extra debugging statements
228 230 packer/unpacker : str : 'json', 'pickle' or import_string
229 231 importstrings for methods to serialize message parts. If just
230 232 'json' or 'pickle', predefined JSON and pickle packers will be used.
231 233 Otherwise, the entire importstring must be used.
232 234
233 235 The functions must accept at least valid JSON input, and output *bytes*.
234 236
235 237 For example, to use msgpack:
236 238 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
237 239 pack/unpack : callables
238 240 You can also set the pack/unpack callables for serialization directly.
239 241 session : bytes
240 242 the ID of this Session object. The default is to generate a new UUID.
241 243 username : unicode
242 244 username added to message headers. The default is to ask the OS.
243 245 key : bytes
244 246 The key used to initialize an HMAC signature. If unset, messages
245 247 will not be signed or checked.
246 248 keyfile : filepath
247 249 The file containing a key. If this is set, `key` will be initialized
248 250 to the contents of the file.
249 251
250 252 """
251 253
252 254 debug=Bool(False, config=True, help="""Debug output in the Session""")
253 255
254 256 packer = DottedObjectName('json',config=True,
255 257 help="""The name of the packer for serializing messages.
256 258 Should be one of 'json', 'pickle', or an import name
257 259 for a custom callable serializer.""")
258 260 def _packer_changed(self, name, old, new):
259 261 if new.lower() == 'json':
260 262 self.pack = json_packer
261 263 self.unpack = json_unpacker
262 264 self.unpacker = new
263 265 elif new.lower() == 'pickle':
264 266 self.pack = pickle_packer
265 267 self.unpack = pickle_unpacker
266 268 self.unpacker = new
267 269 else:
268 270 self.pack = import_item(str(new))
269 271
270 272 unpacker = DottedObjectName('json', config=True,
271 273 help="""The name of the unpacker for unserializing messages.
272 274 Only used with custom functions for `packer`.""")
273 275 def _unpacker_changed(self, name, old, new):
274 276 if new.lower() == 'json':
275 277 self.pack = json_packer
276 278 self.unpack = json_unpacker
277 279 self.packer = new
278 280 elif new.lower() == 'pickle':
279 281 self.pack = pickle_packer
280 282 self.unpack = pickle_unpacker
281 283 self.packer = new
282 284 else:
283 285 self.unpack = import_item(str(new))
284 286
285 287 session = CUnicode(u'', config=True,
286 288 help="""The UUID identifying this session.""")
287 289 def _session_default(self):
288 290 u = unicode(uuid.uuid4())
289 291 self.bsession = u.encode('ascii')
290 292 return u
291 293
292 294 def _session_changed(self, name, old, new):
293 295 self.bsession = self.session.encode('ascii')
294 296
295 297 # bsession is the session as bytes
296 298 bsession = CBytes(b'')
297 299
298 300 username = Unicode(os.environ.get('USER',u'username'), config=True,
299 301 help="""Username for the Session. Default is your system username.""")
300 302
301 303 metadata = Dict({}, config=True,
302 304 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
303 305
304 306 # message signature related traits:
305 307
306 308 key = CBytes(b'', config=True,
307 309 help="""execution key, for extra authentication.""")
308 310 def _key_changed(self, name, old, new):
309 311 if new:
310 312 self.auth = hmac.HMAC(new)
311 313 else:
312 314 self.auth = None
313 315 auth = Instance(hmac.HMAC)
314 316 digest_history = Set()
315 317
316 318 keyfile = Unicode('', config=True,
317 319 help="""path to file containing execution key.""")
318 320 def _keyfile_changed(self, name, old, new):
319 321 with open(new, 'rb') as f:
320 322 self.key = f.read().strip()
321 323
322 324 # serialization traits:
323 325
324 326 pack = Any(default_packer) # the actual packer function
325 327 def _pack_changed(self, name, old, new):
326 328 if not callable(new):
327 329 raise TypeError("packer must be callable, not %s"%type(new))
328 330
329 331 unpack = Any(default_unpacker) # the actual packer function
330 332 def _unpack_changed(self, name, old, new):
331 333 # unpacker is not checked - it is assumed to be
332 334 if not callable(new):
333 335 raise TypeError("unpacker must be callable, not %s"%type(new))
334 336
335 337 # thresholds:
336 338 copy_threshold = Integer(2**16, config=True,
337 339 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
338 340 buffer_threshold = Integer(MAX_BYTES, config=True,
339 341 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
340 342 item_threshold = Integer(MAX_ITEMS, config=True,
341 343 help="""The maximum number of items for a container to be introspected for custom serialization.
342 344 Containers larger than this are pickled outright.
343 345 """
344 346 )
345 347
346 348 def __init__(self, **kwargs):
347 349 """create a Session object
348 350
349 351 Parameters
350 352 ----------
351 353
352 354 debug : bool
353 355 whether to trigger extra debugging statements
354 356 packer/unpacker : str : 'json', 'pickle' or import_string
355 357 importstrings for methods to serialize message parts. If just
356 358 'json' or 'pickle', predefined JSON and pickle packers will be used.
357 359 Otherwise, the entire importstring must be used.
358 360
359 361 The functions must accept at least valid JSON input, and output
360 362 *bytes*.
361 363
362 364 For example, to use msgpack:
363 365 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
364 366 pack/unpack : callables
365 367 You can also set the pack/unpack callables for serialization
366 368 directly.
367 369 session : unicode (must be ascii)
368 370 the ID of this Session object. The default is to generate a new
369 371 UUID.
370 372 bsession : bytes
371 373 The session as bytes
372 374 username : unicode
373 375 username added to message headers. The default is to ask the OS.
374 376 key : bytes
375 377 The key used to initialize an HMAC signature. If unset, messages
376 378 will not be signed or checked.
377 379 keyfile : filepath
378 380 The file containing a key. If this is set, `key` will be
379 381 initialized to the contents of the file.
380 382 """
381 383 super(Session, self).__init__(**kwargs)
382 384 self._check_packers()
383 385 self.none = self.pack({})
384 386 # ensure self._session_default() if necessary, so bsession is defined:
385 387 self.session
386 388
387 389 @property
388 390 def msg_id(self):
389 391 """always return new uuid"""
390 392 return str(uuid.uuid4())
391 393
392 394 def _check_packers(self):
393 395 """check packers for binary data and datetime support."""
394 396 pack = self.pack
395 397 unpack = self.unpack
396 398
397 399 # check simple serialization
398 400 msg = dict(a=[1,'hi'])
399 401 try:
400 402 packed = pack(msg)
401 403 except Exception:
402 404 raise ValueError("packer could not serialize a simple message")
403 405
404 406 # ensure packed message is bytes
405 407 if not isinstance(packed, bytes):
406 408 raise ValueError("message packed to %r, but bytes are required"%type(packed))
407 409
408 410 # check that unpack is pack's inverse
409 411 try:
410 412 unpacked = unpack(packed)
411 413 except Exception:
412 414 raise ValueError("unpacker could not handle the packer's output")
413 415
414 416 # check datetime support
415 417 msg = dict(t=datetime.now())
416 418 try:
417 419 unpacked = unpack(pack(msg))
418 420 except Exception:
419 421 self.pack = lambda o: pack(squash_dates(o))
420 422 self.unpack = lambda s: extract_dates(unpack(s))
421 423
422 424 def msg_header(self, msg_type):
423 425 return msg_header(self.msg_id, msg_type, self.username, self.session)
424 426
425 427 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
426 428 """Return the nested message dict.
427 429
428 430 This format is different from what is sent over the wire. The
429 431 serialize/unserialize methods converts this nested message dict to the wire
430 432 format, which is a list of message parts.
431 433 """
432 434 msg = {}
433 435 header = self.msg_header(msg_type) if header is None else header
434 436 msg['header'] = header
435 437 msg['msg_id'] = header['msg_id']
436 438 msg['msg_type'] = header['msg_type']
437 439 msg['parent_header'] = {} if parent is None else extract_header(parent)
438 440 msg['content'] = {} if content is None else content
439 441 msg['metadata'] = self.metadata.copy()
440 442 if metadata is not None:
441 443 msg['metadata'].update(metadata)
442 444 return msg
443 445
444 446 def sign(self, msg_list):
445 447 """Sign a message with HMAC digest. If no auth, return b''.
446 448
447 449 Parameters
448 450 ----------
449 451 msg_list : list
450 452 The [p_header,p_parent,p_content] part of the message list.
451 453 """
452 454 if self.auth is None:
453 455 return b''
454 456 h = self.auth.copy()
455 457 for m in msg_list:
456 458 h.update(m)
457 459 return str_to_bytes(h.hexdigest())
458 460
459 461 def serialize(self, msg, ident=None):
460 462 """Serialize the message components to bytes.
461 463
462 464 This is roughly the inverse of unserialize. The serialize/unserialize
463 465 methods work with full message lists, whereas pack/unpack work with
464 466 the individual message parts in the message list.
465 467
466 468 Parameters
467 469 ----------
468 470 msg : dict or Message
469 471 The nexted message dict as returned by the self.msg method.
470 472
471 473 Returns
472 474 -------
473 475 msg_list : list
474 476 The list of bytes objects to be sent with the format:
475 477 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content,
476 478 buffer1,buffer2,...]. In this list, the p_* entities are
477 479 the packed or serialized versions, so if JSON is used, these
478 480 are utf8 encoded JSON strings.
479 481 """
480 482 content = msg.get('content', {})
481 483 if content is None:
482 484 content = self.none
483 485 elif isinstance(content, dict):
484 486 content = self.pack(content)
485 487 elif isinstance(content, bytes):
486 488 # content is already packed, as in a relayed message
487 489 pass
488 490 elif isinstance(content, unicode):
489 491 # should be bytes, but JSON often spits out unicode
490 492 content = content.encode('utf8')
491 493 else:
492 494 raise TypeError("Content incorrect type: %s"%type(content))
493 495
494 496 real_message = [self.pack(msg['header']),
495 497 self.pack(msg['parent_header']),
496 498 self.pack(msg['metadata']),
497 499 content,
498 500 ]
499 501
500 502 to_send = []
501 503
502 504 if isinstance(ident, list):
503 505 # accept list of idents
504 506 to_send.extend(ident)
505 507 elif ident is not None:
506 508 to_send.append(ident)
507 509 to_send.append(DELIM)
508 510
509 511 signature = self.sign(real_message)
510 512 to_send.append(signature)
511 513
512 514 to_send.extend(real_message)
513 515
514 516 return to_send
515 517
516 518 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
517 519 buffers=None, track=False, header=None, metadata=None):
518 520 """Build and send a message via stream or socket.
519 521
520 522 The message format used by this function internally is as follows:
521 523
522 524 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
523 525 buffer1,buffer2,...]
524 526
525 527 The serialize/unserialize methods convert the nested message dict into this
526 528 format.
527 529
528 530 Parameters
529 531 ----------
530 532
531 533 stream : zmq.Socket or ZMQStream
532 534 The socket-like object used to send the data.
533 535 msg_or_type : str or Message/dict
534 536 Normally, msg_or_type will be a msg_type unless a message is being
535 537 sent more than once. If a header is supplied, this can be set to
536 538 None and the msg_type will be pulled from the header.
537 539
538 540 content : dict or None
539 541 The content of the message (ignored if msg_or_type is a message).
540 542 header : dict or None
541 543 The header dict for the message (ignored if msg_to_type is a message).
542 544 parent : Message or dict or None
543 545 The parent or parent header describing the parent of this message
544 546 (ignored if msg_or_type is a message).
545 547 ident : bytes or list of bytes
546 548 The zmq.IDENTITY routing path.
547 549 metadata : dict or None
548 550 The metadata describing the message
549 551 buffers : list or None
550 552 The already-serialized buffers to be appended to the message.
551 553 track : bool
552 554 Whether to track. Only for use with Sockets, because ZMQStream
553 555 objects cannot track messages.
554 556
555 557
556 558 Returns
557 559 -------
558 560 msg : dict
559 561 The constructed message.
560 562 """
561 563
562 564 if not isinstance(stream, (zmq.Socket, ZMQStream)):
563 565 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
564 566 elif track and isinstance(stream, ZMQStream):
565 567 raise TypeError("ZMQStream cannot track messages")
566 568
567 569 if isinstance(msg_or_type, (Message, dict)):
568 570 # We got a Message or message dict, not a msg_type so don't
569 571 # build a new Message.
570 572 msg = msg_or_type
571 573 else:
572 574 msg = self.msg(msg_or_type, content=content, parent=parent,
573 575 header=header, metadata=metadata)
574 576
575 577 buffers = [] if buffers is None else buffers
576 578 to_send = self.serialize(msg, ident)
577 579 to_send.extend(buffers)
578 580 longest = max([ len(s) for s in to_send ])
579 581 copy = (longest < self.copy_threshold)
580 582
581 583 if buffers and track and not copy:
582 584 # only really track when we are doing zero-copy buffers
583 585 tracker = stream.send_multipart(to_send, copy=False, track=True)
584 586 else:
585 587 # use dummy tracker, which will be done immediately
586 588 tracker = DONE
587 589 stream.send_multipart(to_send, copy=copy)
588 590
589 591 if self.debug:
590 592 pprint.pprint(msg)
591 593 pprint.pprint(to_send)
592 594 pprint.pprint(buffers)
593 595
594 596 msg['tracker'] = tracker
595 597
596 598 return msg
597 599
598 600 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
599 601 """Send a raw message via ident path.
600 602
601 603 This method is used to send a already serialized message.
602 604
603 605 Parameters
604 606 ----------
605 607 stream : ZMQStream or Socket
606 608 The ZMQ stream or socket to use for sending the message.
607 609 msg_list : list
608 610 The serialized list of messages to send. This only includes the
609 611 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
610 612 the message.
611 613 ident : ident or list
612 614 A single ident or a list of idents to use in sending.
613 615 """
614 616 to_send = []
615 617 if isinstance(ident, bytes):
616 618 ident = [ident]
617 619 if ident is not None:
618 620 to_send.extend(ident)
619 621
620 622 to_send.append(DELIM)
621 623 to_send.append(self.sign(msg_list))
622 624 to_send.extend(msg_list)
623 625 stream.send_multipart(msg_list, flags, copy=copy)
624 626
625 627 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
626 628 """Receive and unpack a message.
627 629
628 630 Parameters
629 631 ----------
630 632 socket : ZMQStream or Socket
631 633 The socket or stream to use in receiving.
632 634
633 635 Returns
634 636 -------
635 637 [idents], msg
636 638 [idents] is a list of idents and msg is a nested message dict of
637 639 same format as self.msg returns.
638 640 """
639 641 if isinstance(socket, ZMQStream):
640 642 socket = socket.socket
641 643 try:
642 644 msg_list = socket.recv_multipart(mode, copy=copy)
643 645 except zmq.ZMQError as e:
644 646 if e.errno == zmq.EAGAIN:
645 647 # We can convert EAGAIN to None as we know in this case
646 648 # recv_multipart won't return None.
647 649 return None,None
648 650 else:
649 651 raise
650 652 # split multipart message into identity list and message dict
651 653 # invalid large messages can cause very expensive string comparisons
652 654 idents, msg_list = self.feed_identities(msg_list, copy)
653 655 try:
654 656 return idents, self.unserialize(msg_list, content=content, copy=copy)
655 657 except Exception as e:
656 658 # TODO: handle it
657 659 raise e
658 660
659 661 def feed_identities(self, msg_list, copy=True):
660 662 """Split the identities from the rest of the message.
661 663
662 664 Feed until DELIM is reached, then return the prefix as idents and
663 665 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
664 666 but that would be silly.
665 667
666 668 Parameters
667 669 ----------
668 670 msg_list : a list of Message or bytes objects
669 671 The message to be split.
670 672 copy : bool
671 673 flag determining whether the arguments are bytes or Messages
672 674
673 675 Returns
674 676 -------
675 677 (idents, msg_list) : two lists
676 678 idents will always be a list of bytes, each of which is a ZMQ
677 679 identity. msg_list will be a list of bytes or zmq.Messages of the
678 680 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
679 681 should be unpackable/unserializable via self.unserialize at this
680 682 point.
681 683 """
682 684 if copy:
683 685 idx = msg_list.index(DELIM)
684 686 return msg_list[:idx], msg_list[idx+1:]
685 687 else:
686 688 failed = True
687 689 for idx,m in enumerate(msg_list):
688 690 if m.bytes == DELIM:
689 691 failed = False
690 692 break
691 693 if failed:
692 694 raise ValueError("DELIM not in msg_list")
693 695 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
694 696 return [m.bytes for m in idents], msg_list
695 697
696 698 def unserialize(self, msg_list, content=True, copy=True):
697 699 """Unserialize a msg_list to a nested message dict.
698 700
699 701 This is roughly the inverse of serialize. The serialize/unserialize
700 702 methods work with full message lists, whereas pack/unpack work with
701 703 the individual message parts in the message list.
702 704
703 705 Parameters:
704 706 -----------
705 707 msg_list : list of bytes or Message objects
706 708 The list of message parts of the form [HMAC,p_header,p_parent,
707 709 p_metadata,p_content,buffer1,buffer2,...].
708 710 content : bool (True)
709 711 Whether to unpack the content dict (True), or leave it packed
710 712 (False).
711 713 copy : bool (True)
712 714 Whether to return the bytes (True), or the non-copying Message
713 715 object in each place (False).
714 716
715 717 Returns
716 718 -------
717 719 msg : dict
718 720 The nested message dict with top-level keys [header, parent_header,
719 721 content, buffers].
720 722 """
721 723 minlen = 5
722 724 message = {}
723 725 if not copy:
724 726 for i in range(minlen):
725 727 msg_list[i] = msg_list[i].bytes
726 728 if self.auth is not None:
727 729 signature = msg_list[0]
728 730 if not signature:
729 731 raise ValueError("Unsigned Message")
730 732 if signature in self.digest_history:
731 733 raise ValueError("Duplicate Signature: %r"%signature)
732 734 self.digest_history.add(signature)
733 735 check = self.sign(msg_list[1:5])
734 736 if not signature == check:
735 737 raise ValueError("Invalid Signature: %r" % signature)
736 738 if not len(msg_list) >= minlen:
737 739 raise TypeError("malformed message, must have at least %i elements"%minlen)
738 740 header = self.unpack(msg_list[1])
739 741 message['header'] = header
740 742 message['msg_id'] = header['msg_id']
741 743 message['msg_type'] = header['msg_type']
742 744 message['parent_header'] = self.unpack(msg_list[2])
743 745 message['metadata'] = self.unpack(msg_list[3])
744 746 if content:
745 747 message['content'] = self.unpack(msg_list[4])
746 748 else:
747 749 message['content'] = msg_list[4]
748 750
749 751 message['buffers'] = msg_list[5:]
750 752 return message
751 753
752 754 def test_msg2obj():
753 755 am = dict(x=1)
754 756 ao = Message(am)
755 757 assert ao.x == am['x']
756 758
757 759 am['y'] = dict(z=1)
758 760 ao = Message(am)
759 761 assert ao.y.z == am['y']['z']
760 762
761 763 k1, k2 = 'y', 'z'
762 764 assert ao[k1][k2] == am[k1][k2]
763 765
764 766 am2 = dict(ao)
765 767 assert am['x'] == am2['x']
766 768 assert am['y']['z'] == am2['y']['z']
767 769
General Comments 0
You need to be logged in to leave comments. Login now