##// END OF EJS Templates
cleanup terminal console iopub handling...
MinRK -
Show More
@@ -43,6 +43,8 b' from IPython.terminal.console.completer import ZMQCompleter'
43 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
43 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
44 """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
44 """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
45 _executing = False
45 _executing = False
46 _execution_state = Unicode('')
47 _current_msg_id = Unicode('')
46
48
47 image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
49 image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
48 config=True, help=
50 config=True, help=
@@ -151,32 +153,44 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
151 # explicitly handle 'exit' command
153 # explicitly handle 'exit' command
152 return self.ask_exit()
154 return self.ask_exit()
153
155
154 self._executing = True
155 # flush stale replies, which could have been ignored, due to missed heartbeats
156 # flush stale replies, which could have been ignored, due to missed heartbeats
156 while self.client.shell_channel.msg_ready():
157 while self.client.shell_channel.msg_ready():
157 self.client.shell_channel.get_msg()
158 self.client.shell_channel.get_msg()
158 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
159 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
159 msg_id = self.client.shell_channel.execute(cell, not store_history)
160 msg_id = self.client.shell_channel.execute(cell, not store_history)
160 while not self.client.shell_channel.msg_ready() and self.client.is_alive():
161 self._current_msg_id = msg_id
162
163 # first thing is wait for any side effects (output, stdin, etc.)
164 self._executing = True
165 self._execution_state = "busy"
166 while self._execution_state != 'idle' and self.client.is_alive():
167 print (self._execution_state)
161 try:
168 try:
162 self.handle_stdin_request(timeout=0.05)
169 self.handle_stdin_request(msg_id, timeout=0.05)
163 except Empty:
170 except Empty:
164 # display intermediate print statements, etc.
171 # display intermediate print statements, etc.
165 self.handle_iopub()
172 self.handle_iopub(msg_id)
173 pass
174
175 # after all of that is done, wait for the execute reply
176 while self.client.is_alive():
177 try:
178 self.handle_execute_reply(msg_id, timeout=0.05)
179 except Empty:
166 pass
180 pass
167 if self.client.shell_channel.msg_ready():
181 else:
168 self.handle_execute_reply(msg_id)
182 break
169 self._executing = False
183 self._executing = False
170
184
171 #-----------------
185 #-----------------
172 # message handlers
186 # message handlers
173 #-----------------
187 #-----------------
174
188
175 def handle_execute_reply(self, msg_id):
189 def handle_execute_reply(self, msg_id, timeout=None):
176 msg = self.client.shell_channel.get_msg()
190 msg = self.client.shell_channel.get_msg(block=False, timeout=timeout)
177 if msg["parent_header"].get("msg_id", None) == msg_id:
191 if msg["parent_header"].get("msg_id", None) == msg_id:
178
192
179 self.handle_iopub()
193 self.handle_iopub(msg_id)
180
194
181 content = msg["content"]
195 content = msg["content"]
182 status = content['status']
196 status = content['status']
@@ -198,26 +212,27 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
198 self.execution_count = int(content["execution_count"] + 1)
212 self.execution_count = int(content["execution_count"] + 1)
199
213
200
214
201 def handle_iopub(self):
215 def handle_iopub(self, msg_id):
202 """ Method to procces subscribe channel's messages
216 """ Method to process subscribe channel's messages
203
217
204 This method reads a message and processes the content in different
218 This method consumes and processes messages on the IOPub channel,
205 outputs like stdout, stderr, pyout and status
219 such as stdout, stderr, pyout and status.
206
220
207 Arguments:
221 It only displays output that is caused by the given msg_id
208 sub_msg: message receive from kernel in the sub socket channel
209 capture by kernel manager.
210 """
222 """
211 while self.client.iopub_channel.msg_ready():
223 while self.client.iopub_channel.msg_ready():
212 sub_msg = self.client.iopub_channel.get_msg()
224 sub_msg = self.client.iopub_channel.get_msg()
213 msg_type = sub_msg['header']['msg_type']
225 msg_type = sub_msg['header']['msg_type']
214 parent = sub_msg["parent_header"]
226 parent = sub_msg["parent_header"]
215 if (not parent) or self.session_id == parent['session']:
227 if (not parent) or msg_id == parent['msg_id']:
216 if msg_type == 'status' :
228 if msg_type == 'status':
217 if sub_msg["content"]["execution_state"] == "busy" :
229 state = self._execution_state = sub_msg["content"]["execution_state"]
218 pass
230 # idle messages mean an individual sequence is complete,
219
231 # so break out of consumption to allow other things to take over.
220 elif msg_type == 'stream' :
232 if state == 'idle':
233 break
234
235 elif msg_type == 'stream':
221 if sub_msg["content"]["name"] == "stdout":
236 if sub_msg["content"]["name"] == "stdout":
222 print(sub_msg["content"]["data"], file=io.stdout, end="")
237 print(sub_msg["content"]["data"], file=io.stdout, end="")
223 io.stdout.flush()
238 io.stdout.flush()
@@ -239,6 +254,7 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
239
254
240 elif msg_type == 'display_data':
255 elif msg_type == 'display_data':
241 self.handle_rich_data(sub_msg["content"]["data"])
256 self.handle_rich_data(sub_msg["content"]["data"])
257
242
258
243 _imagemime = {
259 _imagemime = {
244 'image/png': 'png',
260 'image/png': 'png',
@@ -292,13 +308,13 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
292 def handle_image_callable(self, data, mime):
308 def handle_image_callable(self, data, mime):
293 self.callable_image_handler(data)
309 self.callable_image_handler(data)
294
310
295 def handle_stdin_request(self, timeout=0.1):
311 def handle_stdin_request(self, msg_id, timeout=0.1):
296 """ Method to capture raw_input
312 """ Method to capture raw_input
297 """
313 """
298 msg_rep = self.client.stdin_channel.get_msg(timeout=timeout)
314 msg_rep = self.client.stdin_channel.get_msg(timeout=timeout)
299 # in case any iopub came while we were waiting:
315 # in case any iopub came while we were waiting:
300 self.handle_iopub()
316 self.handle_iopub(msg_id)
301 if self.session_id == msg_rep["parent_header"].get("session"):
317 if msg_id == msg_rep["parent_header"].get("msg_id"):
302 # wrap SIGINT handler
318 # wrap SIGINT handler
303 real_handler = signal.getsignal(signal.SIGINT)
319 real_handler = signal.getsignal(signal.SIGINT)
304 def double_int(sig,frame):
320 def double_int(sig,frame):
General Comments 0
You need to be logged in to leave comments. Login now