##// END OF EJS Templates
StreamSession better handles invalid/missing keys
MinRK -
Show More
@@ -435,7 +435,7 class StreamSession(object):
435 except zmq.ZMQError as e:
435 except zmq.ZMQError as e:
436 if e.errno == zmq.EAGAIN:
436 if e.errno == zmq.EAGAIN:
437 # We can convert EAGAIN to None as we know in this case
437 # We can convert EAGAIN to None as we know in this case
438 # recv_json won't return None.
438 # recv_multipart won't return None.
439 return None
439 return None
440 else:
440 else:
441 raise
441 raise
@@ -468,9 +468,11 class StreamSession(object):
468 msg will be a list of bytes or Messages, unchanged from input
468 msg will be a list of bytes or Messages, unchanged from input
469 msg should be unpackable via self.unpack_message at this point.
469 msg should be unpackable via self.unpack_message at this point.
470 """
470 """
471 ikey = int(self.key is not None)
472 minlen = 3 + ikey
471 msg = list(msg)
473 msg = list(msg)
472 idents = []
474 idents = []
473 while len(msg) > 3:
475 while len(msg) > minlen:
474 if copy:
476 if copy:
475 s = msg[0]
477 s = msg[0]
476 else:
478 else:
@@ -502,8 +504,6 class StreamSession(object):
502 """
504 """
503 ikey = int(self.key is not None)
505 ikey = int(self.key is not None)
504 minlen = 3 + ikey
506 minlen = 3 + ikey
505 if not len(msg) >= minlen:
506 raise TypeError("malformed message, must have at least %i elements"%minlen)
507 message = {}
507 message = {}
508 if not copy:
508 if not copy:
509 for i in range(minlen):
509 for i in range(minlen):
@@ -511,6 +511,8 class StreamSession(object):
511 if ikey:
511 if ikey:
512 if not self.key == msg[0]:
512 if not self.key == msg[0]:
513 raise KeyError("Invalid Session Key: %s"%msg[0])
513 raise KeyError("Invalid Session Key: %s"%msg[0])
514 if not len(msg) >= minlen:
515 raise TypeError("malformed message, must have at least %i elements"%minlen)
514 message['header'] = self.unpack(msg[ikey+0])
516 message['header'] = self.unpack(msg[ikey+0])
515 message['msg_type'] = message['header']['msg_type']
517 message['msg_type'] = message['header']['msg_type']
516 message['parent_header'] = self.unpack(msg[ikey+1])
518 message['parent_header'] = self.unpack(msg[ikey+1])
@@ -518,20 +520,9 class StreamSession(object):
518 message['content'] = self.unpack(msg[ikey+2])
520 message['content'] = self.unpack(msg[ikey+2])
519 else:
521 else:
520 message['content'] = msg[ikey+2]
522 message['content'] = msg[ikey+2]
521
522 # message['buffers'] = msg[3:]
523 # else:
524 # message['header'] = self.unpack(msg[0].bytes)
525 # message['msg_type'] = message['header']['msg_type']
526 # message['parent_header'] = self.unpack(msg[1].bytes)
527 # if content:
528 # message['content'] = self.unpack(msg[2].bytes)
529 # else:
530 # message['content'] = msg[2].bytes
531
523
532 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
524 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
533 return message
525 return message
534
535
526
536
527
537 def test_msg2obj():
528 def test_msg2obj():
General Comments 0
You need to be logged in to leave comments. Login now