diff --git a/IPython/terminal/console/interactiveshell.py b/IPython/terminal/console/interactiveshell.py
index 76c8a8a..7d41ba4 100644
--- a/IPython/terminal/console/interactiveshell.py
+++ b/IPython/terminal/console/interactiveshell.py
@@ -43,6 +43,7 @@ from IPython.terminal.console.completer import ZMQCompleter
 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
     """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
     _executing = False
+    _execution_state = Unicode('')
 
     image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
                          config=True, help=
@@ -151,32 +152,42 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
             # explicitly handle 'exit' command
             return self.ask_exit()
 
-        self._executing = True
         # flush stale replies, which could have been ignored, due to missed heartbeats
         while self.client.shell_channel.msg_ready():
             self.client.shell_channel.get_msg()
         # shell_channel.execute takes 'hidden', which is the inverse of store_hist
         msg_id = self.client.shell_channel.execute(cell, not store_history)
-        while not self.client.shell_channel.msg_ready() and self.client.is_alive():
+
+        # first thing is wait for any side effects (output, stdin, etc.)
+        self._executing = True
+        self._execution_state = "busy"
+        while self._execution_state != 'idle' and self.client.is_alive():
             try:
-                self.handle_stdin_request(timeout=0.05)
+                self.handle_stdin_request(msg_id, timeout=0.05)
             except Empty:
                 # display intermediate print statements, etc.
-                self.handle_iopub()
+                self.handle_iopub(msg_id)
+                pass
+
+        # after all of that is done, wait for the execute reply
+        while self.client.is_alive():
+            try:
+                self.handle_execute_reply(msg_id, timeout=0.05)
+            except Empty:
                 pass
-        if self.client.shell_channel.msg_ready():
-            self.handle_execute_reply(msg_id)
+            else:
+                break
         self._executing = False
 
     #-----------------
     # message handlers
     #-----------------
 
-    def handle_execute_reply(self, msg_id):
-        msg = self.client.shell_channel.get_msg()
+    def handle_execute_reply(self, msg_id, timeout=None):
+        msg = self.client.shell_channel.get_msg(block=False, timeout=timeout)
         if msg["parent_header"].get("msg_id", None) == msg_id:
 
-            self.handle_iopub()
+            self.handle_iopub(msg_id)
 
             content = msg["content"]
             status = content['status']
@@ -198,26 +209,27 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
             self.execution_count = int(content["execution_count"] + 1)
 
 
-    def handle_iopub(self):
-        """ Method to procces subscribe channel's messages
+    def handle_iopub(self, msg_id):
+        """ Method to process subscribe channel's messages
 
-           This method reads a message and processes the content in different
-           outputs like stdout, stderr, pyout and status
-
-           Arguments:
-           sub_msg: message receive from kernel in the sub socket channel
-                    capture by kernel manager.
+           This method consumes and processes messages on the IOPub channel,
+           such as stdout, stderr, pyout and status.
+
+           It only displays output that is caused by the given msg_id
         """
         while self.client.iopub_channel.msg_ready():
             sub_msg = self.client.iopub_channel.get_msg()
             msg_type = sub_msg['header']['msg_type']
             parent = sub_msg["parent_header"]
-            if (not parent) or self.session_id == parent['session']:
-                if msg_type == 'status' :
-                    if sub_msg["content"]["execution_state"] == "busy" :
-                        pass
-
-                elif msg_type == 'stream' :
+            if (not parent) or msg_id == parent['msg_id']:
+                if msg_type == 'status':
+                    state = self._execution_state = sub_msg["content"]["execution_state"]
+                    # idle messages mean an individual sequence is complete,
+                    # so break out of consumption to allow other things to take over.
+                    if state == 'idle':
+                        break
+
+                elif msg_type == 'stream':
                     if sub_msg["content"]["name"] == "stdout":
                         print(sub_msg["content"]["data"], file=io.stdout, end="")
                         io.stdout.flush()
@@ -239,6 +251,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
 
                 elif msg_type == 'display_data':
                     self.handle_rich_data(sub_msg["content"]["data"])
+
 
     _imagemime = {
         'image/png': 'png',
@@ -292,13 +305,13 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
     def handle_image_callable(self, data, mime):
         self.callable_image_handler(data)
 
-    def handle_stdin_request(self, timeout=0.1):
+    def handle_stdin_request(self, msg_id, timeout=0.1):
         """ Method to capture raw_input
         """
         msg_rep = self.client.stdin_channel.get_msg(timeout=timeout)
         # in case any iopub came while we were waiting:
-        self.handle_iopub()
-        if self.session_id == msg_rep["parent_header"].get("session"):
+        self.handle_iopub(msg_id)
+        if msg_id == msg_rep["parent_header"].get("msg_id"):
             # wrap SIGINT handler
             real_handler = signal.getsignal(signal.SIGINT)
             def double_int(sig,frame):
@@ -373,7 +386,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
         # run a non-empty no-op, so that we don't get a prompt until
         # we know the kernel is ready. This keeps the connection
         # message above the first prompt.
-        if not self.wait_for_kernel(3):
+        if not self.wait_for_kernel(10):
             error("Kernel did not respond\n")
             return
 
@@ -394,7 +407,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
                 if ans:
                     if self.manager:
                         self.manager.restart_kernel(True)
-                    self.wait_for_kernel(3)
+                    self.wait_for_kernel(10)
                 else:
                     self.exit_now = True
                 continue