##// END OF EJS Templates
cleanup terminal console iopub handling...
MinRK -
Show More
@@ -43,6 +43,8 from IPython.terminal.console.completer import ZMQCompleter
43 43 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
44 44 """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
45 45 _executing = False
46 _execution_state = Unicode('')
47 _current_msg_id = Unicode('')
46 48
47 49 image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
48 50 config=True, help=
@@ -151,32 +153,44 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
151 153 # explicitly handle 'exit' command
152 154 return self.ask_exit()
153 155
154 self._executing = True
155 156 # flush stale replies, which could have been ignored, due to missed heartbeats
156 157 while self.client.shell_channel.msg_ready():
157 158 self.client.shell_channel.get_msg()
158 159 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
159 160 msg_id = self.client.shell_channel.execute(cell, not store_history)
160 while not self.client.shell_channel.msg_ready() and self.client.is_alive():
161 self._current_msg_id = msg_id
162
163 # first thing is wait for any side effects (output, stdin, etc.)
164 self._executing = True
165 self._execution_state = "busy"
166 while self._execution_state != 'idle' and self.client.is_alive():
167 print (self._execution_state)
161 168 try:
162 self.handle_stdin_request(timeout=0.05)
169 self.handle_stdin_request(msg_id, timeout=0.05)
163 170 except Empty:
164 171 # display intermediate print statements, etc.
165 self.handle_iopub()
172 self.handle_iopub(msg_id)
173 pass
174
175 # after all of that is done, wait for the execute reply
176 while self.client.is_alive():
177 try:
178 self.handle_execute_reply(msg_id, timeout=0.05)
179 except Empty:
166 180 pass
167 if self.client.shell_channel.msg_ready():
168 self.handle_execute_reply(msg_id)
181 else:
182 break
169 183 self._executing = False
170 184
171 185 #-----------------
172 186 # message handlers
173 187 #-----------------
174 188
175 def handle_execute_reply(self, msg_id):
176 msg = self.client.shell_channel.get_msg()
189 def handle_execute_reply(self, msg_id, timeout=None):
190 msg = self.client.shell_channel.get_msg(block=False, timeout=timeout)
177 191 if msg["parent_header"].get("msg_id", None) == msg_id:
178 192
179 self.handle_iopub()
193 self.handle_iopub(msg_id)
180 194
181 195 content = msg["content"]
182 196 status = content['status']
@@ -198,26 +212,27 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
198 212 self.execution_count = int(content["execution_count"] + 1)
199 213
200 214
201 def handle_iopub(self):
202 """ Method to procces subscribe channel's messages
215 def handle_iopub(self, msg_id):
216 """ Method to process subscribe channel's messages
203 217
204 This method reads a message and processes the content in different
205 outputs like stdout, stderr, pyout and status
206
207 Arguments:
208 sub_msg: message receive from kernel in the sub socket channel
209 capture by kernel manager.
218 This method consumes and processes messages on the IOPub channel,
219 such as stdout, stderr, pyout and status.
220
221 It only displays output that is caused by the given msg_id
210 222 """
211 223 while self.client.iopub_channel.msg_ready():
212 224 sub_msg = self.client.iopub_channel.get_msg()
213 225 msg_type = sub_msg['header']['msg_type']
214 226 parent = sub_msg["parent_header"]
215 if (not parent) or self.session_id == parent['session']:
216 if msg_type == 'status' :
217 if sub_msg["content"]["execution_state"] == "busy" :
218 pass
219
220 elif msg_type == 'stream' :
227 if (not parent) or msg_id == parent['msg_id']:
228 if msg_type == 'status':
229 state = self._execution_state = sub_msg["content"]["execution_state"]
230 # idle messages mean an individual sequence is complete,
231 # so break out of consumption to allow other things to take over.
232 if state == 'idle':
233 break
234
235 elif msg_type == 'stream':
221 236 if sub_msg["content"]["name"] == "stdout":
222 237 print(sub_msg["content"]["data"], file=io.stdout, end="")
223 238 io.stdout.flush()
@@ -239,6 +254,7 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
239 254
240 255 elif msg_type == 'display_data':
241 256 self.handle_rich_data(sub_msg["content"]["data"])
257
242 258
243 259 _imagemime = {
244 260 'image/png': 'png',
@@ -292,13 +308,13 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
292 308 def handle_image_callable(self, data, mime):
293 309 self.callable_image_handler(data)
294 310
295 def handle_stdin_request(self, timeout=0.1):
311 def handle_stdin_request(self, msg_id, timeout=0.1):
296 312 """ Method to capture raw_input
297 313 """
298 314 msg_rep = self.client.stdin_channel.get_msg(timeout=timeout)
299 315 # in case any iopub came while we were waiting:
300 self.handle_iopub()
301 if self.session_id == msg_rep["parent_header"].get("session"):
316 self.handle_iopub(msg_id)
317 if msg_id == msg_rep["parent_header"].get("msg_id"):
302 318 # wrap SIGINT handler
303 319 real_handler = signal.getsignal(signal.SIGINT)
304 320 def double_int(sig,frame):
General Comments 0
You need to be logged in to leave comments. Login now