Show More
@@ -351,6 +351,12 b' class Session(Configurable):' | |||||
351 | return msg_header(self.msg_id, msg_type, self.username, self.session) |
|
351 | return msg_header(self.msg_id, msg_type, self.username, self.session) | |
352 |
|
352 | |||
353 | def msg(self, msg_type, content=None, parent=None, subheader=None): |
|
353 | def msg(self, msg_type, content=None, parent=None, subheader=None): | |
|
354 | """Return the nested message dict. | |||
|
355 | ||||
|
356 | This format is different from what is sent over the wire. The | |||
|
357 | self.serialize method converts this nested message dict to the wire | |||
|
358 | format, which uses a message list. | |||
|
359 | """ | |||
354 | msg = {} |
|
360 | msg = {} | |
355 | msg['header'] = self.msg_header(msg_type) |
|
361 | msg['header'] = self.msg_header(msg_type) | |
356 | msg['msg_id'] = msg['header']['msg_id'] |
|
362 | msg['msg_id'] = msg['header']['msg_id'] | |
@@ -361,23 +367,37 b' class Session(Configurable):' | |||||
361 | msg['header'].update(sub) |
|
367 | msg['header'].update(sub) | |
362 | return msg |
|
368 | return msg | |
363 |
|
369 | |||
364 | def sign(self, msg): |
|
370 | def sign(self, msg_list): | |
365 |
"""Sign a message with HMAC digest. If no auth, return b''. |
|
371 | """Sign a message with HMAC digest. If no auth, return b''. | |
|
372 | ||||
|
373 | Parameters | |||
|
374 | ---------- | |||
|
375 | msg_list : list | |||
|
376 | The [p_header,p_parent,p_content] part of the message list. | |||
|
377 | """ | |||
366 | if self.auth is None: |
|
378 | if self.auth is None: | |
367 | return b'' |
|
379 | return b'' | |
368 | h = self.auth.copy() |
|
380 | h = self.auth.copy() | |
369 | for m in msg: |
|
381 | for m in msg_list: | |
370 | h.update(m) |
|
382 | h.update(m) | |
371 | return h.hexdigest() |
|
383 | return h.hexdigest() | |
372 |
|
384 | |||
373 | def serialize(self, msg, ident=None): |
|
385 | def serialize(self, msg, ident=None): | |
374 | """Serialize the message components to bytes. |
|
386 | """Serialize the message components to bytes. | |
375 |
|
387 | |||
|
388 | Parameters | |||
|
389 | ---------- | |||
|
390 | msg : dict or Message | |||
|
391 | The nexted message dict as returned by the self.msg method. | |||
|
392 | ||||
376 | Returns |
|
393 | Returns | |
377 | ------- |
|
394 | ------- | |
378 |
|
395 | msg_list : list | ||
379 | list of bytes objects |
|
396 | The list of bytes objects to be sent with the format: | |
380 |
|
397 | [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, | ||
|
398 | buffer1,buffer2,...]. In this list, the p_* entities are | |||
|
399 | the packed or serialized versions, so if JSON is used, these | |||
|
400 | are uft8 encoded JSON strings. | |||
381 | """ |
|
401 | """ | |
382 | content = msg.get('content', {}) |
|
402 | content = msg.get('content', {}) | |
383 | if content is None: |
|
403 | if content is None: | |
@@ -417,7 +437,15 b' class Session(Configurable):' | |||||
417 | def send(self, stream, msg_or_type, content=None, parent=None, ident=None, |
|
437 | def send(self, stream, msg_or_type, content=None, parent=None, ident=None, | |
418 | buffers=None, subheader=None, track=False): |
|
438 | buffers=None, subheader=None, track=False): | |
419 | """Build and send a message via stream or socket. |
|
439 | """Build and send a message via stream or socket. | |
420 |
|
440 | |||
|
441 | The message format used by this function internally is as follows: | |||
|
442 | ||||
|
443 | [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, | |||
|
444 | buffer1,buffer2,...] | |||
|
445 | ||||
|
446 | The self.serialize method converts the nested message dict into this | |||
|
447 | format. | |||
|
448 | ||||
421 | Parameters |
|
449 | Parameters | |
422 | ---------- |
|
450 | ---------- | |
423 |
|
451 | |||
@@ -483,7 +511,7 b' class Session(Configurable):' | |||||
483 | tracker = stream.send(buffers[-1], copy=False, track=track) |
|
511 | tracker = stream.send(buffers[-1], copy=False, track=track) | |
484 | else: |
|
512 | else: | |
485 | tracker = stream.send(buffers[-1], copy=False) |
|
513 | tracker = stream.send(buffers[-1], copy=False) | |
486 |
|
514 | |||
487 | # omsg = Message(msg) |
|
515 | # omsg = Message(msg) | |
488 | if self.debug: |
|
516 | if self.debug: | |
489 | pprint.pprint(msg) |
|
517 | pprint.pprint(msg) | |
@@ -494,12 +522,22 b' class Session(Configurable):' | |||||
494 |
|
522 | |||
495 | return msg |
|
523 | return msg | |
496 |
|
524 | |||
497 | def send_raw(self, stream, msg, flags=0, copy=True, ident=None): |
|
525 | def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None): | |
498 | """Send a raw message via ident path. |
|
526 | """Send a raw message via ident path. | |
499 |
|
527 | |||
|
528 | This method is used to send a already serialized message. | |||
|
529 | ||||
500 | Parameters |
|
530 | Parameters | |
501 | ---------- |
|
531 | ---------- | |
502 | msg : list of sendable buffers""" |
|
532 | stream : ZMQStream or Socket | |
|
533 | The ZMQ stream or socket to use for sending the message. | |||
|
534 | msg_list : list | |||
|
535 | The serialized list of messages to send. This only includes the | |||
|
536 | [p_header,p_parent,p_content,buffer1,buffer2,...] portion of | |||
|
537 | the message. | |||
|
538 | ident : ident or list | |||
|
539 | A single ident or a list of idents to use in sending. | |||
|
540 | """ | |||
503 | to_send = [] |
|
541 | to_send = [] | |
504 | if isinstance(ident, bytes): |
|
542 | if isinstance(ident, bytes): | |
505 | ident = [ident] |
|
543 | ident = [ident] | |
@@ -507,17 +545,28 b' class Session(Configurable):' | |||||
507 | to_send.extend(ident) |
|
545 | to_send.extend(ident) | |
508 |
|
546 | |||
509 | to_send.append(DELIM) |
|
547 | to_send.append(DELIM) | |
510 | to_send.append(self.sign(msg)) |
|
548 | to_send.append(self.sign(msg_list)) | |
511 | to_send.extend(msg) |
|
549 | to_send.extend(msg_list) | |
512 | stream.send_multipart(msg, flags, copy=copy) |
|
550 | stream.send_multipart(msg_list, flags, copy=copy) | |
513 |
|
551 | |||
514 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
|
552 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): | |
515 |
""" |
|
553 | """Receive and unpack a message. | |
516 | returns [idents], msg""" |
|
554 | ||
|
555 | Parameters | |||
|
556 | ---------- | |||
|
557 | socket : ZMQStream or Socket | |||
|
558 | The socket or stream to use in receiving. | |||
|
559 | ||||
|
560 | Returns | |||
|
561 | ------- | |||
|
562 | [idents], msg | |||
|
563 | [idents] is a list of idents and msg is a nested message dict of | |||
|
564 | same format as self.msg returns. | |||
|
565 | """ | |||
517 | if isinstance(socket, ZMQStream): |
|
566 | if isinstance(socket, ZMQStream): | |
518 | socket = socket.socket |
|
567 | socket = socket.socket | |
519 | try: |
|
568 | try: | |
520 | msg = socket.recv_multipart(mode) |
|
569 | msg_list = socket.recv_multipart(mode) | |
521 | except zmq.ZMQError as e: |
|
570 | except zmq.ZMQError as e: | |
522 | if e.errno == zmq.EAGAIN: |
|
571 | if e.errno == zmq.EAGAIN: | |
523 | # We can convert EAGAIN to None as we know in this case |
|
572 | # We can convert EAGAIN to None as we know in this case | |
@@ -527,48 +576,50 b' class Session(Configurable):' | |||||
527 | raise |
|
576 | raise | |
528 | # split multipart message into identity list and message dict |
|
577 | # split multipart message into identity list and message dict | |
529 | # invalid large messages can cause very expensive string comparisons |
|
578 | # invalid large messages can cause very expensive string comparisons | |
530 | idents, msg = self.feed_identities(msg, copy) |
|
579 | idents, msg_list = self.feed_identities(msg_list, copy) | |
531 | try: |
|
580 | try: | |
532 | return idents, self.unpack_message(msg, content=content, copy=copy) |
|
581 | return idents, self.unpack_message(msg_list, content=content, copy=copy) | |
533 | except Exception as e: |
|
582 | except Exception as e: | |
534 | print (idents, msg) |
|
583 | print (idents, msg_list) | |
535 | # TODO: handle it |
|
584 | # TODO: handle it | |
536 | raise e |
|
585 | raise e | |
537 |
|
586 | |||
538 | def feed_identities(self, msg, copy=True): |
|
587 | def feed_identities(self, msg_list, copy=True): | |
539 | """feed until DELIM is reached, then return the prefix as idents and |
|
588 | """Split the identities from the rest of the message. | |
540 | remainder as msg. This is easily broken by setting an IDENT to DELIM, |
|
589 | ||
|
590 | Feed until DELIM is reached, then return the prefix as idents and | |||
|
591 | remainder as msg_list. This is easily broken by setting an IDENT to DELIM, | |||
541 | but that would be silly. |
|
592 | but that would be silly. | |
542 |
|
593 | |||
543 | Parameters |
|
594 | Parameters | |
544 | ---------- |
|
595 | ---------- | |
545 | msg : a list of Message or bytes objects |
|
596 | msg_list : a list of Message or bytes objects | |
546 |
|
|
597 | The message to be split. | |
547 | copy : bool |
|
598 | copy : bool | |
548 | flag determining whether the arguments are bytes or Messages |
|
599 | flag determining whether the arguments are bytes or Messages | |
549 |
|
600 | |||
550 | Returns |
|
601 | Returns | |
551 | ------- |
|
602 | ------- | |
552 | (idents,msg) : two lists |
|
603 | (idents,msg_list) : two lists | |
553 | idents will always be a list of bytes - the indentity prefix |
|
604 | idents will always be a list of bytes - the indentity prefix | |
554 | msg will be a list of bytes or Messages, unchanged from input |
|
605 | msg_list will be a list of bytes or Messages, unchanged from input | |
555 | msg should be unpackable via self.unpack_message at this point. |
|
606 | msg_list should be unpackable via self.unpack_message at this point. | |
556 | """ |
|
607 | """ | |
557 | if copy: |
|
608 | if copy: | |
558 | idx = msg.index(DELIM) |
|
609 | idx = msg_list.index(DELIM) | |
559 | return msg[:idx], msg[idx+1:] |
|
610 | return msg_list[:idx], msg_list[idx+1:] | |
560 | else: |
|
611 | else: | |
561 | failed = True |
|
612 | failed = True | |
562 | for idx,m in enumerate(msg): |
|
613 | for idx,m in enumerate(msg_list): | |
563 | if m.bytes == DELIM: |
|
614 | if m.bytes == DELIM: | |
564 | failed = False |
|
615 | failed = False | |
565 | break |
|
616 | break | |
566 | if failed: |
|
617 | if failed: | |
567 | raise ValueError("DELIM not in msg") |
|
618 | raise ValueError("DELIM not in msg_list") | |
568 | idents, msg = msg[:idx], msg[idx+1:] |
|
619 | idents, msg_list = msg_list[:idx], msg_list[idx+1:] | |
569 | return [m.bytes for m in idents], msg |
|
620 | return [m.bytes for m in idents], msg_list | |
570 |
|
621 | |||
571 | def unpack_message(self, msg, content=True, copy=True): |
|
622 | def unpack_message(self, msg_list, content=True, copy=True): | |
572 | """Return a message object from the format |
|
623 | """Return a message object from the format | |
573 | sent by self.send. |
|
624 | sent by self.send. | |
574 |
|
625 | |||
@@ -588,26 +639,26 b' class Session(Configurable):' | |||||
588 | message = {} |
|
639 | message = {} | |
589 | if not copy: |
|
640 | if not copy: | |
590 | for i in range(minlen): |
|
641 | for i in range(minlen): | |
591 | msg[i] = msg[i].bytes |
|
642 | msg_list[i] = msg_list[i].bytes | |
592 | if self.auth is not None: |
|
643 | if self.auth is not None: | |
593 | signature = msg[0] |
|
644 | signature = msg_list[0] | |
594 | if signature in self.digest_history: |
|
645 | if signature in self.digest_history: | |
595 | raise ValueError("Duplicate Signature: %r"%signature) |
|
646 | raise ValueError("Duplicate Signature: %r"%signature) | |
596 | self.digest_history.add(signature) |
|
647 | self.digest_history.add(signature) | |
597 | check = self.sign(msg[1:4]) |
|
648 | check = self.sign(msg_list[1:4]) | |
598 | if not signature == check: |
|
649 | if not signature == check: | |
599 | raise ValueError("Invalid Signature: %r"%signature) |
|
650 | raise ValueError("Invalid Signature: %r"%signature) | |
600 | if not len(msg) >= minlen: |
|
651 | if not len(msg_list) >= minlen: | |
601 | raise TypeError("malformed message, must have at least %i elements"%minlen) |
|
652 | raise TypeError("malformed message, must have at least %i elements"%minlen) | |
602 | message['header'] = self.unpack(msg[1]) |
|
653 | message['header'] = self.unpack(msg_list[1]) | |
603 | message['msg_type'] = message['header']['msg_type'] |
|
654 | message['msg_type'] = message['header']['msg_type'] | |
604 | message['parent_header'] = self.unpack(msg[2]) |
|
655 | message['parent_header'] = self.unpack(msg_list[2]) | |
605 | if content: |
|
656 | if content: | |
606 | message['content'] = self.unpack(msg[3]) |
|
657 | message['content'] = self.unpack(msg_list[3]) | |
607 | else: |
|
658 | else: | |
608 | message['content'] = msg[3] |
|
659 | message['content'] = msg_list[3] | |
609 |
|
660 | |||
610 | message['buffers'] = msg[4:] |
|
661 | message['buffers'] = msg_list[4:] | |
611 | return message |
|
662 | return message | |
612 |
|
663 | |||
613 | def test_msg2obj(): |
|
664 | def test_msg2obj(): |
General Comments 0
You need to be logged in to leave comments.
Login now