# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import io
import more_itertools

import dulwich.protocol
import mock
import pytest
import webob
import webtest

from vcsserver import hooks, pygrack

from vcsserver.str_utils import ascii_bytes


@pytest.fixture()
def pygrack_instance(tmpdir):
    """
    Build a ``pygrack.GitRepository`` rooted at ``tmpdir``.

    The directory is not a real repository: it only receives the empty
    sub-folders created below, which are the ones required to pass the
    signature test.
    """
    required_dirs = ('config', 'head', 'info', 'objects', 'refs')
    for required_dir in required_dirs:
        tmpdir.mkdir(required_dir)

    repo_root = str(tmpdir)
    return pygrack.GitRepository('repo_name', repo_root, 'git', False, {})


@pytest.fixture()
def pygrack_app(pygrack_instance):
    """
    Wrap the ``pygrack_instance`` fixture in a ``webtest.TestApp``.
    """
    return webtest.TestApp(pygrack_instance)


def _post_upload_pack(app, payload):
    """
    POST ``payload`` to ``/git-upload-pack`` on ``app`` and return the response.
    """
    return app.post(
        '/git-upload-pack',
        params=payload,
        content_type='application/x-git-upload-pack')


def _read_pkt_lines(body):
    """
    Decode ``body`` with dulwich's ``Protocol`` and return the packets
    produced by ``read_pkt_seq()`` as a list.
    """
    stream = io.BytesIO(body)
    protocol = dulwich.protocol.Protocol(stream.read, None)
    return list(protocol.read_pkt_seq())


def test_invalid_service_info_refs_returns_403(pygrack_app):
    """A GET on info/refs with the service name used below must yield a 403."""
    url = '/info/refs?service=git-upload-packs'
    result = pygrack_app.get(url, expect_errors=True)

    assert result.status_int == 403


def test_invalid_endpoint_returns_403(pygrack_app):
    """A POST to the endpoint used below must yield a 403."""
    url = '/git-upload-packs'
    result = pygrack_app.post(url, expect_errors=True)

    assert result.status_int == 403


@pytest.mark.parametrize('sideband', [
    'side-band-64k',
    'side-band',
    'side-band no-progress',
])
def test_pre_pull_hook_fails_with_sideband(pygrack_app, sideband):
    """
    With a sideband capability requested and the pre-pull hook patched to
    return ``HookResponse(1, 'foo')``, the response must consist of exactly
    the packets listed in ``wanted`` below.
    """
    payload = (
        '0054want 74730d410fcb6603ace96f1dc55ea6196122532d '
        + f'multi_ack {sideband} ofs-delta\n'
        + '0000'
        + '0009done\n'
    )
    failing_hook = mock.patch(
        'vcsserver.hooks.git_pre_pull',
        return_value=hooks.HookResponse(1, 'foo'))

    with failing_hook:
        result = _post_upload_pack(pygrack_app, payload)

    wanted = [
        b'NAK\n',
        b'\x02foo',
        b'\x02Pre pull hook failed: aborting\n',
        b'\x01' + pygrack.GitRepository.EMPTY_PACK,
    ]
    assert _read_pkt_lines(result.body) == wanted


def test_pre_pull_hook_fails_no_sideband(pygrack_app):
    """
    Without a sideband capability, a pre-pull hook patched to return
    ``HookResponse(1, 'foo')`` must leave only ``EMPTY_PACK`` in the body.
    """
    payload = ''.join([
        '0054want 74730d410fcb6603ace96f1dc55ea6196122532d ',
        'multi_ack ofs-delta\n',
        '0000',
        '0009done\n',
    ])
    failing_hook = mock.patch(
        'vcsserver.hooks.git_pre_pull',
        return_value=hooks.HookResponse(1, 'foo'))

    with failing_hook:
        result = _post_upload_pack(pygrack_app, payload)

    assert result.body == pygrack.GitRepository.EMPTY_PACK


