test_pygrack.py
295 lines
| 9.8 KiB
| text/x-python
|
PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
import io | ||||
r1048 | import more_itertools | |||
r130 | ||||
import dulwich.protocol | ||||
import mock | ||||
import pytest | ||||
import webob | ||||
import webtest | ||||
from vcsserver import hooks, pygrack | ||||
r1060 | from vcsserver.str_utils import ascii_bytes | |||
r130 | ||||
@pytest.fixture()
def pygrack_instance(tmpdir):
    """
    Build a ``pygrack.GitRepository`` app rooted at a temporary directory.

    The directory is not a real repository: it only receives the folders
    that are required to pass the signature test.
    """
    required_dirs = ('config', 'head', 'info', 'objects', 'refs')
    for required_dir in required_dirs:
        tmpdir.mkdir(required_dir)

    repo_path = str(tmpdir)
    return pygrack.GitRepository('repo_name', repo_path, 'git', False, {})
@pytest.fixture()
def pygrack_app(pygrack_instance):
    """
    Wrap the pygrack app instance in a ``webtest.TestApp``.
    """
    wrapped_app = webtest.TestApp(pygrack_instance)
    return wrapped_app
def test_invalid_service_info_refs_returns_403(pygrack_app):
    """Requesting info/refs for an invalid service name must yield a 403."""
    # Note the trailing "s" in the service name.
    url = '/info/refs?service=git-upload-packs'
    result = pygrack_app.get(url, expect_errors=True)
    assert result.status_int == 403
def test_invalid_endpoint_returns_403(pygrack_app):
    """POSTing to an invalid endpoint must yield a 403."""
    # Note the trailing "s" in the endpoint path.
    url = '/git-upload-packs'
    result = pygrack_app.post(url, expect_errors=True)
    assert result.status_int == 403
@pytest.mark.parametrize('sideband', [
    'side-band-64k',
    'side-band',
    'side-band no-progress',
])
def test_pre_pull_hook_fails_with_sideband(pygrack_app, sideband):
    """
    A failing pre-pull hook with a sideband capability requested.

    The response must consist of the NAK packet, the hook output, the abort
    message and finally the empty pack.
    """
    body = (
        '0054want 74730d410fcb6603ace96f1dc55ea6196122532d '
        f'multi_ack {sideband} ofs-delta\n'
        '0000'
        '0009done\n'
    )

    # The pre-pull hook reports return code 1 with the output 'foo'.
    failing_pre_pull = mock.patch(
        'vcsserver.hooks.git_pre_pull',
        return_value=hooks.HookResponse(1, 'foo'))
    with failing_pre_pull:
        result = pygrack_app.post(
            '/git-upload-pack',
            params=body,
            content_type='application/x-git-upload-pack')

    stream = io.BytesIO(result.body)
    reader = dulwich.protocol.Protocol(stream.read, None)
    received = list(reader.read_pkt_seq())

    assert received == [
        b'NAK\n',
        b'\x02foo',
        b'\x02Pre pull hook failed: aborting\n',
        b'\x01' + pygrack.GitRepository.EMPTY_PACK,
    ]
def test_pre_pull_hook_fails_no_sideband(pygrack_app):
    """
    A failing pre-pull hook without any sideband capability requested.

    The response body must be exactly the empty pack.
    """
    body = (
        '0054want 74730d410fcb6603ace96f1dc55ea6196122532d '
        'multi_ack ofs-delta\n'
        '0000'
        '0009done\n'
    )

    # The pre-pull hook reports return code 1 with the output 'foo'.
    failing_pre_pull = mock.patch(
        'vcsserver.hooks.git_pre_pull',
        return_value=hooks.HookResponse(1, 'foo'))
    with failing_pre_pull:
        result = pygrack_app.post(
            '/git-upload-pack',
            params=body,
            content_type='application/x-git-upload-pack')

    assert result.body == pygrack.GitRepository.EMPTY_PACK
