##// END OF EJS Templates
unbundle: free temporary objects after use...
unbundle: free temporary objects after use This reduces peak RSS for larger unbundle operations by ~30 Bytes per changeset on AMD64. This can't be a direct delete for Python 2.7, so reset the object instead and leave a comment. The efilesset object can't be deleted as it is referenced by the local onchangelog function and Python 2.7 rejects a delete on the existance of a nested scope. Differential Revision: https://phab.mercurial-scm.org/D9153

File last commit:

r45708:a37f290a default
r46321:44d84b72 default
Show More
test-stdio.py
365 lines | 11.1 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import
import contextlib
import errno
import os
import signal
import subprocess
import sys
import tempfile
import unittest
from mercurial import pycompat, util
if pycompat.ispy3:
def set_noninheritable(fd):
# On Python 3, file descriptors are non-inheritable by default.
pass
else:
if pycompat.iswindows:
# unused
set_noninheritable = None
else:
import fcntl
def set_noninheritable(fd):
old = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
TEST_BUFFERING_CHILD_SCRIPT = r'''
import os
from mercurial import dispatch
from mercurial.utils import procutil
dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
import os
import signal
import sys
from mercurial import dispatch
from mercurial.utils import procutil
signal.signal(signal.SIGINT, lambda *x: None)
dispatch.initstdio()
write_result = procutil.{stream}.write(b'x' * 1048576)
with os.fdopen(
os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
'w',
) as write_result_f:
write_result_f.write(str(write_result))
'''
TEST_BROKEN_PIPE_CHILD_SCRIPT = r'''
import os
import pickle
from mercurial import dispatch
from mercurial.utils import procutil
dispatch.initstdio()
procutil.stdin.read(1) # wait until parent process closed pipe
try:
procutil.{stream}.write(b'test')
procutil.{stream}.flush()
except EnvironmentError as e:
with os.fdopen(
os.open(
{err_fn!r},
os.O_WRONLY
| getattr(os, 'O_BINARY', 0)
| getattr(os, 'O_TEMPORARY', 0),
),
'wb',
) as err_f:
pickle.dump(e, err_f)
# Exit early to suppress further broken pipe errors at interpreter shutdown.
os._exit(0)
'''
@contextlib.contextmanager
def _closing(fds):
try:
yield
finally:
for fd in fds:
try:
os.close(fd)
except EnvironmentError:
pass
# In the following, we set the FDs non-inheritable mainly to make it possible
# for tests to close the receiving end of the pipe / PTYs.
@contextlib.contextmanager
def _devnull():
devnull = os.open(os.devnull, os.O_WRONLY)
# We don't have a receiving end, so it's not worth the effort on Python 2
# on Windows to make the FD non-inheritable.
with _closing([devnull]):
yield (None, devnull)
@contextlib.contextmanager
def _pipes():
rwpair = os.pipe()
# Pipes are already non-inheritable on Windows.
if not pycompat.iswindows:
set_noninheritable(rwpair[0])
set_noninheritable(rwpair[1])
with _closing(rwpair):
yield rwpair
@contextlib.contextmanager
def _ptys():
if pycompat.iswindows:
raise unittest.SkipTest("PTYs are not supported on Windows")
import pty
import tty
rwpair = pty.openpty()
set_noninheritable(rwpair[0])
set_noninheritable(rwpair[1])
with _closing(rwpair):
tty.setraw(rwpair[0])
yield rwpair
def _readall(fd, buffer_size, initial_buf=None):
buf = initial_buf or []
while True:
try:
s = os.read(fd, buffer_size)
except OSError as e:
if e.errno == errno.EIO:
# If the child-facing PTY got closed, reading from the
# parent-facing PTY raises EIO.
break
raise
if not s:
break
buf.append(s)
return b''.join(buf)
class TestStdio(unittest.TestCase):
def _test(
self,
child_script,
stream,
rwpair_generator,
check_output,
python_args=[],
post_child_check=None,
stdin_generator=None,
):
assert stream in ('stdout', 'stderr')
if stdin_generator is None:
stdin_generator = open(os.devnull, 'rb')
with rwpair_generator() as (
stream_receiver,
child_stream,
), stdin_generator as child_stdin:
proc = subprocess.Popen(
[sys.executable] + python_args + ['-c', child_script],
stdin=child_stdin,
stdout=child_stream if stream == 'stdout' else None,
stderr=child_stream if stream == 'stderr' else None,
)
try:
os.close(child_stream)
if stream_receiver is not None:
check_output(stream_receiver, proc)
except: # re-raises
proc.terminate()
raise
finally:
retcode = proc.wait()
self.assertEqual(retcode, 0)
if post_child_check is not None:
post_child_check()
def _test_buffering(
self, stream, rwpair_generator, expected_output, python_args=[]
):
def check_output(stream_receiver, proc):
self.assertEqual(_readall(stream_receiver, 1024), expected_output)
self._test(
TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
stream,
rwpair_generator,
check_output,
python_args,
)
def test_buffering_stdout_devnull(self):
self._test_buffering('stdout', _devnull, None)
def test_buffering_stdout_pipes(self):
self._test_buffering('stdout', _pipes, FULLY_BUFFERED)
def test_buffering_stdout_ptys(self):
self._test_buffering('stdout', _ptys, LINE_BUFFERED)
def test_buffering_stdout_devnull_unbuffered(self):
self._test_buffering('stdout', _devnull, None, python_args=['-u'])
def test_buffering_stdout_pipes_unbuffered(self):
self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])
def test_buffering_stdout_ptys_unbuffered(self):
self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])
if not pycompat.ispy3 and not pycompat.iswindows:
# On Python 2 on non-Windows, we manually open stdout in line-buffered
# mode if connected to a TTY. We should check if Python was configured
# to use unbuffered stdout, but it's hard to do that.
test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
test_buffering_stdout_ptys_unbuffered
)
def _test_large_write(self, stream, rwpair_generator, python_args=[]):
if not pycompat.ispy3 and pycompat.isdarwin:
# Python 2 doesn't always retry on EINTR, but the libc might retry.
# So far, it was observed only on macOS that EINTR is raised at the
# Python level. As Python 2 support will be dropped soon-ish, we
# won't attempt to fix it.
raise unittest.SkipTest("raises EINTR on macOS")
def check_output(stream_receiver, proc):
if not pycompat.iswindows:
# On Unix, we can provoke a partial write() by interrupting it
# by a signal handler as soon as a bit of data was written.
# We test that write() is called until all data is written.
buf = [os.read(stream_receiver, 1)]
proc.send_signal(signal.SIGINT)
else:
# On Windows, there doesn't seem to be a way to cause partial
# writes.
buf = []
self.assertEqual(
_readall(stream_receiver, 131072, buf), b'x' * 1048576
)
def post_child_check():
write_result_str = write_result_f.read()
if pycompat.ispy3:
# On Python 3, we test that the correct number of bytes is
# claimed to have been written.
expected_write_result_str = '1048576'
else:
# On Python 2, we only check that the large write does not
# crash.
expected_write_result_str = 'None'
self.assertEqual(write_result_str, expected_write_result_str)
with tempfile.NamedTemporaryFile('r') as write_result_f:
self._test(
TEST_LARGE_WRITE_CHILD_SCRIPT.format(
stream=stream, write_result_fn=write_result_f.name
),
stream,
rwpair_generator,
check_output,
python_args,
post_child_check=post_child_check,
)
def test_large_write_stdout_devnull(self):
self._test_large_write('stdout', _devnull)
def test_large_write_stdout_pipes(self):
self._test_large_write('stdout', _pipes)
def test_large_write_stdout_ptys(self):
self._test_large_write('stdout', _ptys)
def test_large_write_stdout_devnull_unbuffered(self):
self._test_large_write('stdout', _devnull, python_args=['-u'])
def test_large_write_stdout_pipes_unbuffered(self):
self._test_large_write('stdout', _pipes, python_args=['-u'])
def test_large_write_stdout_ptys_unbuffered(self):
self._test_large_write('stdout', _ptys, python_args=['-u'])
def test_large_write_stderr_devnull(self):
self._test_large_write('stderr', _devnull)
def test_large_write_stderr_pipes(self):
self._test_large_write('stderr', _pipes)
def test_large_write_stderr_ptys(self):
self._test_large_write('stderr', _ptys)
def test_large_write_stderr_devnull_unbuffered(self):
self._test_large_write('stderr', _devnull, python_args=['-u'])
def test_large_write_stderr_pipes_unbuffered(self):
self._test_large_write('stderr', _pipes, python_args=['-u'])
def test_large_write_stderr_ptys_unbuffered(self):
self._test_large_write('stderr', _ptys, python_args=['-u'])
def _test_broken_pipe(self, stream):
assert stream in ('stdout', 'stderr')
def check_output(stream_receiver, proc):
os.close(stream_receiver)
proc.stdin.write(b'x')
proc.stdin.close()
def post_child_check():
err = util.pickle.load(err_f)
self.assertEqual(err.errno, errno.EPIPE)
self.assertEqual(err.strerror, "Broken pipe")
with tempfile.NamedTemporaryFile('rb') as err_f:
self._test(
TEST_BROKEN_PIPE_CHILD_SCRIPT.format(
stream=stream, err_fn=err_f.name
),
stream,
_pipes,
check_output,
post_child_check=post_child_check,
stdin_generator=util.nullcontextmanager(subprocess.PIPE),
)
def test_broken_pipe_stdout(self):
self._test_broken_pipe('stdout')
def test_broken_pipe_stderr(self):
self._test_broken_pipe('stderr')
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)