Show More
@@ -351,6 +351,12 b' class Session(Configurable):' | |||
|
351 | 351 | return msg_header(self.msg_id, msg_type, self.username, self.session) |
|
352 | 352 | |
|
353 | 353 | def msg(self, msg_type, content=None, parent=None, subheader=None): |
|
354 | """Return the nested message dict. | |
|
355 | ||
|
356 | This format is different from what is sent over the wire. The | |
|
357 | self.serialize method converts this nested message dict to the wire | |
|
358 | format, which uses a message list. | |
|
359 | """ | |
|
354 | 360 | msg = {} |
|
355 | 361 | msg['header'] = self.msg_header(msg_type) |
|
356 | 362 | msg['msg_id'] = msg['header']['msg_id'] |
@@ -361,23 +367,37 b' class Session(Configurable):' | |||
|
361 | 367 | msg['header'].update(sub) |
|
362 | 368 | return msg |
|
363 | 369 | |
|
364 | def sign(self, msg): | |
|
365 |
"""Sign a message with HMAC digest. If no auth, return b''. |
|
|
370 | def sign(self, msg_list): | |
|
371 | """Sign a message with HMAC digest. If no auth, return b''. | |
|
372 | ||
|
373 | Parameters | |
|
374 | ---------- | |
|
375 | msg_list : list | |
|
376 | The [p_header,p_parent,p_content] part of the message list. | |
|
377 | """ | |
|
366 | 378 | if self.auth is None: |
|
367 | 379 | return b'' |
|
368 | 380 | h = self.auth.copy() |
|
369 | for m in msg: | |
|
381 | for m in msg_list: | |
|
370 | 382 | h.update(m) |
|
371 | 383 | return h.hexdigest() |
|
372 | 384 | |
|
373 | 385 | def serialize(self, msg, ident=None): |
|
374 | 386 | """Serialize the message components to bytes. |
|
375 | 387 | |
|
388 | Parameters | |
|
389 | ---------- | |
|
390 | msg : dict or Message | |
|
391 | The nexted message dict as returned by the self.msg method. | |
|
392 | ||
|
376 | 393 | Returns |
|
377 | 394 | ------- |
|
378 | ||
|
379 | list of bytes objects | |
|
380 | ||
|
395 | msg_list : list | |
|
396 | The list of bytes objects to be sent with the format: | |
|
397 | [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, | |
|
398 | buffer1,buffer2,...]. In this list, the p_* entities are | |
|
399 | the packed or serialized versions, so if JSON is used, these | |
|
400 | are uft8 encoded JSON strings. | |
|
381 | 401 | """ |
|
382 | 402 | content = msg.get('content', {}) |
|
383 | 403 | if content is None: |
@@ -418,6 +438,14 b' class Session(Configurable):' | |||
|
418 | 438 | buffers=None, subheader=None, track=False): |
|
419 | 439 | """Build and send a message via stream or socket. |
|
420 | 440 | |
|
441 | The message format used by this function internally is as follows: | |
|
442 | ||
|
443 | [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, | |
|
444 | buffer1,buffer2,...] | |
|
445 | ||
|
446 | The self.serialize method converts the nested message dict into this | |
|
447 | format. | |
|
448 | ||
|
421 | 449 | Parameters |
|
422 | 450 | ---------- |
|
423 | 451 | |
@@ -494,12 +522,22 b' class Session(Configurable):' | |||
|
494 | 522 | |
|
495 | 523 | return msg |
|
496 | 524 | |
|
497 | def send_raw(self, stream, msg, flags=0, copy=True, ident=None): | |
|
525 | def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None): | |
|
498 | 526 | """Send a raw message via ident path. |
|
499 | 527 | |
|
528 | This method is used to send a already serialized message. | |
|
529 | ||
|
500 | 530 | Parameters |
|
501 | 531 | ---------- |
|
502 | msg : list of sendable buffers""" | |
|
532 | stream : ZMQStream or Socket | |
|
533 | The ZMQ stream or socket to use for sending the message. | |
|
534 | msg_list : list | |
|
535 | The serialized list of messages to send. This only includes the | |
|
536 | [p_header,p_parent,p_content,buffer1,buffer2,...] portion of | |
|
537 | the message. | |
|
538 | ident : ident or list | |
|
539 | A single ident or a list of idents to use in sending. | |
|
540 | """ | |
|
503 | 541 | to_send = [] |
|
504 | 542 | if isinstance(ident, bytes): |
|
505 | 543 | ident = [ident] |
@@ -507,17 +545,28 b' class Session(Configurable):' | |||
|
507 | 545 | to_send.extend(ident) |
|
508 | 546 | |
|
509 | 547 | to_send.append(DELIM) |
|
510 | to_send.append(self.sign(msg)) | |
|
511 | to_send.extend(msg) | |
|
512 | stream.send_multipart(msg, flags, copy=copy) | |
|
548 | to_send.append(self.sign(msg_list)) | |
|
549 | to_send.extend(msg_list) | |
|
550 | stream.send_multipart(msg_list, flags, copy=copy) | |
|
513 | 551 | |
|
514 | 552 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
|
515 |
""" |
|
|
516 | returns [idents], msg""" | |
|
553 | """Receive and unpack a message. | |
|
554 | ||
|
555 | Parameters | |
|
556 | ---------- | |
|
557 | socket : ZMQStream or Socket | |
|
558 | The socket or stream to use in receiving. | |
|
559 | ||
|
560 | Returns | |
|
561 | ------- | |
|
562 | [idents], msg | |
|
563 | [idents] is a list of idents and msg is a nested message dict of | |
|
564 | same format as self.msg returns. | |
|
565 | """ | |
|
517 | 566 | if isinstance(socket, ZMQStream): |
|
518 | 567 | socket = socket.socket |
|
519 | 568 | try: |
|
520 | msg = socket.recv_multipart(mode) | |
|
569 | msg_list = socket.recv_multipart(mode) | |
|
521 | 570 | except zmq.ZMQError as e: |
|
522 | 571 | if e.errno == zmq.EAGAIN: |
|
523 | 572 | # We can convert EAGAIN to None as we know in this case |
@@ -527,48 +576,50 b' class Session(Configurable):' | |||
|
527 | 576 | raise |
|
528 | 577 | # split multipart message into identity list and message dict |
|
529 | 578 | # invalid large messages can cause very expensive string comparisons |
|
530 | idents, msg = self.feed_identities(msg, copy) | |
|
579 | idents, msg_list = self.feed_identities(msg_list, copy) | |
|
531 | 580 | try: |
|
532 | return idents, self.unpack_message(msg, content=content, copy=copy) | |
|
581 | return idents, self.unpack_message(msg_list, content=content, copy=copy) | |
|
533 | 582 | except Exception as e: |
|
534 | print (idents, msg) | |
|
583 | print (idents, msg_list) | |
|
535 | 584 | # TODO: handle it |
|
536 | 585 | raise e |
|
537 | 586 | |
|
538 | def feed_identities(self, msg, copy=True): | |
|
539 | """feed until DELIM is reached, then return the prefix as idents and | |
|
540 | remainder as msg. This is easily broken by setting an IDENT to DELIM, | |
|
587 | def feed_identities(self, msg_list, copy=True): | |
|
588 | """Split the identities from the rest of the message. | |
|
589 | ||
|
590 | Feed until DELIM is reached, then return the prefix as idents and | |
|
591 | remainder as msg_list. This is easily broken by setting an IDENT to DELIM, | |
|
541 | 592 | but that would be silly. |
|
542 | 593 | |
|
543 | 594 | Parameters |
|
544 | 595 | ---------- |
|
545 | msg : a list of Message or bytes objects | |
|
546 |
|
|
|
596 | msg_list : a list of Message or bytes objects | |
|
597 | The message to be split. | |
|
547 | 598 | copy : bool |
|
548 | 599 | flag determining whether the arguments are bytes or Messages |
|
549 | 600 | |
|
550 | 601 | Returns |
|
551 | 602 | ------- |
|
552 | (idents,msg) : two lists | |
|
603 | (idents,msg_list) : two lists | |
|
553 | 604 | idents will always be a list of bytes - the indentity prefix |
|
554 | msg will be a list of bytes or Messages, unchanged from input | |
|
555 | msg should be unpackable via self.unpack_message at this point. | |
|
605 | msg_list will be a list of bytes or Messages, unchanged from input | |
|
606 | msg_list should be unpackable via self.unpack_message at this point. | |
|
556 | 607 | """ |
|
557 | 608 | if copy: |
|
558 | idx = msg.index(DELIM) | |
|
559 | return msg[:idx], msg[idx+1:] | |
|
609 | idx = msg_list.index(DELIM) | |
|
610 | return msg_list[:idx], msg_list[idx+1:] | |
|
560 | 611 | else: |
|
561 | 612 | failed = True |
|
562 | for idx,m in enumerate(msg): | |
|
613 | for idx,m in enumerate(msg_list): | |
|
563 | 614 | if m.bytes == DELIM: |
|
564 | 615 | failed = False |
|
565 | 616 | break |
|
566 | 617 | if failed: |
|
567 | raise ValueError("DELIM not in msg") | |
|
568 | idents, msg = msg[:idx], msg[idx+1:] | |
|
569 | return [m.bytes for m in idents], msg | |
|
618 | raise ValueError("DELIM not in msg_list") | |
|
619 | idents, msg_list = msg_list[:idx], msg_list[idx+1:] | |
|
620 | return [m.bytes for m in idents], msg_list | |
|
570 | 621 | |
|
571 | def unpack_message(self, msg, content=True, copy=True): | |
|
622 | def unpack_message(self, msg_list, content=True, copy=True): | |
|
572 | 623 | """Return a message object from the format |
|
573 | 624 | sent by self.send. |
|
574 | 625 | |
@@ -588,26 +639,26 b' class Session(Configurable):' | |||
|
588 | 639 | message = {} |
|
589 | 640 | if not copy: |
|
590 | 641 | for i in range(minlen): |
|
591 | msg[i] = msg[i].bytes | |
|
642 | msg_list[i] = msg_list[i].bytes | |
|
592 | 643 | if self.auth is not None: |
|
593 | signature = msg[0] | |
|
644 | signature = msg_list[0] | |
|
594 | 645 | if signature in self.digest_history: |
|
595 | 646 | raise ValueError("Duplicate Signature: %r"%signature) |
|
596 | 647 | self.digest_history.add(signature) |
|
597 | check = self.sign(msg[1:4]) | |
|
648 | check = self.sign(msg_list[1:4]) | |
|
598 | 649 | if not signature == check: |
|
599 | 650 | raise ValueError("Invalid Signature: %r"%signature) |
|
600 | if not len(msg) >= minlen: | |
|
651 | if not len(msg_list) >= minlen: | |
|
601 | 652 | raise TypeError("malformed message, must have at least %i elements"%minlen) |
|
602 | message['header'] = self.unpack(msg[1]) | |
|
653 | message['header'] = self.unpack(msg_list[1]) | |
|
603 | 654 | message['msg_type'] = message['header']['msg_type'] |
|
604 | message['parent_header'] = self.unpack(msg[2]) | |
|
655 | message['parent_header'] = self.unpack(msg_list[2]) | |
|
605 | 656 | if content: |
|
606 | message['content'] = self.unpack(msg[3]) | |
|
657 | message['content'] = self.unpack(msg_list[3]) | |
|
607 | 658 | else: |
|
608 | message['content'] = msg[3] | |
|
659 | message['content'] = msg_list[3] | |
|
609 | 660 | |
|
610 | message['buffers'] = msg[4:] | |
|
661 | message['buffers'] = msg_list[4:] | |
|
611 | 662 | return message |
|
612 | 663 | |
|
613 | 664 | def test_msg2obj(): |
General Comments 0
You need to be logged in to leave comments.
Login now