Show More
@@ -485,7 +485,7 b' class Hub(SessionFactory):' | |||||
485 | return |
|
485 | return | |
486 | client_id = idents[0] |
|
486 | client_id = idents[0] | |
487 | try: |
|
487 | try: | |
488 |
msg = self.session.un |
|
488 | msg = self.session.unserialize(msg, content=True) | |
489 | except Exception: |
|
489 | except Exception: | |
490 | content = error.wrap_exception() |
|
490 | content = error.wrap_exception() | |
491 | self.log.error("Bad Query Message: %r"%msg, exc_info=True) |
|
491 | self.log.error("Bad Query Message: %r"%msg, exc_info=True) | |
@@ -550,7 +550,7 b' class Hub(SessionFactory):' | |||||
550 | return |
|
550 | return | |
551 | queue_id, client_id = idents[:2] |
|
551 | queue_id, client_id = idents[:2] | |
552 | try: |
|
552 | try: | |
553 |
msg = self.session.un |
|
553 | msg = self.session.unserialize(msg) | |
554 | except Exception: |
|
554 | except Exception: | |
555 | self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) |
|
555 | self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) | |
556 | return |
|
556 | return | |
@@ -597,7 +597,7 b' class Hub(SessionFactory):' | |||||
597 |
|
597 | |||
598 | client_id, queue_id = idents[:2] |
|
598 | client_id, queue_id = idents[:2] | |
599 | try: |
|
599 | try: | |
600 |
msg = self.session.un |
|
600 | msg = self.session.unserialize(msg) | |
601 | except Exception: |
|
601 | except Exception: | |
602 | self.log.error("queue::engine %r sent invalid message to %r: %r"%( |
|
602 | self.log.error("queue::engine %r sent invalid message to %r: %r"%( | |
603 | queue_id,client_id, msg), exc_info=True) |
|
603 | queue_id,client_id, msg), exc_info=True) | |
@@ -647,7 +647,7 b' class Hub(SessionFactory):' | |||||
647 | client_id = idents[0] |
|
647 | client_id = idents[0] | |
648 |
|
648 | |||
649 | try: |
|
649 | try: | |
650 |
msg = self.session.un |
|
650 | msg = self.session.unserialize(msg) | |
651 | except Exception: |
|
651 | except Exception: | |
652 | self.log.error("task::client %r sent invalid task message: %r"%( |
|
652 | self.log.error("task::client %r sent invalid task message: %r"%( | |
653 | client_id, msg), exc_info=True) |
|
653 | client_id, msg), exc_info=True) | |
@@ -697,7 +697,7 b' class Hub(SessionFactory):' | |||||
697 | """save the result of a completed task.""" |
|
697 | """save the result of a completed task.""" | |
698 | client_id = idents[0] |
|
698 | client_id = idents[0] | |
699 | try: |
|
699 | try: | |
700 |
msg = self.session.un |
|
700 | msg = self.session.unserialize(msg) | |
701 | except Exception: |
|
701 | except Exception: | |
702 | self.log.error("task::invalid task result message send to %r: %r"%( |
|
702 | self.log.error("task::invalid task result message send to %r: %r"%( | |
703 | client_id, msg), exc_info=True) |
|
703 | client_id, msg), exc_info=True) | |
@@ -744,7 +744,7 b' class Hub(SessionFactory):' | |||||
744 |
|
744 | |||
745 | def save_task_destination(self, idents, msg): |
|
745 | def save_task_destination(self, idents, msg): | |
746 | try: |
|
746 | try: | |
747 |
msg = self.session.un |
|
747 | msg = self.session.unserialize(msg, content=True) | |
748 | except Exception: |
|
748 | except Exception: | |
749 | self.log.error("task::invalid task tracking message", exc_info=True) |
|
749 | self.log.error("task::invalid task tracking message", exc_info=True) | |
750 | return |
|
750 | return | |
@@ -781,7 +781,7 b' class Hub(SessionFactory):' | |||||
781 | """save an iopub message into the db""" |
|
781 | """save an iopub message into the db""" | |
782 | # print (topics) |
|
782 | # print (topics) | |
783 | try: |
|
783 | try: | |
784 |
msg = self.session.un |
|
784 | msg = self.session.unserialize(msg, content=True) | |
785 | except Exception: |
|
785 | except Exception: | |
786 | self.log.error("iopub::invalid IOPub message", exc_info=True) |
|
786 | self.log.error("iopub::invalid IOPub message", exc_info=True) | |
787 | return |
|
787 | return |
@@ -211,7 +211,7 b' class TaskScheduler(SessionFactory):' | |||||
211 | self.log.warn("task::Invalid Message: %r",msg) |
|
211 | self.log.warn("task::Invalid Message: %r",msg) | |
212 | return |
|
212 | return | |
213 | try: |
|
213 | try: | |
214 |
msg = self.session.un |
|
214 | msg = self.session.unserialize(msg) | |
215 | except ValueError: |
|
215 | except ValueError: | |
216 | self.log.warn("task::Unauthorized message from: %r"%idents) |
|
216 | self.log.warn("task::Unauthorized message from: %r"%idents) | |
217 | return |
|
217 | return | |
@@ -307,7 +307,7 b' class TaskScheduler(SessionFactory):' | |||||
307 | self.notifier_stream.flush() |
|
307 | self.notifier_stream.flush() | |
308 | try: |
|
308 | try: | |
309 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
309 | idents, msg = self.session.feed_identities(raw_msg, copy=False) | |
310 |
msg = self.session.un |
|
310 | msg = self.session.unserialize(msg, content=False, copy=False) | |
311 | except Exception: |
|
311 | except Exception: | |
312 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) |
|
312 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) | |
313 | return |
|
313 | return | |
@@ -515,7 +515,7 b' class TaskScheduler(SessionFactory):' | |||||
515 | """dispatch method for result replies""" |
|
515 | """dispatch method for result replies""" | |
516 | try: |
|
516 | try: | |
517 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
517 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
518 |
msg = self.session.un |
|
518 | msg = self.session.unserialize(msg, content=False, copy=False) | |
519 | engine = idents[0] |
|
519 | engine = idents[0] | |
520 | try: |
|
520 | try: | |
521 | idx = self.targets.index(engine) |
|
521 | idx = self.targets.index(engine) |
@@ -90,7 +90,7 b' class EngineFactory(RegistrationFactory):' | |||||
90 | loop = self.loop |
|
90 | loop = self.loop | |
91 | identity = self.bident |
|
91 | identity = self.bident | |
92 | idents,msg = self.session.feed_identities(msg) |
|
92 | idents,msg = self.session.feed_identities(msg) | |
93 |
msg = Message(self.session.un |
|
93 | msg = Message(self.session.unserialize(msg)) | |
94 |
|
94 | |||
95 | if msg.content.status == 'ok': |
|
95 | if msg.content.status == 'ok': | |
96 | self.id = int(msg.content.id) |
|
96 | self.id = int(msg.content.id) |
@@ -40,7 +40,7 b' class KernelStarter(object):' | |||||
40 | def dispatch_request(self, raw_msg): |
|
40 | def dispatch_request(self, raw_msg): | |
41 | idents, msg = self.session.feed_identities() |
|
41 | idents, msg = self.session.feed_identities() | |
42 | try: |
|
42 | try: | |
43 |
msg = self.session.un |
|
43 | msg = self.session.unserialize(msg, content=False) | |
44 | except: |
|
44 | except: | |
45 | print ("bad msg: %s"%msg) |
|
45 | print ("bad msg: %s"%msg) | |
46 |
|
46 | |||
@@ -54,7 +54,7 b' class KernelStarter(object):' | |||||
54 | def dispatch_reply(self, raw_msg): |
|
54 | def dispatch_reply(self, raw_msg): | |
55 | idents, msg = self.session.feed_identities() |
|
55 | idents, msg = self.session.feed_identities() | |
56 | try: |
|
56 | try: | |
57 |
msg = self.session.un |
|
57 | msg = self.session.unserialize(msg, content=False) | |
58 | except: |
|
58 | except: | |
59 | print ("bad msg: %s"%msg) |
|
59 | print ("bad msg: %s"%msg) | |
60 |
|
60 |
@@ -195,7 +195,7 b' class Kernel(SessionFactory):' | |||||
195 | def dispatch_control(self, msg): |
|
195 | def dispatch_control(self, msg): | |
196 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
196 | idents,msg = self.session.feed_identities(msg, copy=False) | |
197 | try: |
|
197 | try: | |
198 |
msg = self.session.un |
|
198 | msg = self.session.unserialize(msg, content=True, copy=False) | |
199 | except: |
|
199 | except: | |
200 | self.log.error("Invalid Message", exc_info=True) |
|
200 | self.log.error("Invalid Message", exc_info=True) | |
201 | return |
|
201 | return | |
@@ -373,7 +373,7 b' class Kernel(SessionFactory):' | |||||
373 | self.control_stream.flush() |
|
373 | self.control_stream.flush() | |
374 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
374 | idents,msg = self.session.feed_identities(msg, copy=False) | |
375 | try: |
|
375 | try: | |
376 |
msg = self.session.un |
|
376 | msg = self.session.unserialize(msg, content=True, copy=False) | |
377 | except: |
|
377 | except: | |
378 | self.log.error("Invalid Message", exc_info=True) |
|
378 | self.log.error("Invalid Message", exc_info=True) | |
379 | return |
|
379 | return |
@@ -383,6 +383,10 b' class Session(Configurable):' | |||||
383 | def serialize(self, msg, ident=None): |
|
383 | def serialize(self, msg, ident=None): | |
384 | """Serialize the message components to bytes. |
|
384 | """Serialize the message components to bytes. | |
385 |
|
385 | |||
|
386 | This is roughly the inverse of unserialize. The serialize/unserialize | |||
|
387 | methods work with full message lists, whereas pack/unpack work with | |||
|
388 | the individual message parts in the message list. | |||
|
389 | ||||
386 | Parameters |
|
390 | Parameters | |
387 | ---------- |
|
391 | ---------- | |
388 | msg : dict or Message |
|
392 | msg : dict or Message | |
@@ -576,7 +580,7 b' class Session(Configurable):' | |||||
576 | # invalid large messages can cause very expensive string comparisons |
|
580 | # invalid large messages can cause very expensive string comparisons | |
577 | idents, msg_list = self.feed_identities(msg_list, copy) |
|
581 | idents, msg_list = self.feed_identities(msg_list, copy) | |
578 | try: |
|
582 | try: | |
579 |
return idents, self.un |
|
583 | return idents, self.unserialize(msg_list, content=content, copy=copy) | |
580 | except Exception as e: |
|
584 | except Exception as e: | |
581 | print (idents, msg_list) |
|
585 | print (idents, msg_list) | |
582 | # TODO: handle it |
|
586 | # TODO: handle it | |
@@ -598,10 +602,12 b' class Session(Configurable):' | |||||
598 |
|
602 | |||
599 | Returns |
|
603 | Returns | |
600 | ------- |
|
604 | ------- | |
601 | (idents,msg_list) : two lists |
|
605 | (idents, msg_list) : two lists | |
602 |
idents will always be a list of bytes |
|
606 | idents will always be a list of bytes, each of which is a ZMQ | |
603 |
msg_list will be a list of bytes or Messages |
|
607 | identity. msg_list will be a list of bytes or zmq.Messages of the | |
604 | msg_list should be unpackable via self.unpack_message at this point. |
|
608 | form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and | |
|
609 | should be unpackable/unserializable via self.unserialize at this | |||
|
610 | point. | |||
605 | """ |
|
611 | """ | |
606 | if copy: |
|
612 | if copy: | |
607 | idx = msg_list.index(DELIM) |
|
613 | idx = msg_list.index(DELIM) | |
@@ -617,21 +623,30 b' class Session(Configurable):' | |||||
617 | idents, msg_list = msg_list[:idx], msg_list[idx+1:] |
|
623 | idents, msg_list = msg_list[:idx], msg_list[idx+1:] | |
618 | return [m.bytes for m in idents], msg_list |
|
624 | return [m.bytes for m in idents], msg_list | |
619 |
|
625 | |||
620 |
def un |
|
626 | def unserialize(self, msg_list, content=True, copy=True): | |
621 | """Return a message object from the format |
|
627 | """Unserialize a msg_list to a nested message dict. | |
622 | sent by self.send. |
|
628 | ||
623 |
|
629 | This is roughly the inverse of serialize. The serialize/unserialize | ||
|
630 | methods work with full message lists, whereas pack/unpack work with | |||
|
631 | the individual message parts in the message list. | |||
|
632 | ||||
624 | Parameters: |
|
633 | Parameters: | |
625 | ----------- |
|
634 | ----------- | |
626 |
|
635 | msg_list : list of bytes or Message objects | ||
|
636 | The list of message parts of the form [HMAC,p_header,p_parent, | |||
|
637 | p_content,buffer1,buffer2,...]. | |||
627 | content : bool (True) |
|
638 | content : bool (True) | |
628 |
|
|
639 | Whether to unpack the content dict (True), or leave it packed | |
629 |
|
|
640 | (False). | |
630 |
|
||||
631 | copy : bool (True) |
|
641 | copy : bool (True) | |
632 |
|
|
642 | Whether to return the bytes (True), or the non-copying Message | |
633 |
o |
|
643 | object in each place (False). | |
634 |
|
644 | |||
|
645 | Returns | |||
|
646 | ------- | |||
|
647 | msg : dict | |||
|
648 | The nested message dict with top-level keys [header, parent_header, | |||
|
649 | content, buffers]. | |||
635 | """ |
|
650 | """ | |
636 | minlen = 4 |
|
651 | minlen = 4 | |
637 | message = {} |
|
652 | message = {} |
General Comments 0
You need to be logged in to leave comments.
Login now