##// END OF EJS Templates
Merge branch 'sessiondocs'
Brian E. Granger -
r4221:e281f288 merge
parent child Browse files
Show More
@@ -351,6 +351,12 b' class Session(Configurable):'
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352
352
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
354 """Return the nested message dict.
355
356 This format is different from what is sent over the wire. The
357 self.serialize method converts this nested message dict to the wire
358 format, which uses a message list.
359 """
354 msg = {}
360 msg = {}
355 msg['header'] = self.msg_header(msg_type)
361 msg['header'] = self.msg_header(msg_type)
356 msg['msg_id'] = msg['header']['msg_id']
362 msg['msg_id'] = msg['header']['msg_id']
@@ -361,23 +367,37 b' class Session(Configurable):'
361 msg['header'].update(sub)
367 msg['header'].update(sub)
362 return msg
368 return msg
363
369
364 def sign(self, msg):
370 def sign(self, msg_list):
365 """Sign a message with HMAC digest. If no auth, return b''."""
371 """Sign a message with HMAC digest. If no auth, return b''.
372
373 Parameters
374 ----------
375 msg_list : list
376 The [p_header,p_parent,p_content] part of the message list.
377 """
366 if self.auth is None:
378 if self.auth is None:
367 return b''
379 return b''
368 h = self.auth.copy()
380 h = self.auth.copy()
369 for m in msg:
381 for m in msg_list:
370 h.update(m)
382 h.update(m)
371 return h.hexdigest()
383 return h.hexdigest()
372
384
373 def serialize(self, msg, ident=None):
385 def serialize(self, msg, ident=None):
374 """Serialize the message components to bytes.
386 """Serialize the message components to bytes.
375
387
388 Parameters
389 ----------
390 msg : dict or Message
391 The nexted message dict as returned by the self.msg method.
392
376 Returns
393 Returns
377 -------
394 -------
378
395 msg_list : list
379 list of bytes objects
396 The list of bytes objects to be sent with the format:
380
397 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
398 buffer1,buffer2,...]. In this list, the p_* entities are
399 the packed or serialized versions, so if JSON is used, these
400 are uft8 encoded JSON strings.
381 """
401 """
382 content = msg.get('content', {})
402 content = msg.get('content', {})
383 if content is None:
403 if content is None:
@@ -417,7 +437,15 b' class Session(Configurable):'
417 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
437 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
418 buffers=None, subheader=None, track=False):
438 buffers=None, subheader=None, track=False):
419 """Build and send a message via stream or socket.
439 """Build and send a message via stream or socket.
420
440
441 The message format used by this function internally is as follows:
442
443 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
444 buffer1,buffer2,...]
445
446 The self.serialize method converts the nested message dict into this
447 format.
448
421 Parameters
449 Parameters
422 ----------
450 ----------
423
451
@@ -483,7 +511,7 b' class Session(Configurable):'
483 tracker = stream.send(buffers[-1], copy=False, track=track)
511 tracker = stream.send(buffers[-1], copy=False, track=track)
484 else:
512 else:
485 tracker = stream.send(buffers[-1], copy=False)
513 tracker = stream.send(buffers[-1], copy=False)
486
514
487 # omsg = Message(msg)
515 # omsg = Message(msg)
488 if self.debug:
516 if self.debug:
489 pprint.pprint(msg)
517 pprint.pprint(msg)
@@ -494,12 +522,22 b' class Session(Configurable):'
494
522
495 return msg
523 return msg
496
524
497 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
525 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
498 """Send a raw message via ident path.
526 """Send a raw message via ident path.
499
527
528 This method is used to send a already serialized message.
529
500 Parameters
530 Parameters
501 ----------
531 ----------
502 msg : list of sendable buffers"""
532 stream : ZMQStream or Socket
533 The ZMQ stream or socket to use for sending the message.
534 msg_list : list
535 The serialized list of messages to send. This only includes the
536 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
537 the message.
538 ident : ident or list
539 A single ident or a list of idents to use in sending.
540 """
503 to_send = []
541 to_send = []
504 if isinstance(ident, bytes):
542 if isinstance(ident, bytes):
505 ident = [ident]
543 ident = [ident]
@@ -507,17 +545,28 b' class Session(Configurable):'
507 to_send.extend(ident)
545 to_send.extend(ident)
508
546
509 to_send.append(DELIM)
547 to_send.append(DELIM)
510 to_send.append(self.sign(msg))
548 to_send.append(self.sign(msg_list))
511 to_send.extend(msg)
549 to_send.extend(msg_list)
512 stream.send_multipart(msg, flags, copy=copy)
550 stream.send_multipart(msg_list, flags, copy=copy)
513
551
514 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
552 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
515 """receives and unpacks a message
553 """Receive and unpack a message.
516 returns [idents], msg"""
554
555 Parameters
556 ----------
557 socket : ZMQStream or Socket
558 The socket or stream to use in receiving.
559
560 Returns
561 -------
562 [idents], msg
563 [idents] is a list of idents and msg is a nested message dict of
564 same format as self.msg returns.
565 """
517 if isinstance(socket, ZMQStream):
566 if isinstance(socket, ZMQStream):
518 socket = socket.socket
567 socket = socket.socket
519 try:
568 try:
520 msg = socket.recv_multipart(mode)
569 msg_list = socket.recv_multipart(mode)
521 except zmq.ZMQError as e:
570 except zmq.ZMQError as e:
522 if e.errno == zmq.EAGAIN:
571 if e.errno == zmq.EAGAIN:
523 # We can convert EAGAIN to None as we know in this case
572 # We can convert EAGAIN to None as we know in this case
@@ -527,48 +576,50 b' class Session(Configurable):'
527 raise
576 raise
528 # split multipart message into identity list and message dict
577 # split multipart message into identity list and message dict
529 # invalid large messages can cause very expensive string comparisons
578 # invalid large messages can cause very expensive string comparisons
530 idents, msg = self.feed_identities(msg, copy)
579 idents, msg_list = self.feed_identities(msg_list, copy)
531 try:
580 try:
532 return idents, self.unpack_message(msg, content=content, copy=copy)
581 return idents, self.unpack_message(msg_list, content=content, copy=copy)
533 except Exception as e:
582 except Exception as e:
534 print (idents, msg)
583 print (idents, msg_list)
535 # TODO: handle it
584 # TODO: handle it
536 raise e
585 raise e
537
586
538 def feed_identities(self, msg, copy=True):
587 def feed_identities(self, msg_list, copy=True):
539 """feed until DELIM is reached, then return the prefix as idents and
588 """Split the identities from the rest of the message.
540 remainder as msg. This is easily broken by setting an IDENT to DELIM,
589
590 Feed until DELIM is reached, then return the prefix as idents and
591 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
541 but that would be silly.
592 but that would be silly.
542
593
543 Parameters
594 Parameters
544 ----------
595 ----------
545 msg : a list of Message or bytes objects
596 msg_list : a list of Message or bytes objects
546 the message to be split
597 The message to be split.
547 copy : bool
598 copy : bool
548 flag determining whether the arguments are bytes or Messages
599 flag determining whether the arguments are bytes or Messages
549
600
550 Returns
601 Returns
551 -------
602 -------
552 (idents,msg) : two lists
603 (idents,msg_list) : two lists
553 idents will always be a list of bytes - the indentity prefix
604 idents will always be a list of bytes - the indentity prefix
554 msg will be a list of bytes or Messages, unchanged from input
605 msg_list will be a list of bytes or Messages, unchanged from input
555 msg should be unpackable via self.unpack_message at this point.
606 msg_list should be unpackable via self.unpack_message at this point.
556 """
607 """
557 if copy:
608 if copy:
558 idx = msg.index(DELIM)
609 idx = msg_list.index(DELIM)
559 return msg[:idx], msg[idx+1:]
610 return msg_list[:idx], msg_list[idx+1:]
560 else:
611 else:
561 failed = True
612 failed = True
562 for idx,m in enumerate(msg):
613 for idx,m in enumerate(msg_list):
563 if m.bytes == DELIM:
614 if m.bytes == DELIM:
564 failed = False
615 failed = False
565 break
616 break
566 if failed:
617 if failed:
567 raise ValueError("DELIM not in msg")
618 raise ValueError("DELIM not in msg_list")
568 idents, msg = msg[:idx], msg[idx+1:]
619 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
569 return [m.bytes for m in idents], msg
620 return [m.bytes for m in idents], msg_list
570
621
571 def unpack_message(self, msg, content=True, copy=True):
622 def unpack_message(self, msg_list, content=True, copy=True):
572 """Return a message object from the format
623 """Return a message object from the format
573 sent by self.send.
624 sent by self.send.
574
625
@@ -588,26 +639,26 b' class Session(Configurable):'
588 message = {}
639 message = {}
589 if not copy:
640 if not copy:
590 for i in range(minlen):
641 for i in range(minlen):
591 msg[i] = msg[i].bytes
642 msg_list[i] = msg_list[i].bytes
592 if self.auth is not None:
643 if self.auth is not None:
593 signature = msg[0]
644 signature = msg_list[0]
594 if signature in self.digest_history:
645 if signature in self.digest_history:
595 raise ValueError("Duplicate Signature: %r"%signature)
646 raise ValueError("Duplicate Signature: %r"%signature)
596 self.digest_history.add(signature)
647 self.digest_history.add(signature)
597 check = self.sign(msg[1:4])
648 check = self.sign(msg_list[1:4])
598 if not signature == check:
649 if not signature == check:
599 raise ValueError("Invalid Signature: %r"%signature)
650 raise ValueError("Invalid Signature: %r"%signature)
600 if not len(msg) >= minlen:
651 if not len(msg_list) >= minlen:
601 raise TypeError("malformed message, must have at least %i elements"%minlen)
652 raise TypeError("malformed message, must have at least %i elements"%minlen)
602 message['header'] = self.unpack(msg[1])
653 message['header'] = self.unpack(msg_list[1])
603 message['msg_type'] = message['header']['msg_type']
654 message['msg_type'] = message['header']['msg_type']
604 message['parent_header'] = self.unpack(msg[2])
655 message['parent_header'] = self.unpack(msg_list[2])
605 if content:
656 if content:
606 message['content'] = self.unpack(msg[3])
657 message['content'] = self.unpack(msg_list[3])
607 else:
658 else:
608 message['content'] = msg[3]
659 message['content'] = msg_list[3]
609
660
610 message['buffers'] = msg[4:]
661 message['buffers'] = msg_list[4:]
611 return message
662 return message
612
663
613 def test_msg2obj():
664 def test_msg2obj():
General Comments 0
You need to be logged in to leave comments. Login now