##// END OF EJS Templates
Renaming unpack_message to unserialize and updating docstrings.
Brian E. Granger -
Show More
@@ -485,7 +485,7 b' class Hub(SessionFactory):'
485 return
485 return
486 client_id = idents[0]
486 client_id = idents[0]
487 try:
487 try:
488 msg = self.session.unpack_message(msg, content=True)
488 msg = self.session.unserialize(msg, content=True)
489 except Exception:
489 except Exception:
490 content = error.wrap_exception()
490 content = error.wrap_exception()
491 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
491 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
@@ -550,7 +550,7 b' class Hub(SessionFactory):'
550 return
550 return
551 queue_id, client_id = idents[:2]
551 queue_id, client_id = idents[:2]
552 try:
552 try:
553 msg = self.session.unpack_message(msg)
553 msg = self.session.unserialize(msg)
554 except Exception:
554 except Exception:
555 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
555 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
556 return
556 return
@@ -597,7 +597,7 b' class Hub(SessionFactory):'
597
597
598 client_id, queue_id = idents[:2]
598 client_id, queue_id = idents[:2]
599 try:
599 try:
600 msg = self.session.unpack_message(msg)
600 msg = self.session.unserialize(msg)
601 except Exception:
601 except Exception:
602 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
603 queue_id,client_id, msg), exc_info=True)
603 queue_id,client_id, msg), exc_info=True)
@@ -647,7 +647,7 b' class Hub(SessionFactory):'
647 client_id = idents[0]
647 client_id = idents[0]
648
648
649 try:
649 try:
650 msg = self.session.unpack_message(msg)
650 msg = self.session.unserialize(msg)
651 except Exception:
651 except Exception:
652 self.log.error("task::client %r sent invalid task message: %r"%(
652 self.log.error("task::client %r sent invalid task message: %r"%(
653 client_id, msg), exc_info=True)
653 client_id, msg), exc_info=True)
@@ -697,7 +697,7 b' class Hub(SessionFactory):'
697 """save the result of a completed task."""
697 """save the result of a completed task."""
698 client_id = idents[0]
698 client_id = idents[0]
699 try:
699 try:
700 msg = self.session.unpack_message(msg)
700 msg = self.session.unserialize(msg)
701 except Exception:
701 except Exception:
702 self.log.error("task::invalid task result message send to %r: %r"%(
702 self.log.error("task::invalid task result message send to %r: %r"%(
703 client_id, msg), exc_info=True)
703 client_id, msg), exc_info=True)
@@ -744,7 +744,7 b' class Hub(SessionFactory):'
744
744
745 def save_task_destination(self, idents, msg):
745 def save_task_destination(self, idents, msg):
746 try:
746 try:
747 msg = self.session.unpack_message(msg, content=True)
747 msg = self.session.unserialize(msg, content=True)
748 except Exception:
748 except Exception:
749 self.log.error("task::invalid task tracking message", exc_info=True)
749 self.log.error("task::invalid task tracking message", exc_info=True)
750 return
750 return
@@ -781,7 +781,7 b' class Hub(SessionFactory):'
781 """save an iopub message into the db"""
781 """save an iopub message into the db"""
782 # print (topics)
782 # print (topics)
783 try:
783 try:
784 msg = self.session.unpack_message(msg, content=True)
784 msg = self.session.unserialize(msg, content=True)
785 except Exception:
785 except Exception:
786 self.log.error("iopub::invalid IOPub message", exc_info=True)
786 self.log.error("iopub::invalid IOPub message", exc_info=True)
787 return
787 return
@@ -211,7 +211,7 b' class TaskScheduler(SessionFactory):'
211 self.log.warn("task::Invalid Message: %r",msg)
211 self.log.warn("task::Invalid Message: %r",msg)
212 return
212 return
213 try:
213 try:
214 msg = self.session.unpack_message(msg)
214 msg = self.session.unserialize(msg)
215 except ValueError:
215 except ValueError:
216 self.log.warn("task::Unauthorized message from: %r"%idents)
216 self.log.warn("task::Unauthorized message from: %r"%idents)
217 return
217 return
@@ -307,7 +307,7 b' class TaskScheduler(SessionFactory):'
307 self.notifier_stream.flush()
307 self.notifier_stream.flush()
308 try:
308 try:
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
310 msg = self.session.unpack_message(msg, content=False, copy=False)
310 msg = self.session.unserialize(msg, content=False, copy=False)
311 except Exception:
311 except Exception:
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
313 return
313 return
@@ -515,7 +515,7 b' class TaskScheduler(SessionFactory):'
515 """dispatch method for result replies"""
515 """dispatch method for result replies"""
516 try:
516 try:
517 idents,msg = self.session.feed_identities(raw_msg, copy=False)
517 idents,msg = self.session.feed_identities(raw_msg, copy=False)
518 msg = self.session.unpack_message(msg, content=False, copy=False)
518 msg = self.session.unserialize(msg, content=False, copy=False)
519 engine = idents[0]
519 engine = idents[0]
520 try:
520 try:
521 idx = self.targets.index(engine)
521 idx = self.targets.index(engine)
@@ -90,7 +90,7 b' class EngineFactory(RegistrationFactory):'
90 loop = self.loop
90 loop = self.loop
91 identity = self.bident
91 identity = self.bident
92 idents,msg = self.session.feed_identities(msg)
92 idents,msg = self.session.feed_identities(msg)
93 msg = Message(self.session.unpack_message(msg))
93 msg = Message(self.session.unserialize(msg))
94
94
95 if msg.content.status == 'ok':
95 if msg.content.status == 'ok':
96 self.id = int(msg.content.id)
96 self.id = int(msg.content.id)
@@ -40,7 +40,7 b' class KernelStarter(object):'
40 def dispatch_request(self, raw_msg):
40 def dispatch_request(self, raw_msg):
41 idents, msg = self.session.feed_identities()
41 idents, msg = self.session.feed_identities()
42 try:
42 try:
43 msg = self.session.unpack_message(msg, content=False)
43 msg = self.session.unserialize(msg, content=False)
44 except:
44 except:
45 print ("bad msg: %s"%msg)
45 print ("bad msg: %s"%msg)
46
46
@@ -54,7 +54,7 b' class KernelStarter(object):'
54 def dispatch_reply(self, raw_msg):
54 def dispatch_reply(self, raw_msg):
55 idents, msg = self.session.feed_identities()
55 idents, msg = self.session.feed_identities()
56 try:
56 try:
57 msg = self.session.unpack_message(msg, content=False)
57 msg = self.session.unserialize(msg, content=False)
58 except:
58 except:
59 print ("bad msg: %s"%msg)
59 print ("bad msg: %s"%msg)
60
60
@@ -195,7 +195,7 b' class Kernel(SessionFactory):'
195 def dispatch_control(self, msg):
195 def dispatch_control(self, msg):
196 idents,msg = self.session.feed_identities(msg, copy=False)
196 idents,msg = self.session.feed_identities(msg, copy=False)
197 try:
197 try:
198 msg = self.session.unpack_message(msg, content=True, copy=False)
198 msg = self.session.unserialize(msg, content=True, copy=False)
199 except:
199 except:
200 self.log.error("Invalid Message", exc_info=True)
200 self.log.error("Invalid Message", exc_info=True)
201 return
201 return
@@ -373,7 +373,7 b' class Kernel(SessionFactory):'
373 self.control_stream.flush()
373 self.control_stream.flush()
374 idents,msg = self.session.feed_identities(msg, copy=False)
374 idents,msg = self.session.feed_identities(msg, copy=False)
375 try:
375 try:
376 msg = self.session.unpack_message(msg, content=True, copy=False)
376 msg = self.session.unserialize(msg, content=True, copy=False)
377 except:
377 except:
378 self.log.error("Invalid Message", exc_info=True)
378 self.log.error("Invalid Message", exc_info=True)
379 return
379 return
@@ -383,6 +383,10 b' class Session(Configurable):'
383 def serialize(self, msg, ident=None):
383 def serialize(self, msg, ident=None):
384 """Serialize the message components to bytes.
384 """Serialize the message components to bytes.
385
385
386 This is roughly the inverse of unserialize. The serialize/unserialize
387 methods work with full message lists, whereas pack/unpack work with
388 the individual message parts in the message list.
389
386 Parameters
390 Parameters
387 ----------
391 ----------
388 msg : dict or Message
392 msg : dict or Message
@@ -576,7 +580,7 b' class Session(Configurable):'
576 # invalid large messages can cause very expensive string comparisons
580 # invalid large messages can cause very expensive string comparisons
577 idents, msg_list = self.feed_identities(msg_list, copy)
581 idents, msg_list = self.feed_identities(msg_list, copy)
578 try:
582 try:
579 return idents, self.unpack_message(msg_list, content=content, copy=copy)
583 return idents, self.unserialize(msg_list, content=content, copy=copy)
580 except Exception as e:
584 except Exception as e:
581 print (idents, msg_list)
585 print (idents, msg_list)
582 # TODO: handle it
586 # TODO: handle it
@@ -599,9 +603,11 b' class Session(Configurable):'
599 Returns
603 Returns
600 -------
604 -------
601 (idents,msg_list) : two lists
605 (idents, msg_list) : two lists
602 idents will always be a list of bytes - the indentity prefix
606 idents will always be a list of bytes, each of which is a ZMQ
603 msg_list will be a list of bytes or Messages, unchanged from input
607 identity. msg_list will be a list of bytes or zmq.Messages of the
604 msg_list should be unpackable via self.unpack_message at this point.
608 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
609 should be unpackable/unserializable via self.unserialize at this
610 point.
605 """
611 """
606 if copy:
612 if copy:
607 idx = msg_list.index(DELIM)
613 idx = msg_list.index(DELIM)
@@ -617,21 +623,30 b' class Session(Configurable):'
617 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
623 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
618 return [m.bytes for m in idents], msg_list
624 return [m.bytes for m in idents], msg_list
619
625
620 def unpack_message(self, msg_list, content=True, copy=True):
626 def unserialize(self, msg_list, content=True, copy=True):
621 """Return a message object from the format
627 """Unserialize a msg_list to a nested message dict.
622 sent by self.send.
628
629 This is roughly the inverse of serialize. The serialize/unserialize
630 methods work with full message lists, whereas pack/unpack work with
631 the individual message parts in the message list.
623
632
624 Parameters:
633 Parameters:
625 -----------
634 -----------
626
635 msg_list : list of bytes or Message objects
636 The list of message parts of the form [HMAC,p_header,p_parent,
637 p_content,buffer1,buffer2,...].
627 content : bool (True)
638 content : bool (True)
628 whether to unpack the content dict (True),
639 Whether to unpack the content dict (True), or leave it packed
629 or leave it serialized (False)
640 (False).
630
631 copy : bool (True)
641 copy : bool (True)
632 whether to return the bytes (True),
642 Whether to return the bytes (True), or the non-copying Message
633 or the non-copying Message object in each place (False)
643 object in each place (False).
634
644
645 Returns
646 -------
647 msg : dict
648 The nested message dict with top-level keys [header, parent_header,
649 content, buffers].
635 """
650 """
636 minlen = 4
651 minlen = 4
637 message = {}
652 message = {}
General Comments 0
You need to be logged in to leave comments. Login now