test_subprocessio.py
155 lines
| 4.8 KiB
| text/x-python
|
PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
import io | ||||
import os | ||||
import sys | ||||
import pytest | ||||
from vcsserver import subprocessio | ||||
r1060 | from vcsserver.str_utils import ascii_bytes | |||
r130 | ||||
class FileLikeObj:  # pragma: no cover
    """Minimal in-memory input stream used to feed a subprocess in tests.

    The stream serves ``data`` repeated ``size // len(data)`` times and then
    signals end-of-stream by returning ``b''`` from :meth:`read`.
    """

    def __init__(self, data: bytes, size: int):
        """Prepare a stream made of ``size // len(data)`` copies of ``data``.

        Raises:
            ZeroDivisionError: if ``data`` is empty.
        """
        chunks = size // len(data)
        self.stream = self._get_stream(data, chunks)
        # Bytes already pulled from the generator but not yet handed out;
        # kept so that read() never returns more than was asked for.
        self._pending = b''

    def _get_stream(self, data, chunks):
        """Yield ``data`` exactly ``chunks`` times."""
        for _ in range(chunks):
            yield data

    def read(self, n=-1):
        """Return at most ``n`` bytes, or ``b''`` once the stream is exhausted.

        A negative ``n`` (or ``None``) drains everything that is left. Whole
        chunks are pulled from the generator until ``n`` bytes are available;
        any surplus is held back and served by the next call.
        """
        read_all = n is None or n < 0
        pieces = [self._pending]
        available = len(self._pending)
        if read_all or available < n:
            for chunk in self.stream:
                pieces.append(chunk)
                available += len(chunk)
                if not read_all and available >= n:
                    break
        # Join once instead of growing a bytes object with ``+=`` per chunk.
        buffered = b''.join(pieces)
        if read_all:
            self._pending = b''
            return buffered
        self._pending = buffered[n:]
        return buffered[:n]
@pytest.fixture(scope='module')
def environ():
    """Return a copy of the process environment without ``COV_CORE_*`` keys.

    The coverage variables make the tests fail, so they are filtered out
    before the environment is handed to the child processes.
    """
    return {
        name: value
        for name, value in os.environ.items()
        if not name.startswith('COV_CORE_')
    }
def _get_python_args(script):
    """Build the argv that runs ``script`` with the current interpreter via ``-c``.

    ``sys``, ``time`` and ``shutil`` are imported ahead of ``script`` so the
    snippet can use them without importing them itself.
    """
    preamble = 'import sys; import time; import shutil; '
    return [sys.executable, '-c', preamble + script]
r130 | ||||
def test_raise_exception_on_non_zero_return_code(environ):
    """A child that dies with an uncaught exception surfaces as ``OSError``."""
    failing_cmd = _get_python_args('raise ValueError("fail")')
    chunker_kwargs = {'shell': False, 'env': environ}

    # Construction stays inside the ``raises`` block so an error raised while
    # creating the chunker is accepted as well as one raised while consuming it.
    with pytest.raises(OSError):
        b''.join(subprocessio.SubprocessIOChunker(failing_cmd, **chunker_kwargs))
r130 | ||||
def test_does_not_fail_on_non_zero_return_code(environ):
    """With ``fail_on_return_code=False`` an exit status of 1 still yields the output."""
    cmd = _get_python_args('sys.stdout.write("hello"); sys.exit(1)')
    chunker = subprocessio.SubprocessIOChunker(
        cmd,
        shell=False,
        fail_on_return_code=False,
        env=environ,
    )

    collected = b''.join(chunker)

    assert collected == b'hello'
r130 | ||||
def test_raise_exception_on_stderr(environ):
    """Output on stderr is reported as ``OSError`` carrying the stderr text."""
    cmd = _get_python_args('sys.stderr.write("WRITE_TO_STDERR"); time.sleep(1);')
    chunker_kwargs = {'shell': False, 'env': environ}

    with pytest.raises(OSError) as excinfo:
        b''.join(subprocessio.SubprocessIOChunker(cmd, **chunker_kwargs))

    # Checked after the ``with`` block: statements following the raising call
    # inside it would never run.
    error_text = str(excinfo.value)
    assert 'exited due to an error:\nWRITE_TO_STDERR' in error_text
r130 | ||||
def test_does_not_fail_on_stderr(environ):
    """With ``fail_on_stderr=False`` stderr output does not raise and stdout stays empty."""
    # ``flush()`` must actually be called (a bare ``sys.stderr.flush`` is a
    # no-op expression) so the text is pushed out before the child sleeps.
    call_args = _get_python_args(
        'sys.stderr.write("WRITE_TO_STDERR"); sys.stderr.flush(); time.sleep(2);'
    )
    proc = subprocessio.SubprocessIOChunker(
        call_args, shell=False, fail_on_stderr=False, env=environ
    )

    output = b''.join(proc)

    # The child writes nothing to stdout, so nothing must be collected.
    assert output == b''
r130 | ||||
@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_no_input(size, environ):
    """The chunker returns exactly the ``size`` bytes the child writes to stdout."""
    expected = ascii_bytes("X" * size)
    cmd = _get_python_args(f'sys.stdout.write("X" * {size});')

    chunker = subprocessio.SubprocessIOChunker(cmd, shell=False, env=environ)
    collected = b''.join(chunker)

    assert collected == expected
r130 | ||||
@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_no_input_does_not_fail(size, environ):
    """Full stdout is delivered even though the child exits with status 1."""
    expected = ascii_bytes("X" * size)
    cmd = _get_python_args(f'sys.stdout.write("X" * {size}); sys.exit(1)')

    chunker = subprocessio.SubprocessIOChunker(
        cmd,
        shell=False,
        fail_on_return_code=False,
        env=environ,
    )
    collected = b''.join(chunker)

    assert collected == expected
r130 | ||||
@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_input(size, environ):
    """Bytes fed through ``input_stream`` come back out of a cat-like child."""
    source = FileLikeObj(b'X', size)

    # The child copies stdin to stdout, i.e. it behaves like ``cat``.
    cat_cmd = _get_python_args('shutil.copyfileobj(sys.stdin, sys.stdout)')

    # Deliberately no name is bound to the chunker here: it is consumed
    # directly so that it streams straight into the join.
    output = b''.join(
        subprocessio.SubprocessIOChunker(
            cat_cmd, shell=False, input_stream=source, env=environ
        )
    )

    expected_len = size
    assert len(output) == expected_len
r130 | ||||
@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_input_skipping_iterator(size, environ):
    """Reading ``proc.stdout`` directly yields all bytes fed via ``input_stream``."""
    source = FileLikeObj(b'X', size)

    # The child copies stdin to stdout, i.e. it behaves like ``cat``.
    cat_cmd = _get_python_args('shutil.copyfileobj(sys.stdin, sys.stdout)')

    # Keep the chunker bound to a local so it is not deleted too early while
    # its ``stdout`` attribute is being consumed.
    proc = subprocessio.SubprocessIOChunker(
        cat_cmd, shell=False, input_stream=source, env=environ
    )
    output = b''.join(proc.stdout)

    expected_len = size
    assert len(output) == expected_len