def test_pull_has_hook_messages(pygrack_app):
    """
    A pull with a passing pre-pull hook and a post-pull hook returning 1.

    The response must contain the subprocess packets followed by the
    post-pull hook output.
    """
    body = (
        '0054want 74730d410fcb6603ace96f1dc55ea6196122532d '
        'multi_ack side-band-64k ofs-delta\n'
        '0000'
        '0009done\n'
    )

    pre_pull_output = 'pre_pull_output'
    post_pull_output = 'post_pull_output'

    patched_pre_pull = mock.patch(
        'vcsserver.hooks.git_pre_pull',
        return_value=hooks.HookResponse(0, pre_pull_output))
    patched_post_pull = mock.patch(
        'vcsserver.hooks.git_post_pull',
        return_value=hooks.HookResponse(1, post_pull_output))
    # The subprocess output is replaced by a single canned chunk.
    patched_chunker = mock.patch(
        'vcsserver.subprocessio.SubprocessIOChunker',
        return_value=more_itertools.always_iterable([b'0008NAK\n0009subp\n0000']))

    with patched_pre_pull, patched_post_pull, patched_chunker:
        result = pygrack_app.post(
            '/git-upload-pack',
            params=body,
            content_type='application/x-git-upload-pack')

    stream = io.BytesIO(result.body)
    reader = dulwich.protocol.Protocol(stream.read, None)
    received = list(reader.read_pkt_seq())

    # pre-pull only outputs if IT FAILS as in != 0 ret code, hence no
    # b'\x02pre_pull_output' packet is expected between NAK and the data.
    expected = [
        b'NAK\n',
        b'subp\n',
        b'\x02post_pull_output',
    ]
    assert received == expected
r130 | ||||
def test_get_want_capabilities(pygrack_instance):
    """
    ``_get_want_capabilities`` must return the capabilities listed on the
    ``want`` line and leave the request input positioned at offset 0.
    """
    payload = (
        b'0054want 74730d410fcb6603ace96f1dc55ea6196122532d '
        b'multi_ack side-band-64k ofs-delta\n00000009done\n'
    )
    data = io.BytesIO(payload)

    environ = {
        'wsgi.input': data,
        'REQUEST_METHOD': 'POST',
        'webob.is_body_seekable': True
    }
    request = webob.Request(environ)

    found = pygrack_instance._get_want_capabilities(request)

    wanted = frozenset([b'ofs-delta', b'multi_ack', b'side-band-64k'])
    assert found == wanted
    # The body stream must be at its start after the call.
    assert data.tell() == 0
@pytest.mark.parametrize('data,capabilities,expected', [
    pytest.param(
        'foo', [], [],
        id='foo-empty'),
    pytest.param(
        '', [pygrack.CAPABILITY_SIDE_BAND_64K], [],
        id='empty-64k'),
    pytest.param(
        '', [pygrack.CAPABILITY_SIDE_BAND], [],
        id='empty'),
    pytest.param(
        'foo', [pygrack.CAPABILITY_SIDE_BAND_64K], [b'0008\x02foo'],
        id='foo-64k'),
    pytest.param(
        'foo', [pygrack.CAPABILITY_SIDE_BAND], [b'0008\x02foo'],
        id='foo'),
    pytest.param(
        'f'*1000, [pygrack.CAPABILITY_SIDE_BAND_64K],
        [b'03ed\x02' + b'f' * 1000],
        id='f-1000-64k'),
    pytest.param(
        'f'*1000, [pygrack.CAPABILITY_SIDE_BAND],
        [b'03e8\x02' + b'f' * 995, b'000a\x02fffff'],
        id='f-1000'),
    pytest.param(
        'f'*65520, [pygrack.CAPABILITY_SIDE_BAND_64K],
        [b'fff0\x02' + b'f' * 65515, b'000a\x02fffff'],
        id='f-65520-64k'),
    pytest.param(
        'f'*65520, [pygrack.CAPABILITY_SIDE_BAND],
        [b'03e8\x02' + b'f' * 995] * 65 + [b'0352\x02' + b'f' * 845],
        id='f-65520'),
])
def test_get_messages(pygrack_instance, data, capabilities, expected):
    """
    ``_get_messages`` must turn ``data`` into the expected list of packets
    for the given capabilities (no packets without a sideband capability).
    """
    produced = pygrack_instance._get_messages(data, capabilities)
    assert produced == expected
@pytest.mark.parametrize('response,capabilities,pre_pull_messages,post_pull_messages', [
    # Unexpected response
    (
        [b'unexpected_response[no_initial_header]'],
        [pygrack.CAPABILITY_SIDE_BAND_64K],
        'foo',
        'bar',
    ),
    # No sideband
    (
        [b'no-sideband'],
        [],
        'foo',
        'bar',
    ),
    # No messages
    (
        [b'no-messages'],
        [pygrack.CAPABILITY_SIDE_BAND_64K],
        '',
        '',
    ),
])
def test_inject_messages_to_response_nothing_to_do(
        pygrack_instance, response, capabilities,
        pre_pull_messages, post_pull_messages):
    """
    In each parametrized case ``_build_post_pull_response`` must hand the
    response chunks back unchanged.
    """
    chunks = more_itertools.always_iterable(response)
    rebuilt = pygrack_instance._build_post_pull_response(
        chunks, capabilities, pre_pull_messages, post_pull_messages)

    assert list(rebuilt) == response
