##// END OF EJS Templates
Added more docstrings to IPython.zmq.session....
Brian E. Granger -
Show More
@@ -351,6 +351,12 b' class Session(Configurable):'
351 351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352 352
353 353 def msg(self, msg_type, content=None, parent=None, subheader=None):
354 """Return the nested message dict.
355
356 This format is different from what is sent over the wire. The
357 self.serialize method converts this nested message dict to the wire
358 format, which uses a message list.
359 """
354 360 msg = {}
355 361 msg['header'] = self.msg_header(msg_type)
356 362 msg['msg_id'] = msg['header']['msg_id']
@@ -361,23 +367,37 b' class Session(Configurable):'
361 367 msg['header'].update(sub)
362 368 return msg
363 369
364 def sign(self, msg):
365 """Sign a message with HMAC digest. If no auth, return b''."""
370 def sign(self, msg_list):
371 """Sign a message with HMAC digest. If no auth, return b''.
372
373 Parameters
374 ----------
375 msg_list : list
376 The [p_header,p_parent,p_content] part of the message list.
377 """
366 378 if self.auth is None:
367 379 return b''
368 380 h = self.auth.copy()
369 for m in msg:
381 for m in msg_list:
370 382 h.update(m)
371 383 return h.hexdigest()
372 384
373 385 def serialize(self, msg, ident=None):
374 386 """Serialize the message components to bytes.
375
387
388 Parameters
389 ----------
390 msg : dict or Message
391 The nexted message dict as returned by the self.msg method.
392
376 393 Returns
377 394 -------
378
379 list of bytes objects
380
395 msg_list : list
396 The list of bytes objects to be sent with the format:
397 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
398 buffer1,buffer2,...]. In this list, the p_* entities are
399 the packed or serialized versions, so if JSON is used, these
400 are uft8 encoded JSON strings.
381 401 """
382 402 content = msg.get('content', {})
383 403 if content is None:
@@ -417,7 +437,15 b' class Session(Configurable):'
417 437 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
418 438 buffers=None, subheader=None, track=False):
419 439 """Build and send a message via stream or socket.
420
440
441 The message format used by this function internally is as follows:
442
443 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
444 buffer1,buffer2,...]
445
446 The self.serialize method converts the nested message dict into this
447 format.
448
421 449 Parameters
422 450 ----------
423 451
@@ -483,7 +511,7 b' class Session(Configurable):'
483 511 tracker = stream.send(buffers[-1], copy=False, track=track)
484 512 else:
485 513 tracker = stream.send(buffers[-1], copy=False)
486
514
487 515 # omsg = Message(msg)
488 516 if self.debug:
489 517 pprint.pprint(msg)
@@ -494,12 +522,22 b' class Session(Configurable):'
494 522
495 523 return msg
496 524
497 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
525 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
498 526 """Send a raw message via ident path.
499
527
528 This method is used to send a already serialized message.
529
500 530 Parameters
501 531 ----------
502 msg : list of sendable buffers"""
532 stream : ZMQStream or Socket
533 The ZMQ stream or socket to use for sending the message.
534 msg_list : list
535 The serialized list of messages to send. This only includes the
536 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
537 the message.
538 ident : ident or list
539 A single ident or a list of idents to use in sending.
540 """
503 541 to_send = []
504 542 if isinstance(ident, bytes):
505 543 ident = [ident]
@@ -507,17 +545,28 b' class Session(Configurable):'
507 545 to_send.extend(ident)
508 546
509 547 to_send.append(DELIM)
510 to_send.append(self.sign(msg))
511 to_send.extend(msg)
512 stream.send_multipart(msg, flags, copy=copy)
548 to_send.append(self.sign(msg_list))
549 to_send.extend(msg_list)
550 stream.send_multipart(msg_list, flags, copy=copy)
513 551
514 552 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
515 """receives and unpacks a message
516 returns [idents], msg"""
553 """Receive and unpack a message.
554
555 Parameters
556 ----------
557 socket : ZMQStream or Socket
558 The socket or stream to use in receiving.
559
560 Returns
561 -------
562 [idents], msg
563 [idents] is a list of idents and msg is a nested message dict of
564 same format as self.msg returns.
565 """
517 566 if isinstance(socket, ZMQStream):
518 567 socket = socket.socket
519 568 try:
520 msg = socket.recv_multipart(mode)
569 msg_list = socket.recv_multipart(mode)
521 570 except zmq.ZMQError as e:
522 571 if e.errno == zmq.EAGAIN:
523 572 # We can convert EAGAIN to None as we know in this case
@@ -527,48 +576,50 b' class Session(Configurable):'
527 576 raise
528 577 # split multipart message into identity list and message dict
529 578 # invalid large messages can cause very expensive string comparisons
530 idents, msg = self.feed_identities(msg, copy)
579 idents, msg_list = self.feed_identities(msg_list, copy)
531 580 try:
532 return idents, self.unpack_message(msg, content=content, copy=copy)
581 return idents, self.unpack_message(msg_list, content=content, copy=copy)
533 582 except Exception as e:
534 print (idents, msg)
583 print (idents, msg_list)
535 584 # TODO: handle it
536 585 raise e
537 586
538 def feed_identities(self, msg, copy=True):
539 """feed until DELIM is reached, then return the prefix as idents and
540 remainder as msg. This is easily broken by setting an IDENT to DELIM,
587 def feed_identities(self, msg_list, copy=True):
588 """Split the identities from the rest of the message.
589
590 Feed until DELIM is reached, then return the prefix as idents and
591 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
541 592 but that would be silly.
542 593
543 594 Parameters
544 595 ----------
545 msg : a list of Message or bytes objects
546 the message to be split
596 msg_list : a list of Message or bytes objects
597 The message to be split.
547 598 copy : bool
548 599 flag determining whether the arguments are bytes or Messages
549 600
550 601 Returns
551 602 -------
552 (idents,msg) : two lists
603 (idents,msg_list) : two lists
553 604 idents will always be a list of bytes - the indentity prefix
554 msg will be a list of bytes or Messages, unchanged from input
555 msg should be unpackable via self.unpack_message at this point.
605 msg_list will be a list of bytes or Messages, unchanged from input
606 msg_list should be unpackable via self.unpack_message at this point.
556 607 """
557 608 if copy:
558 idx = msg.index(DELIM)
559 return msg[:idx], msg[idx+1:]
609 idx = msg_list.index(DELIM)
610 return msg_list[:idx], msg_list[idx+1:]
560 611 else:
561 612 failed = True
562 for idx,m in enumerate(msg):
613 for idx,m in enumerate(msg_list):
563 614 if m.bytes == DELIM:
564 615 failed = False
565 616 break
566 617 if failed:
567 raise ValueError("DELIM not in msg")
568 idents, msg = msg[:idx], msg[idx+1:]
569 return [m.bytes for m in idents], msg
618 raise ValueError("DELIM not in msg_list")
619 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
620 return [m.bytes for m in idents], msg_list
570 621
571 def unpack_message(self, msg, content=True, copy=True):
622 def unpack_message(self, msg_list, content=True, copy=True):
572 623 """Return a message object from the format
573 624 sent by self.send.
574 625
@@ -588,26 +639,26 b' class Session(Configurable):'
588 639 message = {}
589 640 if not copy:
590 641 for i in range(minlen):
591 msg[i] = msg[i].bytes
642 msg_list[i] = msg_list[i].bytes
592 643 if self.auth is not None:
593 signature = msg[0]
644 signature = msg_list[0]
594 645 if signature in self.digest_history:
595 646 raise ValueError("Duplicate Signature: %r"%signature)
596 647 self.digest_history.add(signature)
597 check = self.sign(msg[1:4])
648 check = self.sign(msg_list[1:4])
598 649 if not signature == check:
599 650 raise ValueError("Invalid Signature: %r"%signature)
600 if not len(msg) >= minlen:
651 if not len(msg_list) >= minlen:
601 652 raise TypeError("malformed message, must have at least %i elements"%minlen)
602 message['header'] = self.unpack(msg[1])
653 message['header'] = self.unpack(msg_list[1])
603 654 message['msg_type'] = message['header']['msg_type']
604 message['parent_header'] = self.unpack(msg[2])
655 message['parent_header'] = self.unpack(msg_list[2])
605 656 if content:
606 message['content'] = self.unpack(msg[3])
657 message['content'] = self.unpack(msg_list[3])
607 658 else:
608 message['content'] = msg[3]
659 message['content'] = msg_list[3]
609 660
610 message['buffers'] = msg[4:]
661 message['buffers'] = msg_list[4:]
611 662 return message
612 663
613 664 def test_msg2obj():
General Comments 0
You need to be logged in to leave comments. Login now