diff --git a/IPython/kernel/tests/utils.py b/IPython/kernel/tests/utils.py index 1d0b9ab..c1ce603 100644 --- a/IPython/kernel/tests/utils.py +++ b/IPython/kernel/tests/utils.py @@ -43,11 +43,10 @@ KC = None def start_new_kernel(argv=None): """start a new kernel, and return its Manager and Client""" km = KernelManager() - kwargs = dict(stdout=PIPE, stderr=STDOUT) + kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT) if argv: kwargs['extra_arguments'] = argv km.start_kernel(**kwargs) - nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno()) nose.ipy_stream_capturer.ensure_started() kc = km.client() kc.start_channels() diff --git a/IPython/parallel/tests/__init__.py b/IPython/parallel/tests/__init__.py index 367ae7b..c264eff 100644 --- a/IPython/parallel/tests/__init__.py +++ b/IPython/parallel/tests/__init__.py @@ -38,7 +38,7 @@ class TestProcessLauncher(LocalProcessLauncher): def start(self): if self.state == 'before': self.process = Popen(self.args, - stdout=PIPE, stderr=STDOUT, + stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT, env=os.environ, cwd=self.work_dir ) @@ -46,7 +46,6 @@ class TestProcessLauncher(LocalProcessLauncher): self.poll = self.process.poll # Store stdout & stderr to show with failing tests. # This is defined in IPython.testing.iptest - nose.ipy_stream_capturer.add_stream(self.process.stdout.fileno()) nose.ipy_stream_capturer.ensure_started() else: s = 'The process was already started and has state: %r' % self.state @@ -114,7 +113,6 @@ def teardown(): time.sleep(1) while launchers: p = launchers.pop() - nose.ipy_stream_capturer.remove_stream(p.process.stdout.fileno()) if p.poll() is None: try: p.stop() diff --git a/IPython/testing/iptest.py b/IPython/testing/iptest.py index e94540a..e951094 100644 --- a/IPython/testing/iptest.py +++ b/IPython/testing/iptest.py @@ -365,47 +365,22 @@ class StreamCapturer(Thread): super(StreamCapturer, self).__init__() self.streams = [] self.buffer = BytesIO() - self.streams_lock = Lock() + self.readfd, self.writefd = os.pipe() self.buffer_lock = Lock() - self.stream_added = Event() self.stop = Event() def run(self): self.started = True + while not self.stop.is_set(): - with self.streams_lock: - streams = self.streams - - if not streams: - self.stream_added.wait(timeout=1) - self.stream_added.clear() - continue - - ready = select(streams, [], [], 0.5)[0] - dead = [] - with self.buffer_lock: - for fd in ready: - try: - self.buffer.write(os.read(fd, 1024)) - except OSError as e: - import errno - if e.errno == errno.EBADF: - dead.append(fd) - else: - raise - - with self.streams_lock: - for fd in dead: - self.streams.remove(fd) - - def add_stream(self, fd): - with self.streams_lock: - self.streams.append(fd) - self.stream_added.set() + ready = select([self.readfd], [], [], 1)[0] + + if ready: + with self.buffer_lock: + self.buffer.write(os.read(self.readfd, 1024)) - def remove_stream(self, fd): - with self.streams_lock: - self.streams.remove(fd) + os.close(self.readfd) + os.close(self.writefd) def reset_buffer(self): with self.buffer_lock: @@ -426,7 +401,7 @@ class SubprocessStreamCapturePlugin(Plugin): Plugin.__init__(self) self.stream_capturer = StreamCapturer() # This is ugly, but distant parts of the test machinery need to be able - # to add streams, so we make the object globally accessible. + # to redirect streams, so we make the object globally accessible. nose.ipy_stream_capturer = self.stream_capturer def configure(self, options, config):