From 6ceaba58c5b00b328a806c437f05bfda5d509e87 2011-07-14 22:21:15 From: Brian E. Granger Date: 2011-07-14 22:21:15 Subject: [PATCH] Renaming unpack_message to unserialize and updating docstrings. --- diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index a82acc9..6b3f497 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -485,7 +485,7 @@ class Hub(SessionFactory): return client_id = idents[0] try: - msg = self.session.unpack_message(msg, content=True) + msg = self.session.unserialize(msg, content=True) except Exception: content = error.wrap_exception() self.log.error("Bad Query Message: %r"%msg, exc_info=True) @@ -550,7 +550,7 @@ class Hub(SessionFactory): return queue_id, client_id = idents[:2] try: - msg = self.session.unpack_message(msg) + msg = self.session.unserialize(msg) except Exception: self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) return @@ -597,7 +597,7 @@ class Hub(SessionFactory): client_id, queue_id = idents[:2] try: - msg = self.session.unpack_message(msg) + msg = self.session.unserialize(msg) except Exception: self.log.error("queue::engine %r sent invalid message to %r: %r"%( queue_id,client_id, msg), exc_info=True) @@ -647,7 +647,7 @@ class Hub(SessionFactory): client_id = idents[0] try: - msg = self.session.unpack_message(msg) + msg = self.session.unserialize(msg) except Exception: self.log.error("task::client %r sent invalid task message: %r"%( client_id, msg), exc_info=True) @@ -697,7 +697,7 @@ class Hub(SessionFactory): """save the result of a completed task.""" client_id = idents[0] try: - msg = self.session.unpack_message(msg) + msg = self.session.unserialize(msg) except Exception: self.log.error("task::invalid task result message send to %r: %r"%( client_id, msg), exc_info=True) @@ -744,7 +744,7 @@ class Hub(SessionFactory): def save_task_destination(self, idents, msg): try: - msg = self.session.unpack_message(msg, content=True) + msg = self.session.unserialize(msg, content=True) except Exception: self.log.error("task::invalid task tracking message", exc_info=True) return @@ -781,7 +781,7 @@ class Hub(SessionFactory): """save an iopub message into the db""" # print (topics) try: - msg = self.session.unpack_message(msg, content=True) + msg = self.session.unserialize(msg, content=True) except Exception: self.log.error("iopub::invalid IOPub message", exc_info=True) return diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index db4bb74..d7e6da6 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -211,7 +211,7 @@ class TaskScheduler(SessionFactory): self.log.warn("task::Invalid Message: %r",msg) return try: - msg = self.session.unpack_message(msg) + msg = self.session.unserialize(msg) except ValueError: self.log.warn("task::Unauthorized message from: %r"%idents) return @@ -307,7 +307,7 @@ class TaskScheduler(SessionFactory): self.notifier_stream.flush() try: idents, msg = self.session.feed_identities(raw_msg, copy=False) - msg = self.session.unpack_message(msg, content=False, copy=False) + msg = self.session.unserialize(msg, content=False, copy=False) except Exception: self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) return @@ -515,7 +515,7 @@ class TaskScheduler(SessionFactory): """dispatch method for result replies""" try: idents,msg = self.session.feed_identities(raw_msg, copy=False) - msg = self.session.unpack_message(msg, content=False, copy=False) + msg = self.session.unserialize(msg, content=False, copy=False) engine = idents[0] try: idx = self.targets.index(engine) diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index dd91f8c..04201e9 100755 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -90,7 +90,7 @@ class EngineFactory(RegistrationFactory): loop = self.loop identity = self.bident idents,msg = self.session.feed_identities(msg) - msg = Message(self.session.unpack_message(msg)) + msg = Message(self.session.unserialize(msg)) if msg.content.status == 'ok': self.id = int(msg.content.id) diff --git a/IPython/parallel/engine/kernelstarter.py b/IPython/parallel/engine/kernelstarter.py index 196434d..4aa0238 100644 --- a/IPython/parallel/engine/kernelstarter.py +++ b/IPython/parallel/engine/kernelstarter.py @@ -40,7 +40,7 @@ class KernelStarter(object): def dispatch_request(self, raw_msg): idents, msg = self.session.feed_identities() try: - msg = self.session.unpack_message(msg, content=False) + msg = self.session.unserialize(msg, content=False) except: print ("bad msg: %s"%msg) @@ -54,7 +54,7 @@ class KernelStarter(object): def dispatch_reply(self, raw_msg): idents, msg = self.session.feed_identities() try: - msg = self.session.unpack_message(msg, content=False) + msg = self.session.unserialize(msg, content=False) except: print ("bad msg: %s"%msg) diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py index 579a2fb..8bf6ac3 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -195,7 +195,7 @@ class Kernel(SessionFactory): def dispatch_control(self, msg): idents,msg = self.session.feed_identities(msg, copy=False) try: - msg = self.session.unpack_message(msg, content=True, copy=False) + msg = self.session.unserialize(msg, content=True, copy=False) except: self.log.error("Invalid Message", exc_info=True) return @@ -373,7 +373,7 @@ class Kernel(SessionFactory): self.control_stream.flush() idents,msg = self.session.feed_identities(msg, copy=False) try: - msg = self.session.unpack_message(msg, content=True, copy=False) + msg = self.session.unserialize(msg, content=True, copy=False) except: self.log.error("Invalid Message", exc_info=True) return diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index c504986..5f495b0 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -383,6 +383,10 @@ class Session(Configurable): def serialize(self, msg, ident=None): """Serialize the message components to bytes. + This is roughly the inverse of unserialize. The serialize/unserialize + methods work with full message lists, whereas pack/unpack work with + the individual message parts in the message list. + Parameters ---------- msg : dict or Message @@ -576,7 +580,7 @@ class Session(Configurable): # invalid large messages can cause very expensive string comparisons idents, msg_list = self.feed_identities(msg_list, copy) try: - return idents, self.unpack_message(msg_list, content=content, copy=copy) + return idents, self.unserialize(msg_list, content=content, copy=copy) except Exception as e: print (idents, msg_list) # TODO: handle it @@ -598,10 +602,12 @@ class Session(Configurable): Returns ------- - (idents,msg_list) : two lists - idents will always be a list of bytes - the indentity prefix - msg_list will be a list of bytes or Messages, unchanged from input - msg_list should be unpackable via self.unpack_message at this point. + (idents, msg_list) : two lists + idents will always be a list of bytes, each of which is a ZMQ + identity. msg_list will be a list of bytes or zmq.Messages of the + form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and + should be unpackable/unserializable via self.unserialize at this + point. """ if copy: idx = msg_list.index(DELIM) @@ -617,21 +623,30 @@ class Session(Configurable): idents, msg_list = msg_list[:idx], msg_list[idx+1:] return [m.bytes for m in idents], msg_list - def unpack_message(self, msg_list, content=True, copy=True): - """Return a message object from the format - sent by self.send. - + def unserialize(self, msg_list, content=True, copy=True): + """Unserialize a msg_list to a nested message dict. + + This is roughly the inverse of serialize. The serialize/unserialize + methods work with full message lists, whereas pack/unpack work with + the individual message parts in the message list. + Parameters: ----------- - + msg_list : list of bytes or Message objects + The list of message parts of the form [HMAC,p_header,p_parent, + p_content,buffer1,buffer2,...]. content : bool (True) - whether to unpack the content dict (True), - or leave it serialized (False) - + Whether to unpack the content dict (True), or leave it packed + (False). copy : bool (True) - whether to return the bytes (True), - or the non-copying Message object in each place (False) - + Whether to return the bytes (True), or the non-copying Message + object in each place (False). + + Returns + ------- + msg : dict + The nested message dict with top-level keys [header, parent_header, + content, buffers]. """ minlen = 4 message = {}