Show More
@@ -485,7 +485,7 b' class Hub(SessionFactory):' | |||
|
485 | 485 | return |
|
486 | 486 | client_id = idents[0] |
|
487 | 487 | try: |
|
488 |
msg = self.session.un |
|
|
488 | msg = self.session.unserialize(msg, content=True) | |
|
489 | 489 | except Exception: |
|
490 | 490 | content = error.wrap_exception() |
|
491 | 491 | self.log.error("Bad Query Message: %r"%msg, exc_info=True) |
@@ -550,7 +550,7 b' class Hub(SessionFactory):' | |||
|
550 | 550 | return |
|
551 | 551 | queue_id, client_id = idents[:2] |
|
552 | 552 | try: |
|
553 |
msg = self.session.un |
|
|
553 | msg = self.session.unserialize(msg) | |
|
554 | 554 | except Exception: |
|
555 | 555 | self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) |
|
556 | 556 | return |
@@ -597,7 +597,7 b' class Hub(SessionFactory):' | |||
|
597 | 597 | |
|
598 | 598 | client_id, queue_id = idents[:2] |
|
599 | 599 | try: |
|
600 |
msg = self.session.un |
|
|
600 | msg = self.session.unserialize(msg) | |
|
601 | 601 | except Exception: |
|
602 | 602 | self.log.error("queue::engine %r sent invalid message to %r: %r"%( |
|
603 | 603 | queue_id,client_id, msg), exc_info=True) |
@@ -647,7 +647,7 b' class Hub(SessionFactory):' | |||
|
647 | 647 | client_id = idents[0] |
|
648 | 648 | |
|
649 | 649 | try: |
|
650 |
msg = self.session.un |
|
|
650 | msg = self.session.unserialize(msg) | |
|
651 | 651 | except Exception: |
|
652 | 652 | self.log.error("task::client %r sent invalid task message: %r"%( |
|
653 | 653 | client_id, msg), exc_info=True) |
@@ -697,7 +697,7 b' class Hub(SessionFactory):' | |||
|
697 | 697 | """save the result of a completed task.""" |
|
698 | 698 | client_id = idents[0] |
|
699 | 699 | try: |
|
700 |
msg = self.session.un |
|
|
700 | msg = self.session.unserialize(msg) | |
|
701 | 701 | except Exception: |
|
702 | 702 | self.log.error("task::invalid task result message send to %r: %r"%( |
|
703 | 703 | client_id, msg), exc_info=True) |
@@ -744,7 +744,7 b' class Hub(SessionFactory):' | |||
|
744 | 744 | |
|
745 | 745 | def save_task_destination(self, idents, msg): |
|
746 | 746 | try: |
|
747 |
msg = self.session.un |
|
|
747 | msg = self.session.unserialize(msg, content=True) | |
|
748 | 748 | except Exception: |
|
749 | 749 | self.log.error("task::invalid task tracking message", exc_info=True) |
|
750 | 750 | return |
@@ -781,7 +781,7 b' class Hub(SessionFactory):' | |||
|
781 | 781 | """save an iopub message into the db""" |
|
782 | 782 | # print (topics) |
|
783 | 783 | try: |
|
784 |
msg = self.session.un |
|
|
784 | msg = self.session.unserialize(msg, content=True) | |
|
785 | 785 | except Exception: |
|
786 | 786 | self.log.error("iopub::invalid IOPub message", exc_info=True) |
|
787 | 787 | return |
@@ -211,7 +211,7 b' class TaskScheduler(SessionFactory):' | |||
|
211 | 211 | self.log.warn("task::Invalid Message: %r",msg) |
|
212 | 212 | return |
|
213 | 213 | try: |
|
214 |
msg = self.session.un |
|
|
214 | msg = self.session.unserialize(msg) | |
|
215 | 215 | except ValueError: |
|
216 | 216 | self.log.warn("task::Unauthorized message from: %r"%idents) |
|
217 | 217 | return |
@@ -307,7 +307,7 b' class TaskScheduler(SessionFactory):' | |||
|
307 | 307 | self.notifier_stream.flush() |
|
308 | 308 | try: |
|
309 | 309 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
310 |
msg = self.session.un |
|
|
310 | msg = self.session.unserialize(msg, content=False, copy=False) | |
|
311 | 311 | except Exception: |
|
312 | 312 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) |
|
313 | 313 | return |
@@ -515,7 +515,7 b' class TaskScheduler(SessionFactory):' | |||
|
515 | 515 | """dispatch method for result replies""" |
|
516 | 516 | try: |
|
517 | 517 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
518 |
msg = self.session.un |
|
|
518 | msg = self.session.unserialize(msg, content=False, copy=False) | |
|
519 | 519 | engine = idents[0] |
|
520 | 520 | try: |
|
521 | 521 | idx = self.targets.index(engine) |
@@ -90,7 +90,7 b' class EngineFactory(RegistrationFactory):' | |||
|
90 | 90 | loop = self.loop |
|
91 | 91 | identity = self.bident |
|
92 | 92 | idents,msg = self.session.feed_identities(msg) |
|
93 |
msg = Message(self.session.un |
|
|
93 | msg = Message(self.session.unserialize(msg)) | |
|
94 | 94 | |
|
95 | 95 | if msg.content.status == 'ok': |
|
96 | 96 | self.id = int(msg.content.id) |
@@ -40,7 +40,7 b' class KernelStarter(object):' | |||
|
40 | 40 | def dispatch_request(self, raw_msg): |
|
41 | 41 | idents, msg = self.session.feed_identities() |
|
42 | 42 | try: |
|
43 |
msg = self.session.un |
|
|
43 | msg = self.session.unserialize(msg, content=False) | |
|
44 | 44 | except: |
|
45 | 45 | print ("bad msg: %s"%msg) |
|
46 | 46 | |
@@ -54,7 +54,7 b' class KernelStarter(object):' | |||
|
54 | 54 | def dispatch_reply(self, raw_msg): |
|
55 | 55 | idents, msg = self.session.feed_identities() |
|
56 | 56 | try: |
|
57 |
msg = self.session.un |
|
|
57 | msg = self.session.unserialize(msg, content=False) | |
|
58 | 58 | except: |
|
59 | 59 | print ("bad msg: %s"%msg) |
|
60 | 60 |
@@ -195,7 +195,7 b' class Kernel(SessionFactory):' | |||
|
195 | 195 | def dispatch_control(self, msg): |
|
196 | 196 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
197 | 197 | try: |
|
198 |
msg = self.session.un |
|
|
198 | msg = self.session.unserialize(msg, content=True, copy=False) | |
|
199 | 199 | except: |
|
200 | 200 | self.log.error("Invalid Message", exc_info=True) |
|
201 | 201 | return |
@@ -373,7 +373,7 b' class Kernel(SessionFactory):' | |||
|
373 | 373 | self.control_stream.flush() |
|
374 | 374 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
375 | 375 | try: |
|
376 |
msg = self.session.un |
|
|
376 | msg = self.session.unserialize(msg, content=True, copy=False) | |
|
377 | 377 | except: |
|
378 | 378 | self.log.error("Invalid Message", exc_info=True) |
|
379 | 379 | return |
@@ -383,6 +383,10 b' class Session(Configurable):' | |||
|
383 | 383 | def serialize(self, msg, ident=None): |
|
384 | 384 | """Serialize the message components to bytes. |
|
385 | 385 | |
|
386 | This is roughly the inverse of unserialize. The serialize/unserialize | |
|
387 | methods work with full message lists, whereas pack/unpack work with | |
|
388 | the individual message parts in the message list. | |
|
389 | ||
|
386 | 390 | Parameters |
|
387 | 391 | ---------- |
|
388 | 392 | msg : dict or Message |
@@ -576,7 +580,7 b' class Session(Configurable):' | |||
|
576 | 580 | # invalid large messages can cause very expensive string comparisons |
|
577 | 581 | idents, msg_list = self.feed_identities(msg_list, copy) |
|
578 | 582 | try: |
|
579 |
return idents, self.un |
|
|
583 | return idents, self.unserialize(msg_list, content=content, copy=copy) | |
|
580 | 584 | except Exception as e: |
|
581 | 585 | print (idents, msg_list) |
|
582 | 586 | # TODO: handle it |
@@ -598,10 +602,12 b' class Session(Configurable):' | |||
|
598 | 602 | |
|
599 | 603 | Returns |
|
600 | 604 | ------- |
|
601 | (idents,msg_list) : two lists | |
|
602 |
idents will always be a list of bytes |
|
|
603 |
msg_list will be a list of bytes or Messages |
|
|
604 | msg_list should be unpackable via self.unpack_message at this point. | |
|
605 | (idents, msg_list) : two lists | |
|
606 | idents will always be a list of bytes, each of which is a ZMQ | |
|
607 | identity. msg_list will be a list of bytes or zmq.Messages of the | |
|
608 | form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and | |
|
609 | should be unpackable/unserializable via self.unserialize at this | |
|
610 | point. | |
|
605 | 611 | """ |
|
606 | 612 | if copy: |
|
607 | 613 | idx = msg_list.index(DELIM) |
@@ -617,21 +623,30 b' class Session(Configurable):' | |||
|
617 | 623 | idents, msg_list = msg_list[:idx], msg_list[idx+1:] |
|
618 | 624 | return [m.bytes for m in idents], msg_list |
|
619 | 625 | |
|
620 |
def un |
|
|
621 | """Return a message object from the format | |
|
622 | sent by self.send. | |
|
623 | ||
|
626 | def unserialize(self, msg_list, content=True, copy=True): | |
|
627 | """Unserialize a msg_list to a nested message dict. | |
|
628 | ||
|
629 | This is roughly the inverse of serialize. The serialize/unserialize | |
|
630 | methods work with full message lists, whereas pack/unpack work with | |
|
631 | the individual message parts in the message list. | |
|
632 | ||
|
624 | 633 | Parameters: |
|
625 | 634 | ----------- |
|
626 | ||
|
635 | msg_list : list of bytes or Message objects | |
|
636 | The list of message parts of the form [HMAC,p_header,p_parent, | |
|
637 | p_content,buffer1,buffer2,...]. | |
|
627 | 638 | content : bool (True) |
|
628 |
|
|
|
629 |
|
|
|
630 | ||
|
639 | Whether to unpack the content dict (True), or leave it packed | |
|
640 | (False). | |
|
631 | 641 | copy : bool (True) |
|
632 |
|
|
|
633 |
o |
|
|
634 | ||
|
642 | Whether to return the bytes (True), or the non-copying Message | |
|
643 | object in each place (False). | |
|
644 | ||
|
645 | Returns | |
|
646 | ------- | |
|
647 | msg : dict | |
|
648 | The nested message dict with top-level keys [header, parent_header, | |
|
649 | content, buffers]. | |
|
635 | 650 | """ |
|
636 | 651 | minlen = 4 |
|
637 | 652 | message = {} |
General Comments 0
You need to be logged in to leave comments.
Login now