conftest.py
382 lines
| 11.6 KiB
| text/x-python
|
PythonLexer
r5607 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r2456 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
py.test config for test suite for making push/pull operations. | ||||
""" | ||||
import os | ||||
r5607 | ||||
import pyramid.paster | ||||
r2456 | import pytest | |||
r4879 | import logging | |||
r5087 | import requests | |||
r2456 | ||||
r2457 | from rhodecode import events | |||
r5607 | from rhodecode.lib.type_utils import AttributeDict | |||
r2979 | from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \ | |||
UserToRepoBranchPermission, User | ||||
r2457 | from rhodecode.model.integration import IntegrationModel | |||
r2456 | from rhodecode.model.db import Repository | |||
from rhodecode.model.meta import Session | ||||
r2457 | from rhodecode.integrations.types.webhook import WebhookIntegrationType | |||
r5607 | ||||
r5459 | from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO | |||
r5154 | from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST | |||
r5607 | from rhodecode.tests.fixtures.rc_fixture import Fixture | |||
from rhodecode.tests.fixtures.fixture_utils import backend_base | ||||
from rhodecode.tests.utils import set_anonymous_access, AuthPluginManager | ||||
from rhodecode.tests import console_printer | ||||
r5087 | ||||
# Name of the repository group that holds grouped copies of the test repos.
REPO_GROUP = 'a_repo_group'

# Full "<group>/<repo>" names of the per-backend test repos inside REPO_GROUP.
HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'

# Module-level logger shared by the fixtures below.
log = logging.getLogger(__name__)
r5087 | ||||
def check_httpbin_connection():
    """
    Probe HTTPBIN_DOMAIN with a GET request (5 second timeout).

    Returns True only when the request completes with HTTP status 200.
    Any exception raised while probing is passed to ``console_printer``
    and the function reports False.
    """
    log.debug('Checking if HTTPBIN_DOMAIN: %s is available', HTTPBIN_DOMAIN)

    reachable = False
    try:
        reply = requests.get(HTTPBIN_DOMAIN, timeout=5)
        reachable = reply.status_code == 200
    except Exception as exc:
        # best-effort probe: report the problem, treat the host as unavailable
        console_printer(exc)

    return reachable
# Overrides backend_N fixtures so they depend on init_pyramid_app instead of baseapp.
@pytest.fixture()
def vcs_backend_git(request, init_pyramid_app, test_repo):
    """Return the ``backend_base`` result for the 'git' backend and ``test_repo``."""
    git_backend = backend_base(request, 'git', test_repo)
    return git_backend
@pytest.fixture()
def vcs_backend_hg(request, init_pyramid_app, test_repo):
    """Return the ``backend_base`` result for the 'hg' backend and ``test_repo``."""
    hg_backend = backend_base(request, 'hg', test_repo)
    return hg_backend
@pytest.fixture()
def vcs_backend_svn(request, init_pyramid_app, test_repo):
    """Return the ``backend_base`` result for the 'svn' backend and ``test_repo``."""
    svn_backend = backend_base(request, 'svn', test_repo)
    return svn_backend
r2456 | ||||
@pytest.fixture(scope="module")
def tmp_storage_location(request, tmpdir_factory):
    """
    Provide a module-scoped temporary directory used as the storage location.

    The directory is created through ``tmpdir_factory`` and is meant to serve
    as repo storage shared across vcsserver, rhodecode and celery.
    """
    storage_dir = tmpdir_factory.mktemp('tmp_storage_location_', numbered=True)
    log.info("Creating test TMP directory at %s", storage_dir)
    return storage_dir
r2456 | ||||
@pytest.fixture(scope="module")
def repo_group_repos(request):
    """
    Fork each test repo (hg, git, svn) into the ``REPO_GROUP`` repo group.

    A finalizer destroys the forked repos and then the repo group itself.
    """
    fixture = Fixture()
    group_id = fixture.create_repo_group(REPO_GROUP).group_id

    # (source repo, full name of its copy inside the group), in creation order
    grouped_repos = (
        (HG_REPO, HG_REPO_WITH_GROUP),
        (GIT_REPO, GIT_REPO_WITH_GROUP),
        (SVN_REPO, SVN_REPO_WITH_GROUP),
    )

    for source_name, grouped_name in grouped_repos:
        fixture.create_fork(
            source_name, source_name,
            repo_name_full=grouped_name,
            repo_group=group_id)

    def cleanup():
        # repos have to go before the group that contains them
        for _source_name, grouped_name in grouped_repos:
            fixture.destroy_repo(grouped_name)
        fixture.destroy_repo_group(group_id)

    request.addfinalizer(cleanup)
@pytest.fixture(scope='module')
def rcstack_vcsserver_factory(vcsserver_factory):
    """Expose the ``vcsserver_factory`` fixture under an rcstack-specific name."""
    factory = vcsserver_factory
    return factory
@pytest.fixture(scope='module')
def rcstack_celery_factory(celery_factory):
    """Expose the ``celery_factory`` fixture under an rcstack-specific name."""
    factory = celery_factory
    return factory
@pytest.fixture(scope='module')
def rcstack_rhodecode_factory(rhodecode_factory):
    """Expose the ``rhodecode_factory`` fixture under an rcstack-specific name."""
    factory = rhodecode_factory
    return factory
r2456 | ||||
@pytest.fixture(scope='module')
def init_pyramid_app(request, available_port_factory, ini_config_factory,
                     rcstack_vcsserver_factory, tmp_storage_location):
    """
    Build a pyramid application wired to a freshly started vcsserver.

    A vcsserver is started on a newly allocated port with the module's
    temporary storage location as its store dir. The app config is then
    loaded from a generated ini file and overridden so the app imports
    repos on startup, talks to that vcsserver, and uses the same store path.

    Returns the object produced by ``make_pyramid_app``.
    """
    from rhodecode.lib.config_utils import get_app_config
    from rhodecode.config.middleware import make_pyramid_app

    store_dir = tmp_storage_location
    vcs_port = available_port_factory()

    rcstack_vcsserver_factory(
        request,
        store_dir=store_dir,
        port=vcs_port,
        info_prefix='init-app-',
    )

    ini_path = ini_config_factory(store_dir)
    pyramid.paster.setup_logging(ini_path)

    app_settings = get_app_config(ini_path)
    app_settings['startup.import_repos'] = True
    app_settings['vcs.server'] = f'localhost:{vcs_port}'
    app_settings['repo_store.path'] = str(store_dir)

    return make_pyramid_app({'__file__': ini_path}, **app_settings)
r2456 | ||||
@pytest.fixture(scope='module')
def rcstack(request, tmp_storage_location, rcextensions, available_port_factory,
            rcstack_vcsserver_factory, rcstack_celery_factory, rcstack_rhodecode_factory):
    """
    Runs a minimal rcstack, i.e. vcsserver, celery and rhodecode, all sharing
    the module's temporary storage location as store dir and log directory.

    The ``rcextensions`` fixture is requested so it is set up before the
    stack starts; it is not used directly here.

    Returns the object produced by ``rcstack_rhodecode_factory`` with an extra
    ``rcstack_data`` attribute (an ``AttributeDict``) carrying:
    ``vcsserver_port``, ``vcsserver_log``, ``celery_log``, ``rhodecode_port``
    and ``rhodecode_log``.
    """
    rcstack_data = AttributeDict()
    store_dir = tmp_storage_location

    vcsserver_port: int = available_port_factory()
    vcsserver_log = os.path.join(tmp_storage_location, 'vcsserver.log')
    log.info('Using vcsserver test port %s and log %s', vcsserver_port, vcsserver_log)

    # start vcsserver
    _factory = rcstack_vcsserver_factory(
        request,
        store_dir=store_dir,
        port=vcsserver_port,
        log_file=vcsserver_log,
        overrides=(
            {'handler_console': {'level': 'DEBUG'}},
        ))
    rcstack_data.vcsserver_port = vcsserver_port
    rcstack_data.vcsserver_log = _factory.log_file

    celery_log = os.path.join(tmp_storage_location, 'celery.log')
    log.info('Using celery log %s', celery_log)

    # start celery
    _factory = rcstack_celery_factory(
        request,
        store_dir=store_dir,
        port=None,
        log_file=celery_log,
        overrides=(
            {'handler_console': {'level': 'DEBUG'}},
            {'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}},
            {'app:main': {'repo_store.path': store_dir}}
        ))
    rcstack_data.celery_log = _factory.log_file

    rhodecode_port: int = available_port_factory()
    rhodecode_log = os.path.join(tmp_storage_location, 'rhodecode.log')
    # report the log *path* here; the port was previously logged twice by mistake
    log.info('Using rhodecode test port %s and log %s', rhodecode_port, rhodecode_log)

    # start rhodecode
    rc = rcstack_rhodecode_factory(
        request,
        store_dir=store_dir,
        port=rhodecode_port,
        log_file=rhodecode_log,
        overrides=(
            {'handler_console': {'level': 'DEBUG'}},
            {'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}},
            {'app:main': {'repo_store.path': store_dir}}
        ))

    rcstack_data.rhodecode_port = rhodecode_port
    rcstack_data.rhodecode_log = rc.log_file
    # expose ports and log locations to tests through the returned object
    rc.rcstack_data = rcstack_data
    return rc
r2456 | ||||
@pytest.fixture()
def disable_locking(init_pyramid_app):
    """
    Unlock GIT_REPO and HG_REPO and turn off their ``enable_locking`` flag.

    Each repo is handled and committed separately, git first, then hg.
    """
    for repo_name in (GIT_REPO, HG_REPO):
        repo = Repository.get_by_repo_name(repo_name)
        Repository.unlock(repo)
        repo.enable_locking = False
        Session().add(repo)
        Session().commit()
@pytest.fixture()
def disable_anonymous_user(request, init_pyramid_app, db_connection):
    """Turn anonymous access off for the test; a finalizer turns it back on."""
    set_anonymous_access(False)

    def restore_anonymous_access():
        set_anonymous_access(True)

    request.addfinalizer(restore_anonymous_access)
@pytest.fixture(scope='module')
def enable_auth_plugins(request, init_pyramid_app):
    """
    Provide an ``AuthPluginManager`` instance for controlling which
    authentication plugins are enabled.

    The manager's ``cleanup`` method is registered as a finalizer.
    """
    plugin_manager = AuthPluginManager()
    request.addfinalizer(plugin_manager.cleanup)
    return plugin_manager
@pytest.fixture()
def fs_repo_only(request, rhodecode_fixtures):
    """
    Return a callable ``(repo_name, repo_type)`` that creates a repository and
    then destroys it with ``fs_remove=False``.

    For every repo created this way a finalizer is registered which destroys
    it with ``fs_remove=True`` and calls ``destroy_repo_on_filesystem``.
    """

    def _make_fs_only_repo(repo_name, repo_type):
        rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
        rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)

        def _remove_leftovers():
            rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
            rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)

        request.addfinalizer(_remove_leftovers)

    return _make_fs_only_repo
r2457 | ||||
@pytest.fixture()
def enable_webhook_push_integration(request):
    """
    Create and enable a global webhook integration that POSTs to HTTPBIN_POST
    on repo push events.

    The integration is removed again by a finalizer, looked up by its id.
    """
    webhook = Integration()
    webhook.integration_type = WebhookIntegrationType.key
    Session().add(webhook)

    webhook_settings = {
        'url': HTTPBIN_POST,
        'secret_token': 'secret',
        'username': None,
        'password': None,
        'custom_header_key': None,
        'custom_header_val': None,
        'method_type': 'post',
        'events': [events.RepoPushEvent.name],
        'log_data': True,
    }

    IntegrationModel().update_integration(
        webhook,
        name='IntegrationWebhookTest',
        enabled=True,
        settings=webhook_settings,
        repo=None,
        repo_group=None,
        child_repos_only=False,
    )
    Session().commit()

    # remember only the id; the finalizer re-fetches the row before deleting it
    integration_id = webhook.integration_id

    def cleanup():
        stored = Integration.get(integration_id)
        Session().delete(stored)
        Session().commit()

    request.addfinalizer(cleanup)
r2979 | ||||
@pytest.fixture()
def branch_permission_setter(request):
    """
    Return a callable that adds a branch permission rule for a user on a repo.

    Usage::

        def my_test(branch_permission_setter)
            branch_permission_setter(repo_name, username, pattern='*', permission='branch.push')

    The callable takes ``(repo_name, username, pattern='*',
    permission='branch.push_force')`` and returns the created
    ``UserToRepoBranchPermission`` rule. If the user has no permission entry
    on the repository yet, a 'repository.write' entry is created on the fly.

    Every object created through the callable is deleted by a finalizer.
    """
    # Objects created during the test. They are tracked in closure-owned lists:
    # the previous ``global`` declarations made the setter write to module
    # globals while the finalizer read the (always None) fixture locals, so
    # nothing was ever cleaned up.
    created_rules = []
    created_perms = []

    def _branch_permissions_setter(
            repo_name, username, pattern='*', permission='branch.push_force'):
        repo = Repository.get_by_repo_name(repo_name)
        repo_id = repo.repo_id

        user = User.get_by_username(username)
        user_id = user.user_id

        rule_perm_obj = Permission.get_by_key(permission)

        # add new entry, based on existing perm entry
        perm = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == repo_id) \
            .filter(UserRepoToPerm.user_id == user_id) \
            .first()

        if not perm:
            # such user isn't defined in Permissions for repository
            # we now on-the-fly add new permission
            write_perm = UserRepoToPerm()
            write_perm.permission = Permission.get_by_key('repository.write')
            write_perm.repository_id = repo_id
            write_perm.user_id = user_id
            Session().add(write_perm)
            # flush so repo_to_perm_id is populated before the rule uses it
            Session().flush()
            created_perms.append(write_perm)
            perm = write_perm

        rule = UserToRepoBranchPermission()
        rule.rule_to_perm_id = perm.repo_to_perm_id
        rule.branch_pattern = pattern
        rule.rule_order = 10
        rule.permission = rule_perm_obj
        rule.repository_id = repo_id
        Session().add(rule)
        Session().commit()
        created_rules.append(rule)

        return rule

    def cleanup():
        # rules reference the permission entries, so remove the rules first
        for rule in created_rules:
            Session().delete(rule)
            Session().commit()
        for write_perm in created_perms:
            Session().delete(write_perm)
            Session().commit()

    request.addfinalizer(cleanup)

    return _branch_permissions_setter