##// END OF EJS Templates
Greatly increased frontend performance by improving kernel stdout/stderr buffering.
epatters -
Show More
@@ -1,535 +1,547 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Finish implementing `raw_input`.
7 7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 8 call set_parent on all the PUB objects with the message about to be executed.
9 9 * Implement random port and security key logic.
10 10 * Implement control messages.
11 11 * Implement event loop and poll version.
12 12 """
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 from code import CommandCompiler
21 from cStringIO import StringIO
21 22 import os
22 23 import sys
23 24 from threading import Thread
24 25 import time
25 26 import traceback
26 27
27 28 # System library imports.
28 29 import zmq
29 30
30 31 # Local imports.
31 32 from IPython.external.argparse import ArgumentParser
32 33 from session import Session, Message, extract_header
33 34 from completer import KernelCompleter
34 35
35 36 #-----------------------------------------------------------------------------
36 37 # Kernel and stream classes
37 38 #-----------------------------------------------------------------------------
38 39
39 40 class InStream(object):
40 41 """ A file like object that reads from a 0MQ XREQ socket."""
41 42
42 43 def __init__(self, session, socket):
43 44 self.session = session
44 45 self.socket = socket
45 46
46 47 def close(self):
47 48 self.socket = None
48 49
49 50 def flush(self):
50 51 if self.socket is None:
51 52 raise ValueError('I/O operation on closed file')
52 53
53 54 def isatty(self):
54 55 return False
55 56
56 57 def next(self):
57 58 raise IOError('Seek not supported.')
58 59
59 60 def read(self, size=-1):
60 61 # FIXME: Do we want another request for this?
61 62 string = '\n'.join(self.readlines())
62 63 return self._truncate(string, size)
63 64
64 65 def readline(self, size=-1):
65 66 if self.socket is None:
66 67 raise ValueError('I/O operation on closed file')
67 68 else:
68 69 content = dict(size=size)
69 70 msg = self.session.msg('readline_request', content=content)
70 71 reply = self._request(msg)
71 72 line = reply['content']['line']
72 73 return self._truncate(line, size)
73 74
74 75 def readlines(self, sizehint=-1):
75 76 # Sizehint is ignored, as is permitted.
76 77 if self.socket is None:
77 78 raise ValueError('I/O operation on closed file')
78 79 else:
79 80 lines = []
80 81 while True:
81 82 line = self.readline()
82 83 if line:
83 84 lines.append(line)
84 85 else:
85 86 break
86 87 return lines
87 88
88 89 def seek(self, offset, whence=None):
89 90 raise IOError('Seek not supported.')
90 91
91 92 def write(self, string):
92 93 raise IOError('Write not supported on a read only stream.')
93 94
94 95 def writelines(self, sequence):
95 96 raise IOError('Write not supported on a read only stream.')
96 97
97 98 def _request(self, msg):
98 99 # Flush output before making the request. This ensures, for example,
99 100 # that raw_input(prompt) actually gets a prompt written.
100 101 sys.stderr.flush()
101 102 sys.stdout.flush()
102 103
103 104 self.socket.send_json(msg)
104 105 while True:
105 106 try:
106 107 reply = self.socket.recv_json(zmq.NOBLOCK)
107 108 except zmq.ZMQError, e:
108 109 if e.errno == zmq.EAGAIN:
109 110 pass
110 111 else:
111 112 raise
112 113 else:
113 114 break
114 115 return reply
115 116
116 117 def _truncate(self, string, size):
117 118 if size >= 0:
118 119 if isinstance(string, str):
119 120 return string[:size]
120 121 elif isinstance(string, unicode):
121 122 encoded = string.encode('utf-8')[:size]
122 123 return encoded.decode('utf-8', 'ignore')
123 124 return string
124 125
125 126
126 127 class OutStream(object):
127 128 """A file like object that publishes the stream to a 0MQ PUB socket."""
128 129
129 def __init__(self, session, pub_socket, name, max_buffer=200):
130 # The time interval between automatic flushes, in seconds.
131 flush_interval = 0.05
132
133 def __init__(self, session, pub_socket, name):
130 134 self.session = session
131 135 self.pub_socket = pub_socket
132 136 self.name = name
133 self._buffer = []
134 self._buffer_len = 0
135 self.max_buffer = max_buffer
136 137 self.parent_header = {}
138 self._new_buffer()
137 139
138 140 def set_parent(self, parent):
139 141 self.parent_header = extract_header(parent)
140 142
141 143 def close(self):
142 144 self.pub_socket = None
143 145
144 146 def flush(self):
145 147 if self.pub_socket is None:
146 148 raise ValueError(u'I/O operation on closed file')
147 149 else:
148 if self._buffer:
149 data = ''.join(self._buffer)
150 data = self._buffer.getvalue()
151 if data:
150 152 content = {u'name':self.name, u'data':data}
151 153 msg = self.session.msg(u'stream', content=content,
152 154 parent=self.parent_header)
153 155 print>>sys.__stdout__, Message(msg)
154 156 self.pub_socket.send_json(msg)
155 self._buffer_len = 0
156 self._buffer = []
157
158 self._buffer.close()
159 self._new_buffer()
157 160
158 161 def isatty(self):
159 162 return False
160 163
161 164 def next(self):
162 165 raise IOError('Read not supported on a write only stream.')
163 166
164 def read(self, size=None):
167 def read(self, size=-1):
165 168 raise IOError('Read not supported on a write only stream.')
166 169
167 readline=read
170 def readline(self, size=-1):
171 raise IOError('Read not supported on a write only stream.')
168 172
169 def write(self, s):
173 def write(self, string):
170 174 if self.pub_socket is None:
171 175 raise ValueError('I/O operation on closed file')
172 176 else:
173 self._buffer.append(s)
174 self._buffer_len += len(s)
175 self._maybe_send()
176
177 def _maybe_send(self):
178 if '\n' in self._buffer[-1]:
179 self.flush()
180 if self._buffer_len > self.max_buffer:
177 self._buffer.write(string)
178 current_time = time.time()
179 if self._start <= 0:
180 self._start = current_time
181 elif current_time - self._start > self.flush_interval:
181 182 self.flush()
182 183
183 184 def writelines(self, sequence):
184 185 if self.pub_socket is None:
185 186 raise ValueError('I/O operation on closed file')
186 187 else:
187 for s in sequence:
188 self.write(s)
188 for string in sequence:
189 self.write(string)
190
191 def _new_buffer(self):
192 self._buffer = StringIO()
193 self._start = -1
189 194
190 195
191 196 class DisplayHook(object):
192 197
193 198 def __init__(self, session, pub_socket):
194 199 self.session = session
195 200 self.pub_socket = pub_socket
196 201 self.parent_header = {}
197 202
198 203 def __call__(self, obj):
199 204 if obj is None:
200 205 return
201 206
202 207 __builtin__._ = obj
203 208 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
204 209 parent=self.parent_header)
205 210 self.pub_socket.send_json(msg)
206 211
207 212 def set_parent(self, parent):
208 213 self.parent_header = extract_header(parent)
209 214
210 215
211 216 class Kernel(object):
212 217
213 218 def __init__(self, session, reply_socket, pub_socket):
214 219 self.session = session
215 220 self.reply_socket = reply_socket
216 221 self.pub_socket = pub_socket
217 222 self.user_ns = {}
218 223 self.history = []
219 224 self.compiler = CommandCompiler()
220 225 self.completer = KernelCompleter(self.user_ns)
221 226
222 227 # Build dict of handlers for message types
223 228 msg_types = [ 'execute_request', 'complete_request',
224 229 'object_info_request' ]
225 230 self.handlers = {}
226 231 for msg_type in msg_types:
227 232 self.handlers[msg_type] = getattr(self, msg_type)
228 233
229 234 def abort_queue(self):
230 235 while True:
231 236 try:
232 237 ident = self.reply_socket.recv(zmq.NOBLOCK)
233 238 except zmq.ZMQError, e:
234 239 if e.errno == zmq.EAGAIN:
235 240 break
236 241 else:
237 242 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
238 243 msg = self.reply_socket.recv_json()
239 244 print>>sys.__stdout__, "Aborting:"
240 245 print>>sys.__stdout__, Message(msg)
241 246 msg_type = msg['msg_type']
242 247 reply_type = msg_type.split('_')[0] + '_reply'
243 248 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
244 249 print>>sys.__stdout__, Message(reply_msg)
245 250 self.reply_socket.send(ident,zmq.SNDMORE)
246 251 self.reply_socket.send_json(reply_msg)
247 252 # We need to wait a bit for requests to come in. This can probably
248 253 # be set shorter for true asynchronous clients.
249 254 time.sleep(0.1)
250 255
251 256 def execute_request(self, ident, parent):
252 257 try:
253 258 code = parent[u'content'][u'code']
254 259 except:
255 260 print>>sys.__stderr__, "Got bad msg: "
256 261 print>>sys.__stderr__, Message(parent)
257 262 return
258 263 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
259 264 self.pub_socket.send_json(pyin_msg)
265
260 266 try:
261 267 comp_code = self.compiler(code, '<zmq-kernel>')
262 268 sys.displayhook.set_parent(parent)
263 269 exec comp_code in self.user_ns, self.user_ns
264 270 except:
265 271 result = u'error'
266 272 etype, evalue, tb = sys.exc_info()
267 273 tb = traceback.format_exception(etype, evalue, tb)
268 274 exc_content = {
269 275 u'status' : u'error',
270 276 u'traceback' : tb,
271 277 u'ename' : unicode(etype.__name__),
272 278 u'evalue' : unicode(evalue)
273 279 }
274 280 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
275 281 self.pub_socket.send_json(exc_msg)
276 282 reply_content = exc_content
277 283 else:
278 284 reply_content = {'status' : 'ok'}
285
286 # Flush output before sending the reply.
287 sys.stderr.flush()
288 sys.stdout.flush()
289
290 # Send the reply.
279 291 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
280 292 print>>sys.__stdout__, Message(reply_msg)
281 293 self.reply_socket.send(ident, zmq.SNDMORE)
282 294 self.reply_socket.send_json(reply_msg)
283 295 if reply_msg['content']['status'] == u'error':
284 296 self.abort_queue()
285 297
286 298 def complete_request(self, ident, parent):
287 299 matches = {'matches' : self.complete(parent),
288 300 'status' : 'ok'}
289 301 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
290 302 matches, parent, ident)
291 303 print >> sys.__stdout__, completion_msg
292 304
293 305 def complete(self, msg):
294 306 return self.completer.complete(msg.content.line, msg.content.text)
295 307
296 308 def object_info_request(self, ident, parent):
297 309 context = parent['content']['oname'].split('.')
298 310 object_info = self.object_info(context)
299 311 msg = self.session.send(self.reply_socket, 'object_info_reply',
300 312 object_info, parent, ident)
301 313 print >> sys.__stdout__, msg
302 314
303 315 def object_info(self, context):
304 316 symbol, leftover = self.symbol_from_context(context)
305 317 if symbol is not None and not leftover:
306 318 doc = getattr(symbol, '__doc__', '')
307 319 else:
308 320 doc = ''
309 321 object_info = dict(docstring = doc)
310 322 return object_info
311 323
312 324 def symbol_from_context(self, context):
313 325 if not context:
314 326 return None, context
315 327
316 328 base_symbol_string = context[0]
317 329 symbol = self.user_ns.get(base_symbol_string, None)
318 330 if symbol is None:
319 331 symbol = __builtin__.__dict__.get(base_symbol_string, None)
320 332 if symbol is None:
321 333 return None, context
322 334
323 335 context = context[1:]
324 336 for i, name in enumerate(context):
325 337 new_symbol = getattr(symbol, name, None)
326 338 if new_symbol is None:
327 339 return symbol, context[i:]
328 340 else:
329 341 symbol = new_symbol
330 342
331 343 return symbol, []
332 344
333 345 def start(self):
334 346 while True:
335 347 ident = self.reply_socket.recv()
336 348 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
337 349 msg = self.reply_socket.recv_json()
338 350 omsg = Message(msg)
339 351 print>>sys.__stdout__
340 352 print>>sys.__stdout__, omsg
341 353 handler = self.handlers.get(omsg.msg_type, None)
342 354 if handler is None:
343 355 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
344 356 else:
345 357 handler(ident, omsg)
346 358
347 359 #-----------------------------------------------------------------------------
348 360 # Kernel main and launch functions
349 361 #-----------------------------------------------------------------------------
350 362
351 363 class ExitPollerUnix(Thread):
352 364 """ A Unix-specific daemon thread that terminates the program immediately
353 365 when this process' parent process no longer exists.
354 366 """
355 367
356 368 def __init__(self):
357 369 super(ExitPollerUnix, self).__init__()
358 370 self.daemon = True
359 371
360 372 def run(self):
361 373 # We cannot use os.waitpid because it works only for child processes.
362 374 from errno import EINTR
363 375 while True:
364 376 try:
365 377 if os.getppid() == 1:
366 378 os._exit(1)
367 379 time.sleep(1.0)
368 380 except OSError, e:
369 381 if e.errno == EINTR:
370 382 continue
371 383 raise
372 384
373 385 class ExitPollerWindows(Thread):
374 386 """ A Windows-specific daemon thread that terminates the program immediately
375 387 when a Win32 handle is signaled.
376 388 """
377 389
378 390 def __init__(self, handle):
379 391 super(ExitPollerWindows, self).__init__()
380 392 self.daemon = True
381 393 self.handle = handle
382 394
383 395 def run(self):
384 396 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
385 397 result = WaitForSingleObject(self.handle, INFINITE)
386 398 if result == WAIT_OBJECT_0:
387 399 os._exit(1)
388 400
389 401
390 402 def bind_port(socket, ip, port):
391 403 """ Binds the specified ZMQ socket. If the port is less than zero, a random
392 404 port is chosen. Returns the port that was bound.
393 405 """
394 406 connection = 'tcp://%s' % ip
395 407 if port <= 0:
396 408 port = socket.bind_to_random_port(connection)
397 409 else:
398 410 connection += ':%i' % port
399 411 socket.bind(connection)
400 412 return port
401 413
402 414
403 415 def main():
404 416 """ Main entry point for launching a kernel.
405 417 """
406 418 # Parse command line arguments.
407 419 parser = ArgumentParser()
408 420 parser.add_argument('--ip', type=str, default='127.0.0.1',
409 421 help='set the kernel\'s IP address [default: local]')
410 422 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
411 423 help='set the XREP channel port [default: random]')
412 424 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
413 425 help='set the PUB channel port [default: random]')
414 426 parser.add_argument('--req', type=int, metavar='PORT', default=0,
415 427 help='set the REQ channel port [default: random]')
416 428 if sys.platform == 'win32':
417 429 parser.add_argument('--parent', type=int, metavar='HANDLE',
418 430 default=0, help='kill this process if the process '
419 431 'with HANDLE dies')
420 432 else:
421 433 parser.add_argument('--parent', action='store_true',
422 434 help='kill this process if its parent dies')
423 435 namespace = parser.parse_args()
424 436
425 437 # Create a context, a session, and the kernel sockets.
426 438 print >>sys.__stdout__, "Starting the kernel..."
427 439 context = zmq.Context()
428 440 session = Session(username=u'kernel')
429 441
430 442 reply_socket = context.socket(zmq.XREP)
431 443 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
432 444 print >>sys.__stdout__, "XREP Channel on port", xrep_port
433 445
434 446 pub_socket = context.socket(zmq.PUB)
435 447 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
436 448 print >>sys.__stdout__, "PUB Channel on port", pub_port
437 449
438 450 req_socket = context.socket(zmq.XREQ)
439 451 req_port = bind_port(req_socket, namespace.ip, namespace.req)
440 452 print >>sys.__stdout__, "REQ Channel on port", req_port
441 453
442 454 # Redirect input streams and set a display hook.
443 455 sys.stdin = InStream(session, req_socket)
444 456 sys.stdout = OutStream(session, pub_socket, u'stdout')
445 457 sys.stderr = OutStream(session, pub_socket, u'stderr')
446 458 sys.displayhook = DisplayHook(session, pub_socket)
447 459
448 460 # Create the kernel.
449 461 kernel = Kernel(session, reply_socket, pub_socket)
450 462
451 463 # Configure this kernel/process to die on parent termination, if necessary.
452 464 if namespace.parent:
453 465 if sys.platform == 'win32':
454 466 poller = ExitPollerWindows(namespace.parent)
455 467 else:
456 468 poller = ExitPollerUnix()
457 469 poller.start()
458 470
459 471 # Start the kernel mainloop.
460 472 kernel.start()
461 473
462 474
463 475 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
464 476 """ Launches a localhost kernel, binding to the specified ports.
465 477
466 478 Parameters
467 479 ----------
468 480 xrep_port : int, optional
469 481 The port to use for XREP channel.
470 482
471 483 pub_port : int, optional
472 484 The port to use for the SUB channel.
473 485
474 486 req_port : int, optional
475 487 The port to use for the REQ (raw input) channel.
476 488
477 489 independent : bool, optional (default False)
478 490 If set, the kernel process is guaranteed to survive if this process
479 491 dies. If not set, an effort is made to ensure that the kernel is killed
480 492 when this process dies. Note that in this case it is still good practice
481 493 to kill kernels manually before exiting.
482 494
483 495 Returns
484 496 -------
485 497 A tuple of form:
486 498 (kernel_process, xrep_port, pub_port, req_port)
487 499 where kernel_process is a Popen object and the ports are integers.
488 500 """
489 501 import socket
490 502 from subprocess import Popen
491 503
492 504 # Find open ports as necessary.
493 505 ports = []
494 506 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
495 507 for i in xrange(ports_needed):
496 508 sock = socket.socket()
497 509 sock.bind(('', 0))
498 510 ports.append(sock)
499 511 for i, sock in enumerate(ports):
500 512 port = sock.getsockname()[1]
501 513 sock.close()
502 514 ports[i] = port
503 515 if xrep_port <= 0:
504 516 xrep_port = ports.pop(0)
505 517 if pub_port <= 0:
506 518 pub_port = ports.pop(0)
507 519 if req_port <= 0:
508 520 req_port = ports.pop(0)
509 521
510 522 # Spawn a kernel.
511 523 command = 'from IPython.zmq.kernel import main; main()'
512 524 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
513 525 '--pub', str(pub_port), '--req', str(req_port) ]
514 526 if independent:
515 527 if sys.platform == 'win32':
516 528 proc = Popen(['start', '/b'] + arguments, shell=True)
517 529 else:
518 530 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
519 531 else:
520 532 if sys.platform == 'win32':
521 533 from _subprocess import DuplicateHandle, GetCurrentProcess, \
522 534 DUPLICATE_SAME_ACCESS
523 535 pid = GetCurrentProcess()
524 536 handle = DuplicateHandle(pid, pid, pid, 0,
525 537 True, # Inheritable by new processes.
526 538 DUPLICATE_SAME_ACCESS)
527 539 proc = Popen(arguments + ['--parent', str(int(handle))])
528 540 else:
529 541 proc = Popen(arguments + ['--parent'])
530 542
531 543 return proc, xrep_port, pub_port, req_port
532 544
533 545
534 546 if __name__ == '__main__':
535 547 main()
General Comments 0
You need to be logged in to leave comments. Login now