From f1c52deb97169ac533506eb5ec3b9b6469e20414 2013-07-25 03:03:06
From: MinRK <benjaminrk@gmail.com>
Date: 2013-07-25 03:03:06
Subject: [PATCH] cleanup terminal console iopub handling

Ensures that all IOPub output for a given cell is handled before the next prompt is drawn.

Follows the logic used elsewhere, where `status=idle` signals the end of output for a given cell.
Prior to this, an early execute reply could result in some output being printed after the prompt for the next input.
---

diff --git a/IPython/terminal/console/interactiveshell.py b/IPython/terminal/console/interactiveshell.py
index 1591c7c..080eff7 100644
--- a/IPython/terminal/console/interactiveshell.py
+++ b/IPython/terminal/console/interactiveshell.py
@@ -43,6 +43,8 @@ from IPython.terminal.console.completer import ZMQCompleter
 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
     """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
     _executing = False
+    _execution_state = Unicode('')
+    _current_msg_id = Unicode('')
 
     image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
                          config=True, help=
@@ -151,32 +153,44 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
             # explicitly handle 'exit' command
             return self.ask_exit()
 
-        self._executing = True
         # flush stale replies, which could have been ignored, due to missed heartbeats
         while self.client.shell_channel.msg_ready():
             self.client.shell_channel.get_msg()
         # shell_channel.execute takes 'hidden', which is the inverse of store_hist
         msg_id = self.client.shell_channel.execute(cell, not store_history)
-        while not self.client.shell_channel.msg_ready() and self.client.is_alive():
+        self._current_msg_id = msg_id
+        
+        # first thing is wait for any side effects (output, stdin, etc.)
+        self._executing = True
+        self._execution_state = "busy"
+        while self._execution_state != 'idle' and self.client.is_alive():
+            print (self._execution_state)
             try:
-                self.handle_stdin_request(timeout=0.05)
+                self.handle_stdin_request(msg_id, timeout=0.05)
             except Empty:
                 # display intermediate print statements, etc.
-                self.handle_iopub()
+                self.handle_iopub(msg_id)
+                pass
+        
+        # after all of that is done, wait for the execute reply
+        while self.client.is_alive():
+            try:
+                self.handle_execute_reply(msg_id, timeout=0.05)
+            except Empty:
                 pass
-        if self.client.shell_channel.msg_ready():
-            self.handle_execute_reply(msg_id)
+            else:
+                break
         self._executing = False
 
     #-----------------
     # message handlers
     #-----------------
 
-    def handle_execute_reply(self, msg_id):
-        msg = self.client.shell_channel.get_msg()
+    def handle_execute_reply(self, msg_id, timeout=None):
+        msg = self.client.shell_channel.get_msg(block=False, timeout=timeout)
         if msg["parent_header"].get("msg_id", None) == msg_id:
             
-            self.handle_iopub()
+            self.handle_iopub(msg_id)
             
             content = msg["content"]
             status = content['status']
@@ -198,26 +212,27 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
             self.execution_count = int(content["execution_count"] + 1)
 
 
-    def handle_iopub(self):
-        """ Method to procces subscribe channel's messages
+    def handle_iopub(self, msg_id):
+        """ Method to process subscribe channel's messages
 
-           This method reads a message and processes the content in different
-           outputs like stdout, stderr, pyout and status
-
-           Arguments:
-           sub_msg:  message receive from kernel in the sub socket channel
-                     capture by kernel manager.
+           This method consumes and processes messages on the IOPub channel,
+           such as stdout, stderr, pyout and status.
+           
+           It only displays output that is caused by the given msg_id
         """
         while self.client.iopub_channel.msg_ready():
             sub_msg = self.client.iopub_channel.get_msg()
             msg_type = sub_msg['header']['msg_type']
             parent = sub_msg["parent_header"]
-            if (not parent) or self.session_id == parent['session']:
-                if msg_type == 'status' :
-                    if sub_msg["content"]["execution_state"] == "busy" :
-                        pass
-
-                elif msg_type == 'stream' :
+            if (not parent) or msg_id == parent['msg_id']:
+                if msg_type == 'status':
+                    state = self._execution_state = sub_msg["content"]["execution_state"]
+                    # idle messages mean an individual sequence is complete,
+                    # so break out of consumption to allow other things to take over.
+                    if state == 'idle':
+                        break
+
+                elif msg_type == 'stream':
                     if sub_msg["content"]["name"] == "stdout":
                         print(sub_msg["content"]["data"], file=io.stdout, end="")
                         io.stdout.flush()
@@ -239,6 +254,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
 
                 elif msg_type == 'display_data':
                     self.handle_rich_data(sub_msg["content"]["data"])
+                    
 
     _imagemime = {
         'image/png': 'png',
@@ -292,13 +308,13 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
     def handle_image_callable(self, data, mime):
         self.callable_image_handler(data)
 
-    def handle_stdin_request(self, timeout=0.1):
+    def handle_stdin_request(self, msg_id, timeout=0.1):
         """ Method to capture raw_input
         """
         msg_rep = self.client.stdin_channel.get_msg(timeout=timeout)
         # in case any iopub came while we were waiting:
-        self.handle_iopub()
-        if self.session_id == msg_rep["parent_header"].get("session"):
+        self.handle_iopub(msg_id)
+        if msg_id == msg_rep["parent_header"].get("msg_id"):
             # wrap SIGINT handler
             real_handler = signal.getsignal(signal.SIGINT)
             def double_int(sig,frame):