def test_pull_has_hook_messages(pygrack_app):
    """
    With the subprocess output patched to a fixed pkt-line stream, the
    response must carry the subprocess packets plus the post-pull hook
    output.
    """
    payload = ''.join([
        '0054want 74730d410fcb6603ace96f1dc55ea6196122532d ',
        'multi_ack side-band-64k ofs-delta\n',
        '0000',
        '0009done\n',
    ])

    pre_pull = 'pre_pull_output'
    post_pull = 'post_pull_output'

    pre_pull_patch = mock.patch(
        'vcsserver.hooks.git_pre_pull',
        return_value=hooks.HookResponse(0, pre_pull))
    post_pull_patch = mock.patch(
        'vcsserver.hooks.git_post_pull',
        return_value=hooks.HookResponse(1, post_pull))
    chunker_patch = mock.patch(
        'vcsserver.subprocessio.SubprocessIOChunker',
        return_value=more_itertools.always_iterable(
            [b'0008NAK\n0009subp\n0000']))

    # Entered left to right, i.e. in the same order as nested ``with`` blocks.
    with pre_pull_patch, post_pull_patch, chunker_patch:
        result = _post_upload_pack(pygrack_app, payload)

    # The pre-pull output (b'\x02pre_pull_output') is deliberately absent:
    # pre-pull only outputs if it fails, as in a != 0 return code.
    wanted = [
        b'NAK\n',
        b'subp\n',
        b'\x02post_pull_output',
    ]
    assert _read_pkt_lines(result.body) == wanted


def test_get_want_capabilities(pygrack_instance):
    """
    ``_get_want_capabilities`` must return the capabilities listed on the
    ``want`` line and leave the input stream position at 0.
    """
    body = io.BytesIO(
        b'0054want 74730d410fcb6603ace96f1dc55ea6196122532d '
        b'multi_ack side-band-64k ofs-delta\n00000009done\n')
    environ = {
        'wsgi.input': body,
        'REQUEST_METHOD': 'POST',
        'webob.is_body_seekable': True,
    }

    found = pygrack_instance._get_want_capabilities(webob.Request(environ))

    assert found == frozenset({b'ofs-delta', b'multi_ack', b'side-band-64k'})
    assert body.tell() == 0


@pytest.mark.parametrize('data,capabilities,expected', [
    pytest.param(
        'foo', [], [],
        id='foo-empty'),
    pytest.param(
        '', [pygrack.CAPABILITY_SIDE_BAND_64K], [],
        id='empty-64k'),
    pytest.param(
        '', [pygrack.CAPABILITY_SIDE_BAND], [],
        id='empty'),
    pytest.param(
        'foo', [pygrack.CAPABILITY_SIDE_BAND_64K], [b'0008\x02foo'],
        id='foo-64k'),
    pytest.param(
        'foo', [pygrack.CAPABILITY_SIDE_BAND], [b'0008\x02foo'],
        id='foo'),
    pytest.param(
        'f'*1000, [pygrack.CAPABILITY_SIDE_BAND_64K],
        [b'03ed\x02' + b'f' * 1000],
        id='f-1000-64k'),
    pytest.param(
        'f'*1000, [pygrack.CAPABILITY_SIDE_BAND],
        [b'03e8\x02' + b'f' * 995, b'000a\x02fffff'],
        id='f-1000'),
    pytest.param(
        'f'*65520, [pygrack.CAPABILITY_SIDE_BAND_64K],
        [b'fff0\x02' + b'f' * 65515, b'000a\x02fffff'],
        id='f-65520-64k'),
    pytest.param(
        'f'*65520, [pygrack.CAPABILITY_SIDE_BAND],
        [b'03e8\x02' + b'f' * 995] * 65 + [b'0352\x02' + b'f' * 845],
        id='f-65520'),
])
def test_get_messages(pygrack_instance, data, capabilities, expected):
    """
    ``_get_messages`` must turn ``data`` into the ``expected`` list of
    packets for the given ``capabilities``.
    """
    produced = pygrack_instance._get_messages(data, capabilities)

    assert produced == expected
@pytest.mark.parametrize('response,capabilities,pre_pull_messages,post_pull_messages', [
    # Unexpected response
    (
        [b'unexpected_response[no_initial_header]'],
        [pygrack.CAPABILITY_SIDE_BAND_64K],
        'foo',
        'bar',
    ),
    # No sideband
    (
        [b'no-sideband'],
        [],
        'foo',
        'bar',
    ),
    # No messages
    (
        [b'no-messages'],
        [pygrack.CAPABILITY_SIDE_BAND_64K],
        '',
        '',
    ),
])
def test_inject_messages_to_response_nothing_to_do(
        pygrack_instance, response, capabilities, pre_pull_messages, post_pull_messages):
    """
    For each parametrized case ``_build_post_pull_response`` must hand the
    upstream chunks back unchanged.
    """
    upstream = more_itertools.always_iterable(response)

    rebuilt = pygrack_instance._build_post_pull_response(
        upstream, capabilities, pre_pull_messages, post_pull_messages)

    assert list(rebuilt) == response