r130 | ||||
@pytest.mark.parametrize('capabilities', [
    [pygrack.CAPABILITY_SIDE_BAND],
    [pygrack.CAPABILITY_SIDE_BAND_64K],
])
def test_inject_messages_to_response_single_element(pygrack_instance, capabilities):
    """
    The whole response arrives as one chunk: 'foo' must be injected after
    the NAK packet and 'bar' before the final flush packet.
    """
    chunks = more_itertools.always_iterable([b'0008NAK\n0009subp\n0000'])
    rebuilt = pygrack_instance._build_post_pull_response(
        chunks, capabilities, 'foo', 'bar')

    expected_body = (
        b'0008NAK\n'
        b'0008\x02foo'
        b'0009subp\n'
        b'0008\x02bar'
        b'0000'
    )

    assert b''.join(rebuilt) == expected_body
r130 | ||||
@pytest.mark.parametrize('capabilities', [
    [pygrack.CAPABILITY_SIDE_BAND],
    [pygrack.CAPABILITY_SIDE_BAND_64K],
])
def test_inject_messages_to_response_multi_element(pygrack_instance, capabilities):
    """
    The response arrives in several chunks: 'foo' must be injected after
    the NAK packet and 'bar' before the final flush packet.
    """
    chunks = [
        b'0008NAK\n000asubp1\n',
        b'000asubp2\n',
        b'000asubp3\n',
        b'000asubp4\n0000',
    ]
    rebuilt = pygrack_instance._build_post_pull_response(
        more_itertools.always_iterable(chunks), capabilities, 'foo', 'bar')

    expected_body = (
        b'0008NAK\n'
        b'0008\x02foo'
        b'000asubp1\n' b'000asubp2\n' b'000asubp3\n' b'000asubp4\n'
        b'0008\x02bar'
        b'0000'
    )

    assert b''.join(rebuilt) == expected_body
r130 | ||||
def test_build_failed_pre_pull_response_no_sideband(pygrack_instance):
    """With an empty capability list the response is just the empty pack."""
    no_capabilities = []
    built = pygrack_instance._build_failed_pre_pull_response(no_capabilities, 'foo')
    assert built == [pygrack.GitRepository.EMPTY_PACK]
@pytest.mark.parametrize('capabilities', [
    [pygrack.CAPABILITY_SIDE_BAND],
    [pygrack.CAPABILITY_SIDE_BAND_64K],
    [pygrack.CAPABILITY_SIDE_BAND_64K, b'no-progress'],
])
def test_build_failed_pre_pull_response(pygrack_instance, capabilities):
    """
    With a sideband capability the failed pre-pull response must hold the
    NAK packet, the hook output, the abort message, the empty pack packet
    and the flush packet, in that order.
    """
    built = pygrack_instance._build_failed_pre_pull_response(capabilities, 'foo')

    empty_pack = pygrack.GitRepository.EMPTY_PACK
    # Length prefix is the pack length plus 5, as in the original expectation.
    empty_pack_packet = b'%04x\x01%s' % (len(empty_pack) + 5, empty_pack)

    assert built == [
        b'0008NAK\n',
        b'0008\x02foo',
        b'0024\x02Pre pull hook failed: aborting\n',
        empty_pack_packet,
        pygrack.GitRepository.FLUSH_PACKET,
    ]
r1048 | ||||
def test_inject_messages_to_response_generator(pygrack_instance):
    """
    Feed ``_build_post_pull_response`` from a generator of 1002 chunks and
    check that the pre/post pull messages are placed after the NAK packet
    and before the flush packet respectively.
    """

    def response_generator():
        chunks = [
            # protocol start
            b'0008NAK\n',
            *[ascii_bytes(f'000asubp{x}\n') for x in range(1000)],
            # protocol end
            pygrack.GitRepository.FLUSH_PACKET,
        ]
        yield from chunks

    capabilities = [pygrack.CAPABILITY_SIDE_BAND_64K, b'no-progress']
    new_response = pygrack_instance._build_post_pull_response(
        response_generator(), capabilities, 'PRE_PULL_MSG\n', 'POST_PULL_MSG\n')

    assert iter(new_response)

    expected_parts = [
        # start
        b'0008NAK\n0012\x02PRE_PULL_MSG\n',
        # ... rest
        *[ascii_bytes(f'000asubp{x}\n') for x in range(1000)],
        # final message,
        b'0013\x02POST_PULL_MSG\n0000',
    ]
    expected_response = b''.join(expected_parts)

    assert b''.join(new_response) == expected_response