From 1bc3aac5a9d9f4af66c7d84b4e0e9c2916db80d7 2011-07-21 03:42:30 From: Brian E. Granger Date: 2011-07-21 03:42:30 Subject: [PATCH] Fixing code that assumed msg_type and msg_id are top-level. * I have gone through and looked for instances of ['msg_type'] and ['msg_id'] and tried to make sure that I added ['header'] to pull the values out of the header. * But there are many cases where I can't tell if the dict is the full message or the header already. This is especially true of the msg_id in the parallel db parts of the code. * Tests pass, but this is scary. --- diff --git a/IPython/frontend/qt/base_frontend_mixin.py b/IPython/frontend/qt/base_frontend_mixin.py index 02080b7..c4189d3 100644 --- a/IPython/frontend/qt/base_frontend_mixin.py +++ b/IPython/frontend/qt/base_frontend_mixin.py @@ -96,7 +96,7 @@ class BaseFrontendMixin(object): """ Calls the frontend handler associated with the message type of the given message. """ - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] handler = getattr(self, '_handle_' + msg_type, None) if handler: handler(msg) diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index c6a5dde..f243114 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -66,7 +66,7 @@ class QtShellSocketChannel(SocketChannelQObject, ShellSocketChannel): self.message_received.emit(msg) # Emit signals for specialized message types. - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] signal = getattr(self, msg_type, None) if signal: signal.emit(msg) @@ -122,7 +122,7 @@ class QtSubSocketChannel(SocketChannelQObject, SubSocketChannel): # Emit the generic signal. self.message_received.emit(msg) # Emit signals for specialized message types. 
- msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] signal = getattr(self, msg_type + '_received', None) if signal: signal.emit(msg) @@ -155,7 +155,7 @@ class QtStdInSocketChannel(SocketChannelQObject, StdInSocketChannel): self.message_received.emit(msg) # Emit signals for specialized message types. - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] if msg_type == 'input_request': self.input_requested.emit(msg) diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 60cbd67..3d3975d 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -670,7 +670,7 @@ class Client(HasTraits): while msg is not None: if self.debug: pprint(msg) - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] handler = self._notification_handlers.get(msg_type, None) if handler is None: raise Exception("Unhandled message type: %s"%msg.msg_type) @@ -684,7 +684,7 @@ class Client(HasTraits): while msg is not None: if self.debug: pprint(msg) - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] handler = self._queue_handlers.get(msg_type, None) if handler is None: raise Exception("Unhandled message type: %s"%msg.msg_type) @@ -729,7 +729,7 @@ class Client(HasTraits): msg_id = parent['msg_id'] content = msg['content'] header = msg['header'] - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] # init metadata: md = self.metadata[msg_id] @@ -994,7 +994,7 @@ class Client(HasTraits): msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, subheader=subheader, track=track) - msg_id = msg['msg_id'] + msg_id = msg['header']['msg_id'] self.outstanding.add(msg_id) if ident: # possibly routed to a specific engine diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py index 12ed96d..1b1eae6 100644 --- a/IPython/parallel/client/view.py +++ b/IPython/parallel/client/view.py @@ -523,7 +523,7 @@ class DirectView(View): ident=ident) 
if track: trackers.append(msg['tracker']) - msg_ids.append(msg['msg_id']) + msg_ids.append(msg['header']['msg_id']) tracker = None if track is False else zmq.MessageTracker(*trackers) ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker) if block: @@ -980,7 +980,7 @@ class LoadBalancedView(View): subheader=subheader) tracker = None if track is False else msg['tracker'] - ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker) + ar = AsyncResult(self.client, msg['header']['msg_id'], fname=f.__name__, targets=None, tracker=tracker) if block: try: diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 5a66178..a82acc9 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -494,7 +494,7 @@ class Hub(SessionFactory): return # print client_id, header, parent, content #switch on message type: - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] self.log.info("client::client %r requested %r"%(client_id, msg_type)) handler = self.query_handlers.get(msg_type, None) try: @@ -791,7 +791,7 @@ class Hub(SessionFactory): self.log.error("iopub::invalid IOPub message: %r"%msg) return msg_id = parent['msg_id'] - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] content = msg['content'] # ensure msg_id is in db diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 747d5b6..db4bb74 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -216,7 +216,7 @@ class TaskScheduler(SessionFactory): self.log.warn("task::Unauthorized message from: %r"%idents) return - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] handler = self._notification_handlers.get(msg_type, None) if handler is None: diff --git a/IPython/parallel/engine/kernelstarter.py b/IPython/parallel/engine/kernelstarter.py index 
fe92d1d..196434d 100644 --- a/IPython/parallel/engine/kernelstarter.py +++ b/IPython/parallel/engine/kernelstarter.py @@ -44,7 +44,7 @@ class KernelStarter(object): except: print ("bad msg: %s"%msg) - msgtype = msg['msg_type'] + msgtype = msg['header']['msg_type'] handler = self.handlers.get(msgtype, None) if handler is None: self.downstream.send_multipart(raw_msg, copy=False) @@ -58,7 +58,7 @@ class KernelStarter(object): except: print ("bad msg: %s"%msg) - msgtype = msg['msg_type'] + msgtype = msg['header']['msg_type'] handler = self.handlers.get(msgtype, None) if handler is None: self.upstream.send_multipart(raw_msg, copy=False) @@ -227,4 +227,4 @@ def make_starter(up_addr, down_addr, *args, **kwargs): starter = KernelStarter(session, upstream, downstream, *args, **kwargs) starter.start() loop.start() - \ No newline at end of file + diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py index 5e6203b..579a2fb 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -150,7 +150,7 @@ class Kernel(SessionFactory): self.log.info("Aborting:") self.log.info(str(msg)) - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) # self.reply_socket.send(ident,zmq.SNDMORE) @@ -205,9 +205,9 @@ class Kernel(SessionFactory): header = msg['header'] msg_id = header['msg_id'] - handler = self.control_handlers.get(msg['msg_type'], None) + handler = self.control_handlers.get(msg['header']['msg_type'], None) if handler is None: - self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) + self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type']) else: handler(self.control_stream, idents, msg) @@ -386,14 +386,14 @@ class Kernel(SessionFactory): if self.check_aborted(msg_id): self.aborted.remove(msg_id) # is it safe to assume a msg_id will not be 
resubmitted? - reply_type = msg['msg_type'].split('_')[0] + '_reply' + reply_type = msg['header']['msg_type'].split('_')[0] + '_reply' status = {'status' : 'aborted'} reply_msg = self.session.send(stream, reply_type, subheader=status, content=status, parent=msg, ident=idents) return - handler = self.shell_handlers.get(msg['msg_type'], None) + handler = self.shell_handlers.get(msg['header']['msg_type'], None) if handler is None: - self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) + self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type']) else: handler(stream, idents, msg) diff --git a/IPython/parallel/tests/test_db.py b/IPython/parallel/tests/test_db.py index fddb961..f9c08d5 100644 --- a/IPython/parallel/tests/test_db.py +++ b/IPython/parallel/tests/test_db.py @@ -56,8 +56,8 @@ class TestDictBackend(TestCase): msg = self.session.msg('apply_request', content=dict(a=5)) msg['buffers'] = [] rec = init_record(msg) - msg_ids.append(msg['msg_id']) - self.db.add_record(msg['msg_id'], rec) + msg_ids.append(msg['header']['msg_id']) + self.db.add_record(msg['header']['msg_id'], rec) return msg_ids def test_add_record(self): diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 3b47f6a..fb58d29 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -133,11 +133,11 @@ class Kernel(Configurable): # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each # handler prints its message at the end. 
- self.log.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***') + self.log.debug('\n*** MESSAGE TYPE:'+str(msg['header']['msg_type'])+'***') self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ') # Find and call actual handler for message - handler = self.handlers.get(msg['msg_type'], None) + handler = self.handlers.get(msg['header']['msg_type'], None) if handler is None: self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg)) else: @@ -375,7 +375,7 @@ class Kernel(Configurable): "Unexpected missing message part." self.log.debug("Aborting:\n"+str(Message(msg))) - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' reply_msg = self.session.send(self.shell_socket, reply_type, {'status' : 'aborted'}, msg, ident=ident) diff --git a/IPython/zmq/pykernel.py b/IPython/zmq/pykernel.py index fb62629..42835da 100755 --- a/IPython/zmq/pykernel.py +++ b/IPython/zmq/pykernel.py @@ -190,7 +190,7 @@ class Kernel(HasTraits): else: assert ident is not None, "Missing message part." 
self.log.debug("Aborting: %s"%Message(msg)) - msg_type = msg['msg_type'] + msg_type = msg['header']['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident) self.log.debug(Message(reply_msg)) diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index 7d2ebc9..c504986 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -359,9 +359,7 @@ class Session(Configurable): """ msg = {} msg['header'] = self.msg_header(msg_type) - msg['msg_id'] = msg['header']['msg_id'] msg['parent_header'] = {} if parent is None else extract_header(parent) - msg['msg_type'] = msg_type msg['content'] = {} if content is None else content sub = {} if subheader is None else subheader msg['header'].update(sub) @@ -651,7 +649,6 @@ class Session(Configurable): if not len(msg_list) >= minlen: raise TypeError("malformed message, must have at least %i elements"%minlen) message['header'] = self.unpack(msg_list[1]) - message['msg_type'] = message['header']['msg_type'] message['parent_header'] = self.unpack(msg_list[2]) if content: message['content'] = self.unpack(msg_list[3]) diff --git a/IPython/zmq/tests/test_session.py b/IPython/zmq/tests/test_session.py index 6279acc..86badca 100644 --- a/IPython/zmq/tests/test_session.py +++ b/IPython/zmq/tests/test_session.py @@ -37,10 +37,8 @@ class TestSession(SessionTestCase): self.assertTrue(isinstance(msg['content'],dict)) self.assertTrue(isinstance(msg['header'],dict)) self.assertTrue(isinstance(msg['parent_header'],dict)) - self.assertEquals(msg['msg_type'], 'execute') + self.assertEquals(msg['header']['msg_type'], 'execute') - - def test_args(self): """initialization arguments for Session""" s = self.session