Show More
@@ -435,7 +435,7 class StreamSession(object): | |||||
435 | except zmq.ZMQError as e: |
|
435 | except zmq.ZMQError as e: | |
436 | if e.errno == zmq.EAGAIN: |
|
436 | if e.errno == zmq.EAGAIN: | |
437 | # We can convert EAGAIN to None as we know in this case |
|
437 | # We can convert EAGAIN to None as we know in this case | |
438 |
# recv_ |
|
438 | # recv_multipart won't return None. | |
439 | return None |
|
439 | return None | |
440 | else: |
|
440 | else: | |
441 | raise |
|
441 | raise | |
@@ -468,9 +468,11 class StreamSession(object): | |||||
468 | msg will be a list of bytes or Messages, unchanged from input |
|
468 | msg will be a list of bytes or Messages, unchanged from input | |
469 | msg should be unpackable via self.unpack_message at this point. |
|
469 | msg should be unpackable via self.unpack_message at this point. | |
470 | """ |
|
470 | """ | |
|
471 | ikey = int(self.key is not None) | |||
|
472 | minlen = 3 + ikey | |||
471 | msg = list(msg) |
|
473 | msg = list(msg) | |
472 | idents = [] |
|
474 | idents = [] | |
473 |
while len(msg) > |
|
475 | while len(msg) > minlen: | |
474 | if copy: |
|
476 | if copy: | |
475 | s = msg[0] |
|
477 | s = msg[0] | |
476 | else: |
|
478 | else: | |
@@ -502,8 +504,6 class StreamSession(object): | |||||
502 | """ |
|
504 | """ | |
503 | ikey = int(self.key is not None) |
|
505 | ikey = int(self.key is not None) | |
504 | minlen = 3 + ikey |
|
506 | minlen = 3 + ikey | |
505 | if not len(msg) >= minlen: |
|
|||
506 | raise TypeError("malformed message, must have at least %i elements"%minlen) |
|
|||
507 | message = {} |
|
507 | message = {} | |
508 | if not copy: |
|
508 | if not copy: | |
509 | for i in range(minlen): |
|
509 | for i in range(minlen): | |
@@ -511,6 +511,8 class StreamSession(object): | |||||
511 | if ikey: |
|
511 | if ikey: | |
512 | if not self.key == msg[0]: |
|
512 | if not self.key == msg[0]: | |
513 | raise KeyError("Invalid Session Key: %s"%msg[0]) |
|
513 | raise KeyError("Invalid Session Key: %s"%msg[0]) | |
|
514 | if not len(msg) >= minlen: | |||
|
515 | raise TypeError("malformed message, must have at least %i elements"%minlen) | |||
514 | message['header'] = self.unpack(msg[ikey+0]) |
|
516 | message['header'] = self.unpack(msg[ikey+0]) | |
515 | message['msg_type'] = message['header']['msg_type'] |
|
517 | message['msg_type'] = message['header']['msg_type'] | |
516 | message['parent_header'] = self.unpack(msg[ikey+1]) |
|
518 | message['parent_header'] = self.unpack(msg[ikey+1]) | |
@@ -518,20 +520,9 class StreamSession(object): | |||||
518 | message['content'] = self.unpack(msg[ikey+2]) |
|
520 | message['content'] = self.unpack(msg[ikey+2]) | |
519 | else: |
|
521 | else: | |
520 | message['content'] = msg[ikey+2] |
|
522 | message['content'] = msg[ikey+2] | |
521 |
|
||||
522 | # message['buffers'] = msg[3:] |
|
|||
523 | # else: |
|
|||
524 | # message['header'] = self.unpack(msg[0].bytes) |
|
|||
525 | # message['msg_type'] = message['header']['msg_type'] |
|
|||
526 | # message['parent_header'] = self.unpack(msg[1].bytes) |
|
|||
527 | # if content: |
|
|||
528 | # message['content'] = self.unpack(msg[2].bytes) |
|
|||
529 | # else: |
|
|||
530 | # message['content'] = msg[2].bytes |
|
|||
531 |
|
523 | |||
532 | message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ] |
|
524 | message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ] | |
533 | return message |
|
525 | return message | |
534 |
|
||||
535 |
|
526 | |||
536 |
|
527 | |||
537 | def test_msg2obj(): |
|
528 | def test_msg2obj(): |
General Comments 0
You need to be logged in to leave comments.
Login now