@pytest.mark.parametrize('capabilities', [
    [pygrack.CAPABILITY_SIDE_BAND],
    [pygrack.CAPABILITY_SIDE_BAND_64K],
])
def test_inject_messages_to_response_single_element(pygrack_instance, capabilities):
    """
    With the whole upstream response in a single chunk, the pre-pull message
    must follow the NAK packet and the post-pull message must precede the
    final flush packet.
    """
    upstream = more_itertools.always_iterable([b'0008NAK\n0009subp\n0000'])

    rebuilt = pygrack_instance._build_post_pull_response(
        upstream, capabilities, 'foo', 'bar')

    wanted = (
        b'0008NAK\n'
        b'0008\x02foo'
        b'0009subp\n'
        b'0008\x02bar'
        b'0000'
    )
    assert b''.join(rebuilt) == wanted


@pytest.mark.parametrize('capabilities', [
    [pygrack.CAPABILITY_SIDE_BAND],
    [pygrack.CAPABILITY_SIDE_BAND_64K],
])
def test_inject_messages_to_response_multi_element(pygrack_instance, capabilities):
    """
    Same expectation as the single-chunk case, but with the upstream response
    split over several chunks.
    """
    chunks = [
        b'0008NAK\n000asubp1\n',
        b'000asubp2\n',
        b'000asubp3\n',
        b'000asubp4\n0000',
    ]
    upstream = more_itertools.always_iterable(chunks)

    rebuilt = pygrack_instance._build_post_pull_response(
        upstream, capabilities, 'foo', 'bar')

    wanted = (
        b'0008NAK\n'
        b'0008\x02foo'
        b'000asubp1\n'
        b'000asubp2\n'
        b'000asubp3\n'
        b'000asubp4\n'
        b'0008\x02bar'
        b'0000'
    )
    assert b''.join(rebuilt) == wanted


def test_build_failed_pre_pull_response_no_sideband(pygrack_instance):
    """
    With an empty capability list the failed-pre-pull response must be just
    ``EMPTY_PACK``.
    """
    built = pygrack_instance._build_failed_pre_pull_response([], 'foo')

    assert built == [pygrack.GitRepository.EMPTY_PACK]


@pytest.mark.parametrize('capabilities', [
    [pygrack.CAPABILITY_SIDE_BAND],
    [pygrack.CAPABILITY_SIDE_BAND_64K],
    [pygrack.CAPABILITY_SIDE_BAND_64K, b'no-progress'],
])
def test_build_failed_pre_pull_response(pygrack_instance, capabilities):
    """
    With a sideband capability the failed-pre-pull response must be exactly
    the packet list built below.
    """
    empty_pack = pygrack.GitRepository.EMPTY_PACK
    # The length prefix is the payload length plus 5, written as four hex
    # digits.
    pack_packet = b'%04x\x01%s' % (len(empty_pack) + 5, empty_pack)

    built = pygrack_instance._build_failed_pre_pull_response(capabilities, 'foo')

    wanted = [
        b'0008NAK\n',
        b'0008\x02foo',
        b'0024\x02Pre pull hook failed: aborting\n',
        pack_packet,
        pygrack.GitRepository.FLUSH_PACKET,
    ]
    assert built == wanted


def test_inject_messages_to_response_generator(pygrack_instance):
    """
    ``_build_post_pull_response`` must accept a generator as upstream
    response, return something iterable, and inject the pre/post pull
    messages around the 1000 upstream packets.
    """

    def response_generator():
        chunks = [
            # protocol start
            b'0008NAK\n',
            *(ascii_bytes(f'000asubp{x}\n') for x in range(1000)),
            # protocol end
            pygrack.GitRepository.FLUSH_PACKET,
        ]
        yield from chunks

    rebuilt = pygrack_instance._build_post_pull_response(
        response_generator(),
        [pygrack.CAPABILITY_SIDE_BAND_64K, b'no-progress'],
        'PRE_PULL_MSG\n',
        'POST_PULL_MSG\n')

    # ``iter()`` raises TypeError if the returned object is not iterable.
    assert iter(rebuilt)

    wanted = b''.join([
        # start
        b'0008NAK\n0012\x02PRE_PULL_MSG\n',
        # ... rest
        *(ascii_bytes(f'000asubp{x}\n') for x in range(1000)),
        # final message,
        b'0013\x02POST_PULL_MSG\n0000',
    ])
    assert b''.join(rebuilt) == wanted