test_simplevcs.py
492 lines
| 18.1 KiB
| text/x-python
|
PythonLexer
r1 | ||||
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import mock | ||||
import pytest | ||||
r1256 | ||||
r5087 | from rhodecode.lib.str_utils import base64_to_str | |||
r2140 | from rhodecode.lib.utils2 import AttributeDict | |||
r1256 | from rhodecode.tests.utils import CustomTestApp | |||
r1 | ||||
from rhodecode.lib.caching_query import FromCache | ||||
Martin Bornhold
|
r592 | from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon | ||
r1 | from rhodecode.lib.middleware import simplevcs | |||
from rhodecode.lib.middleware.https_fixup import HttpsFixup | ||||
Martin Bornhold
|
r975 | from rhodecode.lib.middleware.utils import scm_app_http | ||
r1 | from rhodecode.model.db import User, _hash_key | |||
r5087 | from rhodecode.model.meta import Session, cache as db_cache | |||
r1 | from rhodecode.tests import ( | |||
HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) | ||||
from rhodecode.tests.lib.middleware import mock_scm_app | ||||
class StubVCSController(simplevcs.SimpleVCS): | ||||
SCM = 'hg' | ||||
stub_response_body = tuple() | ||||
r757 | def __init__(self, *args, **kwargs): | |||
super(StubVCSController, self).__init__(*args, **kwargs) | ||||
Martin Bornhold
|
r912 | self._action = 'pull' | ||
r2069 | self._is_shadow_repo_dir = True | |||
Martin Bornhold
|
r912 | self._name = HG_REPO | ||
self.set_repo_names(None) | ||||
r757 | ||||
r2069 | @property | |||
def is_shadow_repo_dir(self): | ||||
return self._is_shadow_repo_dir | ||||
r1 | def _get_repository_name(self, environ): | |||
Martin Bornhold
|
r912 | return self._name | ||
r1 | ||||
def _get_action(self, environ): | ||||
Martin Bornhold
|
r912 | return self._action | ||
r1 | ||||
def _create_wsgi_app(self, repo_path, repo_name, config): | ||||
def fake_app(environ, start_response): | ||||
r1906 | headers = [ | |||
('Http-Accept', 'application/mercurial') | ||||
] | ||||
start_response('200 OK', headers) | ||||
r1 | return self.stub_response_body | |||
return fake_app | ||||
r3781 | def _create_config(self, extras, repo_name, scheme='http'): | |||
r1 | return None | |||
r3946 | @pytest.fixture() | |||
r2351 | def vcscontroller(baseapp, config_stub, request_stub): | |||
r5087 | from rhodecode.config.middleware import ce_auth_resources | |||
r1 | config_stub.testing_securitypolicy() | |||
config_stub.include('rhodecode.authentication') | ||||
r5087 | ||||
for resource in ce_auth_resources: | ||||
config_stub.include(resource) | ||||
r1 | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
app = HttpsFixup(controller, baseapp.config.get_settings()) | ||||
r1256 | app = CustomTestApp(app) | |||
r1 | ||||
_remove_default_user_from_query_cache() | ||||
# Sanity checks that things are set up correctly | ||||
app.get('/' + HG_REPO, status=200) | ||||
app.controller = controller | ||||
return app | ||||
def _remove_default_user_from_query_cache(): | ||||
user = User.get_default_user(cache=True) | ||||
query = Session().query(User).filter(User.username == user.username) | ||||
r1749 | query = query.options( | |||
r5087 | FromCache("sql_cache_short", f"get_user_{_hash_key(user.username)}")) | |||
db_cache.invalidate( | ||||
query, {}, | ||||
FromCache("sql_cache_short", f"get_user_{_hash_key(user.username)}")) | ||||
r1 | Session().expire(user) | |||
def test_handles_exceptions_during_permissions_checks( | ||||
r5087 | vcscontroller, disable_anonymous_user, enable_auth_plugins, test_user_factory): | |||
test_password = 'qweqwe' | ||||
test_user = test_user_factory(password=test_password, extern_type='headers', extern_name='headers') | ||||
test_username = test_user.username | ||||
enable_auth_plugins.enable([ | ||||
'egg:rhodecode-enterprise-ce#headers', | ||||
'egg:rhodecode-enterprise-ce#token', | ||||
'egg:rhodecode-enterprise-ce#rhodecode'], | ||||
override={ | ||||
'egg:rhodecode-enterprise-ce#headers': {'auth_headers_header': 'REMOTE_USER'} | ||||
}) | ||||
user_and_pass = f'{test_username}:{test_password}' | ||||
auth_password = base64_to_str(user_and_pass) | ||||
r1 | extra_environ = { | |||
'AUTH_TYPE': 'Basic', | ||||
r5087 | 'HTTP_AUTHORIZATION': f'Basic {auth_password}', | |||
'REMOTE_USER': test_username, | ||||
r1 | } | |||
r5087 | # Verify that things are hooked up correctly, we pass user with headers bound auth, and headers filled in | |||
r1 | vcscontroller.get('/', status=200, extra_environ=extra_environ) | |||
# Simulate trouble during permission checks | ||||
with mock.patch('rhodecode.model.db.User.get_by_username', | ||||
r5087 | side_effect=Exception('permission_error_test')) as get_user: | |||
r1 | # Verify that a correct 500 is returned and check that the expected | |||
# code path was hit. | ||||
vcscontroller.get('/', status=500, extra_environ=extra_environ) | ||||
assert get_user.called | ||||
def test_returns_forbidden_if_no_anonymous_access( | ||||
vcscontroller, disable_anonymous_user): | ||||
vcscontroller.get('/', status=401) | ||||
class StubFailVCSController(simplevcs.SimpleVCS): | ||||
def _handle_request(self, environ, start_response): | ||||
raise Exception("BOOM") | ||||
@pytest.fixture(scope='module') | ||||
r2351 | def fail_controller(baseapp): | |||
r2140 | controller = StubFailVCSController( | |||
r2358 | baseapp.config.get_settings(), baseapp.config) | |||
controller = HttpsFixup(controller, baseapp.config.get_settings()) | ||||
r1256 | controller = CustomTestApp(controller) | |||
r1 | return controller | |||
def test_handles_exceptions_as_internal_server_error(fail_controller): | ||||
fail_controller.get('/', status=500) | ||||
def test_provides_traceback_for_appenlight(fail_controller): | ||||
response = fail_controller.get( | ||||
'/', status=500, extra_environ={'appenlight.client': 'fake'}) | ||||
assert 'appenlight.__traceback' in response.request.environ | ||||
r2351 | def test_provides_utils_scm_app_as_scm_app_by_default(baseapp, request_stub): | |||
r2358 | controller = StubVCSController(baseapp.config.get_settings(), request_stub.registry) | |||
Martin Bornhold
|
r975 | assert controller.scm_app is scm_app_http | ||
r1 | ||||
r2351 | def test_allows_to_override_scm_app_via_config(baseapp, request_stub): | |||
r2358 | config = baseapp.config.get_settings().copy() | |||
r1 | config['vcs.scm_app_implementation'] = ( | |||
'rhodecode.tests.lib.middleware.mock_scm_app') | ||||
r2358 | controller = StubVCSController(config, request_stub.registry) | |||
r1 | assert controller.scm_app is mock_scm_app | |||
@pytest.mark.parametrize('query_string, expected', [ | ||||
('cmd=stub_command', True), | ||||
('cmd=listkeys', False), | ||||
]) | ||||
def test_should_check_locking(query_string, expected): | ||||
result = simplevcs._should_check_locking(query_string) | ||||
assert result == expected | ||||
Martin Bornhold
|
r919 | class TestShadowRepoRegularExpression(object): | ||
Martin Bornhold
|
r920 | pr_segment = 'pull-request' | ||
shadow_segment = 'repository' | ||||
Martin Bornhold
|
r919 | @pytest.mark.parametrize('url, expected', [ | ||
# repo with/without groups | ||||
Martin Bornhold
|
r920 | ('My-Repo/{pr_segment}/1/{shadow_segment}', True), | ||
('Group/My-Repo/{pr_segment}/2/{shadow_segment}', True), | ||||
('Group/Sub-Group/My-Repo/{pr_segment}/3/{shadow_segment}', True), | ||||
('Group/Sub-Group1/Sub-Group2/My-Repo/{pr_segment}/3/{shadow_segment}', True), | ||||
Martin Bornhold
|
r919 | |||
# pull request ID | ||||
Martin Bornhold
|
r920 | ('MyRepo/{pr_segment}/1/{shadow_segment}', True), | ||
('MyRepo/{pr_segment}/1234567890/{shadow_segment}', True), | ||||
('MyRepo/{pr_segment}/-1/{shadow_segment}', False), | ||||
('MyRepo/{pr_segment}/invalid/{shadow_segment}', False), | ||||
Martin Bornhold
|
r919 | |||
# unicode | ||||
Martin Bornhold
|
r920 | (u'Sp€çîál-Repö/{pr_segment}/1/{shadow_segment}', True), | ||
(u'Sp€çîál-Gröüp/Sp€çîál-Repö/{pr_segment}/1/{shadow_segment}', True), | ||||
Martin Bornhold
|
r919 | |||
# trailing/leading slash | ||||
Martin Bornhold
|
r920 | ('/My-Repo/{pr_segment}/1/{shadow_segment}', False), | ||
('My-Repo/{pr_segment}/1/{shadow_segment}/', False), | ||||
('/My-Repo/{pr_segment}/1/{shadow_segment}/', False), | ||||
Martin Bornhold
|
r919 | |||
# misc | ||||
Martin Bornhold
|
r920 | ('My-Repo/{pr_segment}/1/{shadow_segment}/extra', False), | ||
('My-Repo/{pr_segment}/1/{shadow_segment}extra', False), | ||||
Martin Bornhold
|
r919 | ]) | ||
def test_shadow_repo_regular_expression(self, url, expected): | ||||
from rhodecode.lib.middleware.simplevcs import SimpleVCS | ||||
Martin Bornhold
|
r920 | url = url.format( | ||
pr_segment=self.pr_segment, | ||||
shadow_segment=self.shadow_segment) | ||||
Martin Bornhold
|
r919 | match_obj = SimpleVCS.shadow_repo_re.match(url) | ||
assert (match_obj is not None) == expected | ||||
Martin Bornhold
|
r913 | @pytest.mark.backends('git', 'hg') | ||
class TestShadowRepoExposure(object): | ||||
Martin Bornhold
|
r919 | |||
r2140 | def test_pull_on_shadow_repo_propagates_to_wsgi_app( | |||
r2351 | self, baseapp, request_stub): | |||
Martin Bornhold
|
r913 | """ | ||
Check that a pull action to a shadow repo is propagated to the | ||||
underlying wsgi app. | ||||
""" | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
Martin Bornhold
|
r913 | controller._check_ssl = mock.Mock() | ||
controller.is_shadow_repo = True | ||||
controller._action = 'pull' | ||||
r2069 | controller._is_shadow_repo_dir = True | |||
r5087 | controller.stub_response_body = (b'dummy body value',) | |||
r2425 | controller._get_default_cache_ttl = mock.Mock( | |||
return_value=(False, 0)) | ||||
Martin Bornhold
|
r913 | environ_stub = { | ||
'HTTP_HOST': 'test.example.com', | ||||
r1906 | 'HTTP_ACCEPT': 'application/mercurial', | |||
Martin Bornhold
|
r913 | 'REQUEST_METHOD': 'GET', | ||
'wsgi.url_scheme': 'http', | ||||
} | ||||
response = controller(environ_stub, mock.Mock()) | ||||
r5087 | response_body = b''.join(response) | |||
Martin Bornhold
|
r913 | |||
# Assert that we got the response from the wsgi app. | ||||
r5087 | assert response_body == b''.join(controller.stub_response_body) | |||
Martin Bornhold
|
r913 | |||
r2351 | def test_pull_on_shadow_repo_that_is_missing(self, baseapp, request_stub): | |||
r2069 | """ | |||
Check that a pull action to a shadow repo is propagated to the | ||||
underlying wsgi app. | ||||
""" | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
r2069 | controller._check_ssl = mock.Mock() | |||
controller.is_shadow_repo = True | ||||
controller._action = 'pull' | ||||
controller._is_shadow_repo_dir = False | ||||
r5087 | controller.stub_response_body = (b'dummy body value',) | |||
r2069 | environ_stub = { | |||
'HTTP_HOST': 'test.example.com', | ||||
'HTTP_ACCEPT': 'application/mercurial', | ||||
'REQUEST_METHOD': 'GET', | ||||
'wsgi.url_scheme': 'http', | ||||
} | ||||
response = controller(environ_stub, mock.Mock()) | ||||
r5087 | response_body = b''.join(response) | |||
r2069 | ||||
# Assert that we got the response from the wsgi app. | ||||
r5087 | assert b'404 Not Found' in response_body | |||
r2069 | ||||
r2351 | def test_push_on_shadow_repo_raises(self, baseapp, request_stub): | |||
Martin Bornhold
|
r913 | """ | ||
Check that a push action to a shadow repo is aborted. | ||||
""" | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
Martin Bornhold
|
r913 | controller._check_ssl = mock.Mock() | ||
controller.is_shadow_repo = True | ||||
controller._action = 'push' | ||||
r5087 | controller.stub_response_body = (b'dummy body value',) | |||
Martin Bornhold
|
r913 | environ_stub = { | ||
'HTTP_HOST': 'test.example.com', | ||||
r1906 | 'HTTP_ACCEPT': 'application/mercurial', | |||
Martin Bornhold
|
r913 | 'REQUEST_METHOD': 'GET', | ||
'wsgi.url_scheme': 'http', | ||||
} | ||||
response = controller(environ_stub, mock.Mock()) | ||||
r5087 | response_body = b''.join(response) | |||
Martin Bornhold
|
r913 | |||
assert response_body != controller.stub_response_body | ||||
# Assert that a 406 error is returned. | ||||
r5087 | assert b'406 Not Acceptable' in response_body | |||
Martin Bornhold
|
r913 | |||
r2351 | def test_set_repo_names_no_shadow(self, baseapp, request_stub): | |||
Martin Bornhold
|
r913 | """ | ||
Check that the set_repo_names method sets all names to the one returned | ||||
by the _get_repository_name method on a request to a non shadow repo. | ||||
""" | ||||
environ_stub = {} | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
Martin Bornhold
|
r913 | controller._name = 'RepoGroup/MyRepo' | ||
controller.set_repo_names(environ_stub) | ||||
assert not controller.is_shadow_repo | ||||
assert (controller.url_repo_name == | ||||
controller.acl_repo_name == | ||||
controller.vcs_repo_name == | ||||
controller._get_repository_name(environ_stub)) | ||||
r2140 | def test_set_repo_names_with_shadow( | |||
r2351 | self, baseapp, pr_util, config_stub, request_stub): | |||
Martin Bornhold
|
r913 | """ | ||
Check that the set_repo_names method sets correct names on a request | ||||
to a shadow repo. | ||||
""" | ||||
from rhodecode.model.pull_request import PullRequestModel | ||||
pull_request = pr_util.create_pull_request() | ||||
Martin Bornhold
|
r920 | shadow_url = '{target}/{pr_segment}/{pr_id}/{shadow_segment}'.format( | ||
Martin Bornhold
|
r913 | target=pull_request.target_repo.repo_name, | ||
Martin Bornhold
|
r920 | pr_id=pull_request.pull_request_id, | ||
pr_segment=TestShadowRepoRegularExpression.pr_segment, | ||||
shadow_segment=TestShadowRepoRegularExpression.shadow_segment) | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
Martin Bornhold
|
r913 | controller._name = shadow_url | ||
controller.set_repo_names({}) | ||||
# Get file system path to shadow repo for assertions. | ||||
workspace_id = PullRequestModel()._workspace_id(pull_request) | ||||
r3931 | vcs_repo_name = pull_request.target_repo.get_shadow_repository_path(workspace_id) | |||
Martin Bornhold
|
r913 | |||
assert controller.vcs_repo_name == vcs_repo_name | ||||
assert controller.url_repo_name == shadow_url | ||||
assert controller.acl_repo_name == pull_request.target_repo.repo_name | ||||
assert controller.is_shadow_repo | ||||
def test_set_repo_names_with_shadow_but_missing_pr( | ||||
r2351 | self, baseapp, pr_util, config_stub, request_stub): | |||
Martin Bornhold
|
r913 | """ | ||
Checks that the set_repo_names method enforces matching target repos | ||||
and pull request IDs. | ||||
""" | ||||
pull_request = pr_util.create_pull_request() | ||||
Martin Bornhold
|
r920 | shadow_url = '{target}/{pr_segment}/{pr_id}/{shadow_segment}'.format( | ||
Martin Bornhold
|
r913 | target=pull_request.target_repo.repo_name, | ||
Martin Bornhold
|
r920 | pr_id=999999999, | ||
pr_segment=TestShadowRepoRegularExpression.pr_segment, | ||||
shadow_segment=TestShadowRepoRegularExpression.shadow_segment) | ||||
r2140 | controller = StubVCSController( | |||
r2358 | baseapp.config.get_settings(), request_stub.registry) | |||
Martin Bornhold
|
r913 | controller._name = shadow_url | ||
controller.set_repo_names({}) | ||||
assert not controller.is_shadow_repo | ||||
assert (controller.url_repo_name == | ||||
controller.acl_repo_name == | ||||
controller.vcs_repo_name) | ||||
r2358 | @pytest.mark.usefixtures('baseapp') | |||
r1774 | class TestGenerateVcsResponse(object): | |||
r1 | ||||
def test_ensures_that_start_response_is_called_early_enough(self): | ||||
self.call_controller_with_response_body(iter(['a', 'b'])) | ||||
assert self.start_response.called | ||||
def test_invalidates_cache_after_body_is_consumed(self): | ||||
result = self.call_controller_with_response_body(iter(['a', 'b'])) | ||||
assert not self.was_cache_invalidated() | ||||
# Consume the result | ||||
list(result) | ||||
assert self.was_cache_invalidated() | ||||
def test_raises_unknown_exceptions(self): | ||||
result = self.call_controller_with_response_body( | ||||
self.raise_result_iter(vcs_kind='unknown')) | ||||
with pytest.raises(Exception): | ||||
list(result) | ||||
def test_prepare_callback_daemon_is_called(self): | ||||
r2677 | def side_effect(extras, environ, action, txn_id=None): | |||
r1 | return DummyHooksCallbackDaemon(), extras | |||
prepare_patcher = mock.patch.object( | ||||
StubVCSController, '_prepare_callback_daemon') | ||||
with prepare_patcher as prepare_mock: | ||||
prepare_mock.side_effect = side_effect | ||||
self.call_controller_with_response_body(iter(['a', 'b'])) | ||||
assert prepare_mock.called | ||||
assert prepare_mock.call_count == 1 | ||||
def call_controller_with_response_body(self, response_body): | ||||
Martin Bornhold
|
r592 | settings = { | ||
'base_path': 'fake_base_path', | ||||
'vcs.hooks.protocol': 'http', | ||||
'vcs.hooks.direct_calls': False, | ||||
} | ||||
r2140 | registry = AttributeDict() | |||
r2358 | controller = StubVCSController(settings, registry) | |||
r1 | controller._invalidate_cache = mock.Mock() | |||
controller.stub_response_body = response_body | ||||
self.start_response = mock.Mock() | ||||
result = controller._generate_vcs_response( | ||||
environ={}, start_response=self.start_response, | ||||
repo_path='fake_repo_path', | ||||
extras={}, action='push') | ||||
self.controller = controller | ||||
return result | ||||
def raise_result_iter(self, vcs_kind='repo_locked'): | ||||
""" | ||||
Simulates an exception due to a vcs raised exception if kind vcs_kind | ||||
""" | ||||
raise self.vcs_exception(vcs_kind=vcs_kind) | ||||
yield "never_reached" | ||||
def vcs_exception(self, vcs_kind='repo_locked'): | ||||
locked_exception = Exception('TEST_MESSAGE') | ||||
locked_exception._vcs_kind = vcs_kind | ||||
return locked_exception | ||||
def was_cache_invalidated(self): | ||||
return self.controller._invalidate_cache.called | ||||
r1774 | class TestInitializeGenerator(object): | |||
r1 | ||||
def test_drains_first_element(self): | ||||
gen = self.factory(['__init__', 1, 2]) | ||||
result = list(gen) | ||||
assert result == [1, 2] | ||||
@pytest.mark.parametrize('values', [ | ||||
[], | ||||
[1, 2], | ||||
]) | ||||
def test_raises_value_error(self, values): | ||||
with pytest.raises(ValueError): | ||||
self.factory(values) | ||||
@simplevcs.initialize_generator | ||||
def factory(self, iterable): | ||||
for elem in iterable: | ||||
yield elem | ||||
class TestPrepareHooksDaemon(object): | ||||
r2140 | def test_calls_imported_prepare_callback_daemon(self, app_settings, request_stub): | |||
r1 | expected_extras = {'extra1': 'value1'} | |||
daemon = DummyHooksCallbackDaemon() | ||||
r2358 | controller = StubVCSController(app_settings, request_stub.registry) | |||
r1 | prepare_patcher = mock.patch.object( | |||
simplevcs, 'prepare_callback_daemon', | ||||
return_value=(daemon, expected_extras)) | ||||
with prepare_patcher as prepare_mock: | ||||
callback_daemon, extras = controller._prepare_callback_daemon( | ||||
r2677 | expected_extras.copy(), {}, 'push') | |||
r1 | prepare_mock.assert_called_once_with( | |||
Martin Bornhold
|
r592 | expected_extras, | ||
Martin Bornhold
|
r603 | protocol=app_settings['vcs.hooks.protocol'], | ||
r2833 | host=app_settings['vcs.hooks.host'], | |||
r2677 | txn_id=None, | |||
Martin Bornhold
|
r603 | use_direct_calls=app_settings['vcs.hooks.direct_calls']) | ||
r1 | ||||
assert callback_daemon == daemon | ||||
assert extras == extras | ||||