##// END OF EJS Templates
Merge pull request #4456 from takluyver/stream-capturer-simplify...
Thomas Kluyver -
r13435:42b3342b merge
parent child Browse files
Show More
@@ -43,11 +43,10 b' KC = None'
43 def start_new_kernel(argv=None):
43 def start_new_kernel(argv=None):
44 """start a new kernel, and return its Manager and Client"""
44 """start a new kernel, and return its Manager and Client"""
45 km = KernelManager()
45 km = KernelManager()
46 kwargs = dict(stdout=PIPE, stderr=STDOUT)
46 kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT)
47 if argv:
47 if argv:
48 kwargs['extra_arguments'] = argv
48 kwargs['extra_arguments'] = argv
49 km.start_kernel(**kwargs)
49 km.start_kernel(**kwargs)
50 nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno())
51 nose.ipy_stream_capturer.ensure_started()
50 nose.ipy_stream_capturer.ensure_started()
52 kc = km.client()
51 kc = km.client()
53 kc.start_channels()
52 kc.start_channels()
@@ -38,7 +38,7 b' class TestProcessLauncher(LocalProcessLauncher):'
38 def start(self):
38 def start(self):
39 if self.state == 'before':
39 if self.state == 'before':
40 self.process = Popen(self.args,
40 self.process = Popen(self.args,
41 stdout=PIPE, stderr=STDOUT,
41 stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT,
42 env=os.environ,
42 env=os.environ,
43 cwd=self.work_dir
43 cwd=self.work_dir
44 )
44 )
@@ -46,7 +46,6 b' class TestProcessLauncher(LocalProcessLauncher):'
46 self.poll = self.process.poll
46 self.poll = self.process.poll
47 # Store stdout & stderr to show with failing tests.
47 # Store stdout & stderr to show with failing tests.
48 # This is defined in IPython.testing.iptest
48 # This is defined in IPython.testing.iptest
49 nose.ipy_stream_capturer.add_stream(self.process.stdout.fileno())
50 nose.ipy_stream_capturer.ensure_started()
49 nose.ipy_stream_capturer.ensure_started()
51 else:
50 else:
52 s = 'The process was already started and has state: %r' % self.state
51 s = 'The process was already started and has state: %r' % self.state
@@ -114,7 +113,6 b' def teardown():'
114 time.sleep(1)
113 time.sleep(1)
115 while launchers:
114 while launchers:
116 p = launchers.pop()
115 p = launchers.pop()
117 nose.ipy_stream_capturer.remove_stream(p.process.stdout.fileno())
118 if p.poll() is None:
116 if p.poll() is None:
119 try:
117 try:
120 p.stop()
118 p.stop()
@@ -31,7 +31,6 b' import glob'
31 from io import BytesIO
31 from io import BytesIO
32 import os
32 import os
33 import os.path as path
33 import os.path as path
34 from select import select
35 import sys
34 import sys
36 from threading import Thread, Lock, Event
35 from threading import Thread, Lock, Event
37 import warnings
36 import warnings
@@ -365,68 +364,51 b' class StreamCapturer(Thread):'
365 super(StreamCapturer, self).__init__()
364 super(StreamCapturer, self).__init__()
366 self.streams = []
365 self.streams = []
367 self.buffer = BytesIO()
366 self.buffer = BytesIO()
368 self.streams_lock = Lock()
367 self.readfd, self.writefd = os.pipe()
369 self.buffer_lock = Lock()
368 self.buffer_lock = Lock()
370 self.stream_added = Event()
371 self.stop = Event()
369 self.stop = Event()
372
370
373 def run(self):
371 def run(self):
374 self.started = True
372 self.started = True
375 while not self.stop.is_set():
376 with self.streams_lock:
377 streams = self.streams
378
373
379 if not streams:
374 while not self.stop.is_set():
380 self.stream_added.wait(timeout=1)
375 chunk = os.read(self.readfd, 1024)
381 self.stream_added.clear()
382 continue
383
376
384 ready = select(streams, [], [], 0.5)[0]
385 dead = []
386 with self.buffer_lock:
377 with self.buffer_lock:
387 for fd in ready:
378 self.buffer.write(chunk)
388 try:
389 self.buffer.write(os.read(fd, 1024))
390 except OSError as e:
391 import errno
392 if e.errno == errno.EBADF:
393 dead.append(fd)
394 else:
395 raise
396
397 with self.streams_lock:
398 for fd in dead:
399 self.streams.remove(fd)
400
401 def add_stream(self, fd):
402 with self.streams_lock:
403 self.streams.append(fd)
404 self.stream_added.set()
405
379
406 def remove_stream(self, fd):
380 os.close(self.readfd)
407 with self.streams_lock:
381 os.close(self.writefd)
408 self.streams.remove(fd)
382
409
410 def reset_buffer(self):
383 def reset_buffer(self):
411 with self.buffer_lock:
384 with self.buffer_lock:
412 self.buffer.truncate(0)
385 self.buffer.truncate(0)
413 self.buffer.seek(0)
386 self.buffer.seek(0)
414
387
415 def get_buffer(self):
388 def get_buffer(self):
416 with self.buffer_lock:
389 with self.buffer_lock:
417 return self.buffer.getvalue()
390 return self.buffer.getvalue()
418
391
419 def ensure_started(self):
392 def ensure_started(self):
420 if not self.started:
393 if not self.started:
421 self.start()
394 self.start()
422
395
396 def halt(self):
397 """Safely stop the thread."""
398 if not self.started:
399 return
400
401 self.stop.set()
402 os.write(self.writefd, b'wake up') # Ensure we're not locked in a read()
403 self.join()
404
423 class SubprocessStreamCapturePlugin(Plugin):
405 class SubprocessStreamCapturePlugin(Plugin):
424 name='subprocstreams'
406 name='subprocstreams'
425 def __init__(self):
407 def __init__(self):
426 Plugin.__init__(self)
408 Plugin.__init__(self)
427 self.stream_capturer = StreamCapturer()
409 self.stream_capturer = StreamCapturer()
428 # This is ugly, but distant parts of the test machinery need to be able
410 # This is ugly, but distant parts of the test machinery need to be able
429 # to add streams, so we make the object globally accessible.
411 # to redirect streams, so we make the object globally accessible.
430 nose.ipy_stream_capturer = self.stream_capturer
412 nose.ipy_stream_capturer = self.stream_capturer
431
413
432 def configure(self, options, config):
414 def configure(self, options, config):
@@ -454,9 +436,7 b' class SubprocessStreamCapturePlugin(Plugin):'
454 formatError = formatFailure
436 formatError = formatFailure
455
437
456 def finalize(self, result):
438 def finalize(self, result):
457 if self.stream_capturer.started:
439 self.stream_capturer.halt()
458 self.stream_capturer.stop.set()
459 self.stream_capturer.join()
460
440
461
441
462 def run_iptest():
442 def run_iptest():
General Comments 0
You need to be logged in to leave comments. Login now