# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import io
import os
import sys

import pytest

from vcsserver import subprocessio
from vcsserver.str_utils import ascii_bytes


class FileLikeObj:  # pragma: no cover
    """Minimal readable object used as an input stream in these tests.

    The stream consists of ``data`` repeated ``size // len(data)`` times and
    can be consumed exactly once through :meth:`read`.
    """

    def __init__(self, data: bytes, size: int):
        chunks = size // len(data)
        self.stream = self._get_stream(data, chunks)

    def _get_stream(self, data: bytes, chunks: int):
        """Yield ``data`` exactly ``chunks`` times."""
        for _ in range(chunks):
            yield data

    def read(self, n: int) -> bytes:
        """Return the next chunks of the stream, joined into one bytes object.

        Whole chunks are consumed until at least ``n`` bytes were collected
        or the stream is exhausted, so the result may be longer than ``n``
        when the chunk size does not divide ``n``. An exhausted stream
        yields ``b''``.
        """
        # Collect the pieces and join once instead of repeated ``bytes +=``,
        # which copies the whole buffer on every iteration.
        pieces = []
        collected = 0
        for chunk in self.stream:
            pieces.append(chunk)
            collected += len(chunk)
            if collected >= n:
                break
        return b''.join(pieces)


@pytest.fixture(scope='module')
def environ() -> dict:
    """Delete coverage variables, as they make the tests fail."""
    return {
        key: value
        for key, value in os.environ.items()
        if not key.startswith('COV_CORE_')
    }


def _get_python_args(script: str) -> list:
    """Build an argv list that runs ``script`` with the current interpreter.

    ``sys``, ``time`` and ``shutil`` are imported ahead of ``script`` so the
    one-liners used by the tests can rely on them.
    """
    return [sys.executable, '-c', 'import sys; import time; import shutil; ' + script]


def test_raise_exception_on_non_zero_return_code(environ):
    """A child that dies with an exception makes the chunker raise OSError."""
    call_args = _get_python_args('raise ValueError("fail")')
    with pytest.raises(OSError):
        b''.join(subprocessio.SubprocessIOChunker(call_args, shell=False, env=environ))


def test_does_not_fail_on_non_zero_return_code(environ):
    """With ``fail_on_return_code=False`` stdout is returned despite exit code 1."""
    call_args = _get_python_args('sys.stdout.write("hello"); sys.exit(1)')
    proc = subprocessio.SubprocessIOChunker(
        call_args, shell=False, fail_on_return_code=False, env=environ)
    output = b''.join(proc)

    assert output == b'hello'


def test_raise_exception_on_stderr(environ):
    """Output on stderr makes the chunker raise OSError carrying that output."""
    call_args = _get_python_args('sys.stderr.write("WRITE_TO_STDERR"); time.sleep(1);')

    with pytest.raises(OSError) as excinfo:
        b''.join(subprocessio.SubprocessIOChunker(call_args, shell=False, env=environ))

    assert 'exited due to an error:\nWRITE_TO_STDERR' in str(excinfo.value)


def test_does_not_fail_on_stderr(environ):
    """With ``fail_on_stderr=False`` stderr output is tolerated."""
    # ``flush()`` must actually be called: a bare ``sys.stderr.flush`` is only
    # an attribute lookup, which would leave the text buffered in the child
    # until it exits instead of delivering it before the sleep.
    call_args = _get_python_args(
        'sys.stderr.write("WRITE_TO_STDERR"); sys.stderr.flush(); time.sleep(2);')
    proc = subprocessio.SubprocessIOChunker(
        call_args, shell=False, fail_on_stderr=False, env=environ)
    output = b''.join(proc)

    assert output == b''


@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_no_input(size, environ):
    """Everything the child writes to stdout is returned, small or large."""
    call_args = _get_python_args(f'sys.stdout.write("X" * {size});')
    proc = subprocessio.SubprocessIOChunker(call_args, shell=False, env=environ)
    output = b''.join(proc)

    assert output == ascii_bytes("X" * size)


@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_no_input_does_not_fail(size, environ):
    """Full stdout is returned even when the child exits with code 1."""
    call_args = _get_python_args(f'sys.stdout.write("X" * {size}); sys.exit(1)')
    proc = subprocessio.SubprocessIOChunker(
        call_args, shell=False, fail_on_return_code=False, env=environ)
    output = b''.join(proc)

    assert output == ascii_bytes("X" * size)


@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_input(size, environ):
    """Data fed through ``input_stream`` comes back out of a cat-like child."""
    inputstream = FileLikeObj(b'X', size)

    # This acts like the cat command.
    call_args = _get_python_args('shutil.copyfileobj(sys.stdin, sys.stdout)')
    # Note: in this test we explicitly don't assign the chunker to a variable
    # and let it stream directly.
    output = b''.join(
        subprocessio.SubprocessIOChunker(
            call_args, shell=False, input_stream=inputstream, env=environ)
    )

    assert len(output) == size


@pytest.mark.parametrize('size', [1, 10 ** 5])
def test_output_with_input_skipping_iterator(size, environ):
    """Same round trip as above, but reading from ``proc.stdout`` directly."""
    inputstream = FileLikeObj(b'X', size)

    # This acts like the cat command.
    call_args = _get_python_args('shutil.copyfileobj(sys.stdin, sys.stdout)')

    # Note: assigning the chunker makes sure that it is not deleted too early
    proc = subprocessio.SubprocessIOChunker(
        call_args, shell=False, input_stream=inputstream, env=environ)
    output = b''.join(proc.stdout)

    assert len(output) == size