test_subprocessio.py
155 lines
| 4.6 KiB
| text/x-python
|
PythonLexer
r130 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
r620 | # Copyright (C) 2014-2019 RhodeCode GmbH | |||
r130 | # | |||
# This program is free software; you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation; either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program; if not, write to the Free Software Foundation, | ||||
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||||
import io | ||||
import os | ||||
import sys | ||||
import pytest | ||||
from vcsserver import subprocessio | ||||
r638 | class KindaFilelike(object): # pragma: no cover | |||
def __init__(self, data, size): | ||||
chunks = size / len(data) | ||||
self.stream = self._get_stream(data, chunks) | ||||
def _get_stream(self, data, chunks): | ||||
for x in xrange(chunks): | ||||
yield data | ||||
def read(self, n): | ||||
buffer_stream = '' | ||||
for chunk in self.stream: | ||||
buffer_stream += chunk | ||||
if len(buffer_stream) >= n: | ||||
break | ||||
# self.stream = self.bytes[n:] | ||||
return buffer_stream | ||||
r130 | @pytest.fixture(scope='module') | |||
def environ(): | ||||
"""Delete coverage variables, as they make the tests fail.""" | ||||
env = dict(os.environ) | ||||
for key in env.keys(): | ||||
if key.startswith('COV_CORE_'): | ||||
del env[key] | ||||
return env | ||||
def _get_python_args(script): | ||||
r589 | return [sys.executable, '-c', 'import sys; import time; import shutil; ' + script] | |||
r130 | ||||
def test_raise_exception_on_non_zero_return_code(environ): | ||||
args = _get_python_args('sys.exit(1)') | ||||
with pytest.raises(EnvironmentError): | ||||
list(subprocessio.SubprocessIOChunker(args, shell=False, env=environ)) | ||||
def test_does_not_fail_on_non_zero_return_code(environ): | ||||
args = _get_python_args('sys.exit(1)') | ||||
r589 | output = ''.join( | |||
subprocessio.SubprocessIOChunker( | ||||
args, shell=False, fail_on_return_code=False, env=environ | ||||
) | ||||
) | ||||
r130 | ||||
assert output == '' | ||||
def test_raise_exception_on_stderr(environ): | ||||
args = _get_python_args('sys.stderr.write("X"); time.sleep(1);') | ||||
with pytest.raises(EnvironmentError) as excinfo: | ||||
list(subprocessio.SubprocessIOChunker(args, shell=False, env=environ)) | ||||
assert 'exited due to an error:\nX' in str(excinfo.value) | ||||
def test_does_not_fail_on_stderr(environ): | ||||
args = _get_python_args('sys.stderr.write("X"); time.sleep(1);') | ||||
r589 | output = ''.join( | |||
subprocessio.SubprocessIOChunker( | ||||
args, shell=False, fail_on_stderr=False, env=environ | ||||
) | ||||
) | ||||
r130 | ||||
assert output == '' | ||||
r589 | @pytest.mark.parametrize('size', [1, 10 ** 5]) | |||
r130 | def test_output_with_no_input(size, environ): | |||
r589 | print(type(environ)) | |||
r130 | data = 'X' | |||
args = _get_python_args('sys.stdout.write("%s" * %d)' % (data, size)) | ||||
r589 | output = ''.join(subprocessio.SubprocessIOChunker(args, shell=False, env=environ)) | |||
r130 | ||||
assert output == data * size | ||||
r589 | @pytest.mark.parametrize('size', [1, 10 ** 5]) | |||
r130 | def test_output_with_no_input_does_not_fail(size, environ): | |||
data = 'X' | ||||
r589 | args = _get_python_args('sys.stdout.write("%s" * %d); sys.exit(1)' % (data, size)) | |||
output = ''.join( | ||||
subprocessio.SubprocessIOChunker( | ||||
args, shell=False, fail_on_return_code=False, env=environ | ||||
) | ||||
) | ||||
r130 | ||||
r589 | print("{} {}".format(len(data * size), len(output))) | |||
r130 | assert output == data * size | |||
r589 | @pytest.mark.parametrize('size', [1, 10 ** 5]) | |||
r130 | def test_output_with_input(size, environ): | |||
r638 | data_len = size | |||
inputstream = KindaFilelike('X', size) | ||||
r130 | # This acts like the cat command. | |||
args = _get_python_args('shutil.copyfileobj(sys.stdin, sys.stdout)') | ||||
r589 | output = ''.join( | |||
subprocessio.SubprocessIOChunker( | ||||
args, shell=False, inputstream=inputstream, env=environ | ||||
) | ||||
) | ||||
r130 | ||||
r638 | assert len(output) == data_len | |||
r130 | ||||
r589 | @pytest.mark.parametrize('size', [1, 10 ** 5]) | |||
r130 | def test_output_with_input_skipping_iterator(size, environ): | |||
r638 | data_len = size | |||
inputstream = KindaFilelike('X', size) | ||||
r130 | # This acts like the cat command. | |||
args = _get_python_args('shutil.copyfileobj(sys.stdin, sys.stdout)') | ||||
# Note: assigning the chunker makes sure that it is not deleted too early | ||||
chunker = subprocessio.SubprocessIOChunker( | ||||
r589 | args, shell=False, inputstream=inputstream, env=environ | |||
) | ||||
r130 | output = ''.join(chunker.output) | |||
r638 | assert len(output) == data_len | |||