##// END OF EJS Templates
Add missing flush of output streams on execute
Fernando Perez -
Show More
@@ -1,77 +1,80 b''
1 1 import sys
2 2 import time
3 3 from cStringIO import StringIO
4 4
5 5 from session import extract_header, Message
6 6
7 from IPython.utils import io
8
7 9 #-----------------------------------------------------------------------------
8 10 # Stream classes
9 11 #-----------------------------------------------------------------------------
10 12
11 13 class OutStream(object):
12 14 """A file like object that publishes the stream to a 0MQ PUB socket."""
13 15
14 16 # The time interval between automatic flushes, in seconds.
15 17 flush_interval = 0.05
16 18
17 19 def __init__(self, session, pub_socket, name):
18 20 self.session = session
19 21 self.pub_socket = pub_socket
20 22 self.name = name
21 23 self.parent_header = {}
22 24 self._new_buffer()
23 25
24 26 def set_parent(self, parent):
25 27 self.parent_header = extract_header(parent)
26 28
27 29 def close(self):
28 30 self.pub_socket = None
29 31
30 32 def flush(self):
33 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
31 34 if self.pub_socket is None:
32 35 raise ValueError(u'I/O operation on closed file')
33 36 else:
34 37 data = self._buffer.getvalue()
35 38 if data:
36 39 content = {u'name':self.name, u'data':data}
37 40 msg = self.session.msg(u'stream', content=content,
38 41 parent=self.parent_header)
39 print>>sys.__stdout__, Message(msg)
42 io.raw_print(msg)
40 43 self.pub_socket.send_json(msg)
41 44
42 45 self._buffer.close()
43 46 self._new_buffer()
44 47
45 48 def isatty(self):
46 49 return False
47 50
48 51 def next(self):
49 52 raise IOError('Read not supported on a write only stream.')
50 53
51 54 def read(self, size=-1):
52 55 raise IOError('Read not supported on a write only stream.')
53 56
54 57 def readline(self, size=-1):
55 58 raise IOError('Read not supported on a write only stream.')
56 59
57 60 def write(self, string):
58 61 if self.pub_socket is None:
59 62 raise ValueError('I/O operation on closed file')
60 63 else:
61 64 self._buffer.write(string)
62 65 current_time = time.time()
63 66 if self._start <= 0:
64 67 self._start = current_time
65 68 elif current_time - self._start > self.flush_interval:
66 69 self.flush()
67 70
68 71 def writelines(self, sequence):
69 72 if self.pub_socket is None:
70 73 raise ValueError('I/O operation on closed file')
71 74 else:
72 75 for string in sequence:
73 76 self.write(string)
74 77
75 78 def _new_buffer(self):
76 79 self._buffer = StringIO()
77 self._start = -1 No newline at end of file
80 self._start = -1
@@ -1,487 +1,496 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import sys
21 21 import time
22 22 import traceback
23 23
24 24 # System library imports.
25 25 import zmq
26 26
27 27 # Local imports.
28 28 from IPython.config.configurable import Configurable
29 29 from IPython.utils import io
30 30 from IPython.lib import pylabtools
31 31 from IPython.utils.traitlets import Instance
32 32 from entry_point import base_launch_kernel, make_argument_parser, make_kernel, \
33 33 start_kernel
34 34 from iostream import OutStream
35 35 from session import Session, Message
36 36 from zmqshell import ZMQInteractiveShell
37 37
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Main kernel class
41 41 #-----------------------------------------------------------------------------
42 42
43 43 class Kernel(Configurable):
44 44
45 45 #---------------------------------------------------------------------------
46 46 # Kernel interface
47 47 #---------------------------------------------------------------------------
48 48
49 49 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
50 50 session = Instance(Session)
51 51 reply_socket = Instance('zmq.Socket')
52 52 pub_socket = Instance('zmq.Socket')
53 53 req_socket = Instance('zmq.Socket')
54 54
55 55 def __init__(self, **kwargs):
56 56 super(Kernel, self).__init__(**kwargs)
57 57
58 58 # Initialize the InteractiveShell subclass
59 59 self.shell = ZMQInteractiveShell.instance()
60 60 self.shell.displayhook.session = self.session
61 61 self.shell.displayhook.pub_socket = self.pub_socket
62 62
63 63 # TMP - hack while developing
64 64 self.shell._reply_content = None
65 65
66 66 # Build dict of handlers for message types
67 67 msg_types = [ 'execute_request', 'complete_request',
68 68 'object_info_request', 'history_request' ]
69 69 self.handlers = {}
70 70 for msg_type in msg_types:
71 71 self.handlers[msg_type] = getattr(self, msg_type)
72 72
73 73 def do_one_iteration(self):
74 74 try:
75 75 ident = self.reply_socket.recv(zmq.NOBLOCK)
76 76 except zmq.ZMQError, e:
77 77 if e.errno == zmq.EAGAIN:
78 78 return
79 79 else:
80 80 raise
81 81 # FIXME: Bug in pyzmq/zmq?
82 82 # assert self.reply_socket.rcvmore(), "Missing message part."
83 83 msg = self.reply_socket.recv_json()
84 84
85 85 # Print some info about this message and leave a '--->' marker, so it's
86 86 # easier to trace visually the message chain when debugging. Each
87 87 # handler prints its message at the end.
88 88 # Eventually we'll move these from stdout to a logger.
89 89 io.raw_print('\n*** MESSAGE TYPE:', msg['msg_type'], '***')
90 90 io.raw_print(' Content: ', msg['content'],
91 91 '\n --->\n ', sep='', end='')
92 92
93 93 # Find and call actual handler for message
94 94 handler = self.handlers.get(msg['msg_type'], None)
95 95 if handler is None:
96 96 io.raw_print_err("UNKNOWN MESSAGE TYPE:", msg)
97 97 else:
98 98 handler(ident, msg)
99 99
100 100 def start(self):
101 101 """ Start the kernel main loop.
102 102 """
103 103 while True:
104 104 time.sleep(0.05)
105 105 self.do_one_iteration()
106 106
107 107 #---------------------------------------------------------------------------
108 108 # Kernel request handlers
109 109 #---------------------------------------------------------------------------
110 110
111 111 def _publish_pyin(self, code, parent):
112 112 """Publish the code request on the pyin stream."""
113 113
114 114 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
115 115 self.pub_socket.send_json(pyin_msg)
116 116
117 117 def execute_request(self, ident, parent):
118 118 try:
119 119 content = parent[u'content']
120 120 code = content[u'code']
121 121 silent = content[u'silent']
122 122 except:
123 123 io.raw_print_err("Got bad msg: ")
124 124 io.raw_print_err(Message(parent))
125 125 return
126 126
127 127 shell = self.shell # we'll need this a lot here
128 128
129 129 # Replace raw_input. Note that is not sufficient to replace
130 130 # raw_input in the user namespace.
131 131 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
132 132 __builtin__.raw_input = raw_input
133 133
134 134 # Set the parent message of the display hook and out streams.
135 135 shell.displayhook.set_parent(parent)
136 136 sys.stdout.set_parent(parent)
137 137 sys.stderr.set_parent(parent)
138 138
139 139 # Re-broadcast our input for the benefit of listening clients, and
140 140 # start computing output
141 141 if not silent:
142 142 self._publish_pyin(code, parent)
143 143
144 144 reply_content = {}
145 145 try:
146 146 if silent:
147 147 # runcode uses 'exec' mode, so no displayhook will fire, and it
148 148 # doesn't call logging or history manipulations. Print
149 149 # statements in that code will obviously still execute.
150 150 shell.runcode(code)
151 151 else:
152 152 # FIXME: runlines calls the exception handler itself.
153 153 shell._reply_content = None
154 154 shell.runlines(code)
155 155 except:
156 156 status = u'error'
157 157 # FIXME: this code right now isn't being used yet by default,
158 158 # because the runlines() call above directly fires off exception
159 159 # reporting. This code, therefore, is only active in the scenario
160 160 # where runlines itself has an unhandled exception. We need to
161 161 # uniformize this, for all exception construction to come from a
162 162 # single location in the codbase.
163 163 etype, evalue, tb = sys.exc_info()
164 164 tb_list = traceback.format_exception(etype, evalue, tb)
165 165 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
166 166 else:
167 167 status = u'ok'
168 168 reply_content[u'payload'] = shell.payload_manager.read_payload()
169 169 # Be agressive about clearing the payload because we don't want
170 170 # it to sit in memory until the next execute_request comes in.
171 171 shell.payload_manager.clear_payload()
172 172
173 173 reply_content[u'status'] = status
174 174 # Compute the execution counter so clients can display prompts
175 175 reply_content['execution_count'] = shell.displayhook.prompt_count
176 176
177 177 # FIXME - fish exception info out of shell, possibly left there by
178 178 # runlines. We'll need to clean up this logic later.
179 179 if shell._reply_content is not None:
180 180 reply_content.update(shell._reply_content)
181 181
182 182 # At this point, we can tell whether the main code execution succeeded
183 183 # or not. If it did, we proceed to evaluate user_variables/expressions
184 184 if reply_content['status'] == 'ok':
185 185 reply_content[u'user_variables'] = \
186 186 shell.get_user_variables(content[u'user_variables'])
187 187 reply_content[u'user_expressions'] = \
188 188 shell.eval_expressions(content[u'user_expressions'])
189 189 else:
190 190 # If there was an error, don't even try to compute variables or
191 191 # expressions
192 192 reply_content[u'user_variables'] = {}
193 193 reply_content[u'user_expressions'] = {}
194 194
195 195 # Send the reply.
196 196 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
197 197 io.raw_print(reply_msg)
198
199 # Flush output before sending the reply.
200 sys.stdout.flush()
201 sys.stderr.flush()
202 # FIXME: on rare occasions, the flush doesn't seem to make it to the
203 # clients... This seems to mitigate the problem, but we definitely need
204 # to better understand what's going on.
205 time.sleep(0.05)
206
198 207 self.reply_socket.send(ident, zmq.SNDMORE)
199 208 self.reply_socket.send_json(reply_msg)
200 209 if reply_msg['content']['status'] == u'error':
201 210 self._abort_queue()
202 211
203 212 def complete_request(self, ident, parent):
204 213 txt, matches = self._complete(parent)
205 214 matches = {'matches' : matches,
206 215 'matched_text' : txt,
207 216 'status' : 'ok'}
208 217 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
209 218 matches, parent, ident)
210 219 io.raw_print(completion_msg)
211 220
212 221 def object_info_request(self, ident, parent):
213 222 ##context = parent['content']['oname'].split('.')
214 223 ##object_info = self._object_info(context)
215 224 object_info = self.shell.object_inspect(parent['content']['oname'])
216 225 msg = self.session.send(self.reply_socket, 'object_info_reply',
217 226 object_info._asdict(), parent, ident)
218 227 io.raw_print(msg)
219 228
220 229 def history_request(self, ident, parent):
221 230 output = parent['content']['output']
222 231 index = parent['content']['index']
223 232 raw = parent['content']['raw']
224 233 hist = self.shell.get_history(index=index, raw=raw, output=output)
225 234 content = {'history' : hist}
226 235 msg = self.session.send(self.reply_socket, 'history_reply',
227 236 content, parent, ident)
228 237 io.raw_print(msg)
229 238
230 239 #---------------------------------------------------------------------------
231 240 # Protected interface
232 241 #---------------------------------------------------------------------------
233 242
234 243 def _abort_queue(self):
235 244 while True:
236 245 try:
237 246 ident = self.reply_socket.recv(zmq.NOBLOCK)
238 247 except zmq.ZMQError, e:
239 248 if e.errno == zmq.EAGAIN:
240 249 break
241 250 else:
242 251 assert self.reply_socket.rcvmore(), \
243 252 "Unexpected missing message part."
244 253 msg = self.reply_socket.recv_json()
245 254 io.raw_print("Aborting:\n", Message(msg))
246 255 msg_type = msg['msg_type']
247 256 reply_type = msg_type.split('_')[0] + '_reply'
248 257 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
249 258 io.raw_print(reply_msg)
250 259 self.reply_socket.send(ident,zmq.SNDMORE)
251 260 self.reply_socket.send_json(reply_msg)
252 261 # We need to wait a bit for requests to come in. This can probably
253 262 # be set shorter for true asynchronous clients.
254 263 time.sleep(0.1)
255 264
256 265 def _raw_input(self, prompt, ident, parent):
257 266 # Flush output before making the request.
258 267 sys.stderr.flush()
259 268 sys.stdout.flush()
260 269
261 270 # Send the input request.
262 271 content = dict(prompt=prompt)
263 272 msg = self.session.msg(u'input_request', content, parent)
264 273 self.req_socket.send_json(msg)
265 274
266 275 # Await a response.
267 276 reply = self.req_socket.recv_json()
268 277 try:
269 278 value = reply['content']['value']
270 279 except:
271 280 io.raw_print_err("Got bad raw_input reply: ")
272 281 io.raw_print_err(Message(parent))
273 282 value = ''
274 283 return value
275 284
276 285 def _complete(self, msg):
277 286 c = msg['content']
278 287 try:
279 288 cpos = int(c['cursor_pos'])
280 289 except:
281 290 # If we don't get something that we can convert to an integer, at
282 291 # least attempt the completion guessing the cursor is at the end of
283 292 # the text, if there's any, and otherwise of the line
284 293 cpos = len(c['text'])
285 294 if cpos==0:
286 295 cpos = len(c['line'])
287 296 return self.shell.complete(c['text'], c['line'], cpos)
288 297
289 298 def _object_info(self, context):
290 299 symbol, leftover = self._symbol_from_context(context)
291 300 if symbol is not None and not leftover:
292 301 doc = getattr(symbol, '__doc__', '')
293 302 else:
294 303 doc = ''
295 304 object_info = dict(docstring = doc)
296 305 return object_info
297 306
298 307 def _symbol_from_context(self, context):
299 308 if not context:
300 309 return None, context
301 310
302 311 base_symbol_string = context[0]
303 312 symbol = self.shell.user_ns.get(base_symbol_string, None)
304 313 if symbol is None:
305 314 symbol = __builtin__.__dict__.get(base_symbol_string, None)
306 315 if symbol is None:
307 316 return None, context
308 317
309 318 context = context[1:]
310 319 for i, name in enumerate(context):
311 320 new_symbol = getattr(symbol, name, None)
312 321 if new_symbol is None:
313 322 return symbol, context[i:]
314 323 else:
315 324 symbol = new_symbol
316 325
317 326 return symbol, []
318 327
319 328
320 329 class QtKernel(Kernel):
321 330 """A Kernel subclass with Qt support."""
322 331
323 332 def start(self):
324 333 """Start a kernel with QtPy4 event loop integration."""
325 334
326 335 from PyQt4 import QtGui, QtCore
327 336 from IPython.lib.guisupport import (
328 337 get_app_qt4, start_event_loop_qt4
329 338 )
330 339 self.app = get_app_qt4([" "])
331 340 self.app.setQuitOnLastWindowClosed(False)
332 341 self.timer = QtCore.QTimer()
333 342 self.timer.timeout.connect(self.do_one_iteration)
334 343 self.timer.start(50)
335 344 start_event_loop_qt4(self.app)
336 345
337 346
338 347 class WxKernel(Kernel):
339 348 """A Kernel subclass with Wx support."""
340 349
341 350 def start(self):
342 351 """Start a kernel with wx event loop support."""
343 352
344 353 import wx
345 354 from IPython.lib.guisupport import start_event_loop_wx
346 355 doi = self.do_one_iteration
347 356
348 357 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
349 358 # We make the Frame hidden when we create it in the main app below.
350 359 class TimerFrame(wx.Frame):
351 360 def __init__(self, func):
352 361 wx.Frame.__init__(self, None, -1)
353 362 self.timer = wx.Timer(self)
354 363 self.timer.Start(50)
355 364 self.Bind(wx.EVT_TIMER, self.on_timer)
356 365 self.func = func
357 366 def on_timer(self, event):
358 367 self.func()
359 368
360 369 # We need a custom wx.App to create our Frame subclass that has the
361 370 # wx.Timer to drive the ZMQ event loop.
362 371 class IPWxApp(wx.App):
363 372 def OnInit(self):
364 373 self.frame = TimerFrame(doi)
365 374 self.frame.Show(False)
366 375 return True
367 376
368 377 # The redirect=False here makes sure that wx doesn't replace
369 378 # sys.stdout/stderr with its own classes.
370 379 self.app = IPWxApp(redirect=False)
371 380 start_event_loop_wx(self.app)
372 381
373 382
374 383 class TkKernel(Kernel):
375 384 """A Kernel subclass with Tk support."""
376 385
377 386 def start(self):
378 387 """Start a Tk enabled event loop."""
379 388
380 389 import Tkinter
381 390 doi = self.do_one_iteration
382 391
383 392 # For Tkinter, we create a Tk object and call its withdraw method.
384 393 class Timer(object):
385 394 def __init__(self, func):
386 395 self.app = Tkinter.Tk()
387 396 self.app.withdraw()
388 397 self.func = func
389 398 def on_timer(self):
390 399 self.func()
391 400 self.app.after(50, self.on_timer)
392 401 def start(self):
393 402 self.on_timer() # Call it once to get things going.
394 403 self.app.mainloop()
395 404
396 405 self.timer = Timer(doi)
397 406 self.timer.start()
398 407
399 408 #-----------------------------------------------------------------------------
400 409 # Kernel main and launch functions
401 410 #-----------------------------------------------------------------------------
402 411
403 412 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0,
404 413 independent=False, pylab=False):
405 414 """ Launches a localhost kernel, binding to the specified ports.
406 415
407 416 Parameters
408 417 ----------
409 418 xrep_port : int, optional
410 419 The port to use for XREP channel.
411 420
412 421 pub_port : int, optional
413 422 The port to use for the SUB channel.
414 423
415 424 req_port : int, optional
416 425 The port to use for the REQ (raw input) channel.
417 426
418 427 hb_port : int, optional
419 428 The port to use for the hearbeat REP channel.
420 429
421 430 independent : bool, optional (default False)
422 431 If set, the kernel process is guaranteed to survive if this process
423 432 dies. If not set, an effort is made to ensure that the kernel is killed
424 433 when this process dies. Note that in this case it is still good practice
425 434 to kill kernels manually before exiting.
426 435
427 436 pylab : bool or string, optional (default False)
428 437 If not False, the kernel will be launched with pylab enabled. If a
429 438 string is passed, matplotlib will use the specified backend. Otherwise,
430 439 matplotlib's default backend will be used.
431 440
432 441 Returns
433 442 -------
434 443 A tuple of form:
435 444 (kernel_process, xrep_port, pub_port, req_port)
436 445 where kernel_process is a Popen object and the ports are integers.
437 446 """
438 447 extra_arguments = []
439 448 if pylab:
440 449 extra_arguments.append('--pylab')
441 450 if isinstance(pylab, basestring):
442 451 extra_arguments.append(pylab)
443 452 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
444 453 xrep_port, pub_port, req_port, hb_port,
445 454 independent, extra_arguments)
446 455
447 456
448 457 def main():
449 458 """ The IPython kernel main entry point.
450 459 """
451 460 parser = make_argument_parser()
452 461 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
453 462 const='auto', help = \
454 463 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
455 464 given, the GUI backend is matplotlib's, otherwise use one of: \
456 465 ['tk', 'gtk', 'qt', 'wx', 'payload-svg'].")
457 466 namespace = parser.parse_args()
458 467
459 468 kernel_class = Kernel
460 469
461 470 _kernel_classes = {
462 471 'qt' : QtKernel,
463 472 'qt4' : QtKernel,
464 473 'payload-svg': Kernel,
465 474 'wx' : WxKernel,
466 475 'tk' : TkKernel
467 476 }
468 477 if namespace.pylab:
469 478 if namespace.pylab == 'auto':
470 479 gui, backend = pylabtools.find_gui_and_backend()
471 480 else:
472 481 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
473 482 kernel_class = _kernel_classes.get(gui)
474 483 if kernel_class is None:
475 484 raise ValueError('GUI is not supported: %r' % gui)
476 485 pylabtools.activate_matplotlib(backend)
477 486
478 487 kernel = make_kernel(namespace, kernel_class, OutStream)
479 488
480 489 if namespace.pylab:
481 490 pylabtools.import_pylab(kernel.shell.user_ns)
482 491
483 492 start_kernel(namespace, kernel)
484 493
485 494
486 495 if __name__ == '__main__':
487 496 main()
General Comments 0
You need to be logged in to leave comments. Login now