test-stdio.py
133 lines
| 3.6 KiB
| text/x-python
|
PythonLexer
/ tests / test-stdio.py
Manuel Jacob
|
r45583 | #!/usr/bin/env python | ||
""" | ||||
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`. | ||||
""" | ||||
from __future__ import absolute_import | ||||
import contextlib | ||||
Manuel Jacob
|
r45628 | import errno | ||
Manuel Jacob
|
r45583 | import os | ||
import subprocess | ||||
import sys | ||||
import unittest | ||||
from mercurial import pycompat | ||||
Manuel Jacob
|
# Source of the program run in the child process via ``python -c``. The
# ``{stream}`` placeholder is filled in with ``str.format`` and is either
# ``stdout`` or ``stderr``. Every write through ``procutil.{stream}`` is
# followed by a raw ``os.write`` marker on the same file descriptor, so the
# position of the markers in the combined output reveals when the data written
# through ``procutil`` actually reached the descriptor.
BUFFERING_CHILD_SCRIPT = r'''
import os
from mercurial import dispatch
from mercurial.utils import procutil
dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
# Expected combined output for each buffering mode.
# Each piece of data shows up before its own marker.
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
# 'aaa' shows up only after its marker, together with the newline-terminated
# 'bbb\n', which in turn precedes the second marker.
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
# Both markers come first; all data written through procutil follows them.
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
@contextlib.contextmanager
def _closing(fds):
    """Context manager that closes every file descriptor in ``fds`` on exit.

    The descriptors are closed whether the body finishes normally or raises.
    A descriptor whose ``os.close`` fails with ``EnvironmentError`` (for
    example because the body already closed it) is silently skipped.
    """

    def _close_quietly(descriptor):
        # Best-effort close: a failure here must not mask the body's outcome.
        try:
            os.close(descriptor)
        except EnvironmentError:
            pass

    try:
        yield
    finally:
        for descriptor in fds:
            _close_quietly(descriptor)
@contextlib.contextmanager
def _pipes():
    """Yield the ``(read_fd, write_fd)`` pair of a newly created OS pipe.

    Both descriptors are handed to ``_closing`` and are therefore closed when
    the context exits.
    """
    read_fd, write_fd = os.pipe()
    with _closing((read_fd, write_fd)):
        yield read_fd, write_fd
@contextlib.contextmanager
def _ptys():
    """Yield the descriptor pair of a newly opened pseudo-terminal.

    The first descriptor of the pair is switched to raw mode before it is
    handed out. Both descriptors are closed when the context exits.

    Raises ``unittest.SkipTest`` on Windows.
    """
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")

    # Imported lazily: only reached when not running on Windows.
    import pty
    import tty

    parent_fd, child_fd = pty.openpty()
    with _closing((parent_fd, child_fd)):
        tty.setraw(parent_fd)
        yield parent_fd, child_fd
Manuel Jacob
|
def _readall(fd, buffer_size):
    """Read from ``fd`` until end of stream and return all bytes read.

    Data is fetched with ``os.read`` in chunks of at most ``buffer_size``
    bytes. An empty read or an ``OSError`` with ``errno.EIO`` ends the stream;
    any other ``OSError`` is propagated to the caller.
    """

    def _chunks():
        while True:
            try:
                chunk = os.read(fd, buffer_size)
            except OSError as err:
                if err.errno != errno.EIO:
                    raise
                # If the child-facing PTY got closed, reading from the
                # parent-facing PTY raises EIO.
                return
            if not chunk:
                return
            yield chunk

    return b''.join(_chunks())
Manuel Jacob
|
class TestStdio(unittest.TestCase):
    """Checks the buffering of ``procutil.stdout`` in a child process.

    Each test runs ``BUFFERING_CHILD_SCRIPT`` in a fresh interpreter whose
    stdout is connected to a pipe or a PTY and compares the bytes received on
    the other end with the output expected for a given buffering mode.
    """

    def _test(
        self, stream, rwpair_generator, expected_output, python_args=None
    ):
        """Run the child script and check what arrives on ``stream``.

        ``stream`` is ``'stdout'`` or ``'stderr'`` and selects which child
        stream is redirected. ``rwpair_generator`` is a context manager
        factory yielding ``(receiving_fd, child_fd)``. ``expected_output`` is
        the exact bytes expected on the receiving descriptor.
        ``python_args`` is an optional sequence of extra interpreter
        arguments placed before ``-c``.

        The child must also exit with status 0.
        """
        assert stream in ('stdout', 'stderr')
        # Use a None sentinel instead of a mutable default so that no list
        # object is shared between calls.
        if python_args is None:
            python_args = []
        with rwpair_generator() as (stream_receiver, child_stream), open(
            os.devnull, 'rb'
        ) as child_stdin:
            proc = subprocess.Popen(
                [sys.executable]
                + list(python_args)
                + ['-c', BUFFERING_CHILD_SCRIPT.format(stream=stream)],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                # The child holds its own copy of the descriptor. Drop ours
                # so that the receiving end reaches end of stream once the
                # child is done with it.
                os.close(child_stream)
                self.assertEqual(
                    _readall(stream_receiver, 1024), expected_output
                )
            except:  # re-raises
                # Do not leave the child running if the check failed.
                proc.terminate()
                raise
            finally:
                retcode = proc.wait()
            self.assertEqual(retcode, 0)

    # stdout connected to a pipe: fully buffered output is expected.
    def test_buffering_stdout_pipes(self):
        self._test('stdout', _pipes, FULLY_BUFFERED)

    # stdout connected to a PTY: line-buffered output is expected.
    def test_buffering_stdout_ptys(self):
        self._test('stdout', _ptys, LINE_BUFFERED)

    # ``python -u`` with a pipe: unbuffered output is expected.
    def test_buffering_stdout_pipes_unbuffered(self):
        self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    # ``python -u`` with a PTY: unbuffered output is expected.
    def test_buffering_stdout_ptys_unbuffered(self):
        self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_buffering_stdout_ptys_unbuffered
        )
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    # Imported only here, so importing this module does not need it.
    # NOTE(review): presumably the project's own test runner helper -- confirm.
    import silenttestrunner

    silenttestrunner.main(__name__)