From 4ad2c0114261b0abe252ed68d3559f0a8e956964 2013-10-29 19:24:24 From: Thomas Kluyver Date: 2013-10-29 19:24:24 Subject: [PATCH] Simplify StreamCapturer for subprocess testing Rather than using a transient pipe for each subprocess started, the StreamCapturer now makes a single pipe, and subprocesses redirect their output to it. So long as this works on Windows (I've done brief testing, and os.pipe() seems to be functional), this will hopefully make this much more robust. The recent failures in ShiningPanda on IPython.parallel have been caused by StreamCapturer. --- diff --git a/IPython/kernel/tests/utils.py b/IPython/kernel/tests/utils.py index 1d0b9ab..c1ce603 100644 --- a/IPython/kernel/tests/utils.py +++ b/IPython/kernel/tests/utils.py @@ -43,11 +43,10 @@ KC = None def start_new_kernel(argv=None): """start a new kernel, and return its Manager and Client""" km = KernelManager() - kwargs = dict(stdout=PIPE, stderr=STDOUT) + kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT) if argv: kwargs['extra_arguments'] = argv km.start_kernel(**kwargs) - nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno()) nose.ipy_stream_capturer.ensure_started() kc = km.client() kc.start_channels() diff --git a/IPython/parallel/tests/__init__.py b/IPython/parallel/tests/__init__.py index 367ae7b..c264eff 100644 --- a/IPython/parallel/tests/__init__.py +++ b/IPython/parallel/tests/__init__.py @@ -38,7 +38,7 @@ class TestProcessLauncher(LocalProcessLauncher): def start(self): if self.state == 'before': self.process = Popen(self.args, - stdout=PIPE, stderr=STDOUT, + stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT, env=os.environ, cwd=self.work_dir ) @@ -46,7 +46,6 @@ class TestProcessLauncher(LocalProcessLauncher): self.poll = self.process.poll # Store stdout & stderr to show with failing tests. # This is defined in IPython.testing.iptest - nose.ipy_stream_capturer.add_stream(self.process.stdout.fileno()) nose.ipy_stream_capturer.ensure_started() else: s = 'The process was already started and has state: %r' % self.state @@ -114,7 +113,6 @@ def teardown(): time.sleep(1) while launchers: p = launchers.pop() - nose.ipy_stream_capturer.remove_stream(p.process.stdout.fileno()) if p.poll() is None: try: p.stop() diff --git a/IPython/testing/iptest.py b/IPython/testing/iptest.py index e94540a..e951094 100644 --- a/IPython/testing/iptest.py +++ b/IPython/testing/iptest.py @@ -365,47 +365,22 @@ class StreamCapturer(Thread): super(StreamCapturer, self).__init__() self.streams = [] self.buffer = BytesIO() - self.streams_lock = Lock() + self.readfd, self.writefd = os.pipe() self.buffer_lock = Lock() - self.stream_added = Event() self.stop = Event() def run(self): self.started = True + while not self.stop.is_set(): - with self.streams_lock: - streams = self.streams - - if not streams: - self.stream_added.wait(timeout=1) - self.stream_added.clear() - continue - - ready = select(streams, [], [], 0.5)[0] - dead = [] - with self.buffer_lock: - for fd in ready: - try: - self.buffer.write(os.read(fd, 1024)) - except OSError as e: - import errno - if e.errno == errno.EBADF: - dead.append(fd) - else: - raise - - with self.streams_lock: - for fd in dead: - self.streams.remove(fd) - - def add_stream(self, fd): - with self.streams_lock: - self.streams.append(fd) - self.stream_added.set() + ready = select([self.readfd], [], [], 1)[0] + + if ready: + with self.buffer_lock: + self.buffer.write(os.read(self.readfd, 1024)) - def remove_stream(self, fd): - with self.streams_lock: - self.streams.remove(fd) + os.close(self.readfd) + os.close(self.writefd) def reset_buffer(self): with self.buffer_lock: @@ -426,7 +401,7 @@ class SubprocessStreamCapturePlugin(Plugin): Plugin.__init__(self) self.stream_capturer = StreamCapturer() # This is ugly, but distant parts of the test machinery need to be able - # to add streams, so we make the object globally accessible. + # to redirect streams, so we make the object globally accessible. nose.ipy_stream_capturer = self.stream_capturer def configure(self, options, config):