# Copyright (C) 2010-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ py.test config for test suite for making push/pull operations. """ import os import pyramid.paster import pytest import logging import requests from rhodecode import events from rhodecode.lib.type_utils import AttributeDict from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \ UserToRepoBranchPermission, User from rhodecode.model.integration import IntegrationModel from rhodecode.model.db import Repository from rhodecode.model.meta import Session from rhodecode.integrations.types.webhook import WebhookIntegrationType from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST from rhodecode.tests.fixtures.rc_fixture import Fixture from rhodecode.tests.fixtures.fixture_utils import backend_base from rhodecode.tests.utils import set_anonymous_access, AuthPluginManager from rhodecode.tests import console_printer REPO_GROUP = 'a_repo_group' HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}' GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}' SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}' log = logging.getLogger(__name__) def check_httpbin_connection(): log.debug('Checking if HTTPBIN_DOMAIN: %s is available', HTTPBIN_DOMAIN) try: response = requests.get(HTTPBIN_DOMAIN, timeout=5) return response.status_code == 200 except Exception as e: console_printer(e) return False #overrides backend_N with init_pyramid_app instead of baseapp @pytest.fixture() def vcs_backend_git(request, init_pyramid_app, test_repo): return backend_base(request, 'git', test_repo) @pytest.fixture() def vcs_backend_hg(request, init_pyramid_app, test_repo): return backend_base(request, 'hg', test_repo) @pytest.fixture() def vcs_backend_svn(request, init_pyramid_app, test_repo): return backend_base(request, 'svn', test_repo) @pytest.fixture(scope="module") def tmp_storage_location(request, tmpdir_factory): """ Defines a module level storage_location, used mostly to define per-test persistent repo storage shared across vcsserver, rhodecode and celery """ dest = tmpdir_factory.mktemp('tmp_storage_location_', numbered=True) log.info("Creating test TMP directory at %s", dest) return dest @pytest.fixture(scope="module") def repo_group_repos(request): """Create a copy of each test repo in a repo group.""" fixture = Fixture() repo_group = fixture.create_repo_group(REPO_GROUP) repo_group_id = repo_group.group_id fixture.create_fork(HG_REPO, HG_REPO, repo_name_full=HG_REPO_WITH_GROUP, repo_group=repo_group_id) fixture.create_fork(GIT_REPO, GIT_REPO, repo_name_full=GIT_REPO_WITH_GROUP, repo_group=repo_group_id) fixture.create_fork(SVN_REPO, SVN_REPO, repo_name_full=SVN_REPO_WITH_GROUP, repo_group=repo_group_id) @request.addfinalizer def cleanup(): fixture.destroy_repo(HG_REPO_WITH_GROUP) fixture.destroy_repo(GIT_REPO_WITH_GROUP) fixture.destroy_repo(SVN_REPO_WITH_GROUP) fixture.destroy_repo_group(repo_group_id) @pytest.fixture(scope='module') def rcstack_vcsserver_factory(vcsserver_factory): return vcsserver_factory @pytest.fixture(scope='module') def rcstack_celery_factory(celery_factory): return celery_factory @pytest.fixture(scope='module') def rcstack_rhodecode_factory(rhodecode_factory): return rhodecode_factory @pytest.fixture(scope='module') def init_pyramid_app(request, available_port_factory, ini_config_factory, rcstack_vcsserver_factory, tmp_storage_location): from rhodecode.lib.config_utils import get_app_config from rhodecode.config.middleware import make_pyramid_app store_dir = tmp_storage_location port = available_port_factory() rcstack_vcsserver_factory( request, store_dir=store_dir, port=port, info_prefix='init-app-' ) app_ini_config = ini_config_factory(store_dir) pyramid.paster.setup_logging(app_ini_config) settings = get_app_config(app_ini_config) settings['startup.import_repos'] = True settings['vcs.server'] = f'localhost:{port}' settings['repo_store.path'] = str(store_dir) pyramid_app = make_pyramid_app({'__file__': app_ini_config}, **settings) return pyramid_app @pytest.fixture(scope='module') def rcstack(request, tmp_storage_location, rcextensions, available_port_factory, rcstack_vcsserver_factory, rcstack_celery_factory, rcstack_rhodecode_factory): """ Runs minimal rcstack, i.e vcsserver, celery, rhodecode unpacks rcextensions and repos to a shared location """ rcstack_data = AttributeDict() store_dir = tmp_storage_location vcsserver_port: int = available_port_factory() vcsserver_log = os.path.join(tmp_storage_location, 'vcsserver.log') log.info('Using vcsserver test port %s and log %s', vcsserver_port, vcsserver_log) # start vcsserver _factory = rcstack_vcsserver_factory( request, store_dir=store_dir, port=vcsserver_port, log_file=vcsserver_log, overrides=( {'handler_console': {'level': 'DEBUG'}}, )) rcstack_data.vcsserver_port = vcsserver_port rcstack_data.vcsserver_log = _factory.log_file celery_log = os.path.join(tmp_storage_location, 'celery.log') log.info('Using celery log %s', celery_log) # start celery _factory = rcstack_celery_factory( request, store_dir=store_dir, port=None, log_file=celery_log, overrides=( {'handler_console': {'level': 'DEBUG'}}, {'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}}, {'app:main': {'repo_store.path': store_dir}} )) rcstack_data.celery_log = _factory.log_file rhodecode_port: int = available_port_factory() rhodecode_log = os.path.join(tmp_storage_location, 'rhodecode.log') log.info('Using rhodecode test port %s and log %s', rhodecode_port, rhodecode_port) # start rhodecode rc = rcstack_rhodecode_factory( request, store_dir=store_dir, port=rhodecode_port, log_file=rhodecode_log, overrides=( {'handler_console': {'level': 'DEBUG'}}, {'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}}, {'app:main': {'repo_store.path': store_dir}} )) rcstack_data.rhodecode_port = rhodecode_port rcstack_data.rhodecode_log = rc.log_file rc.rcstack_data = rcstack_data return rc @pytest.fixture() def disable_locking(init_pyramid_app): r = Repository.get_by_repo_name(GIT_REPO) Repository.unlock(r) r.enable_locking = False Session().add(r) Session().commit() r = Repository.get_by_repo_name(HG_REPO) Repository.unlock(r) r.enable_locking = False Session().add(r) Session().commit() @pytest.fixture() def disable_anonymous_user(request, init_pyramid_app, db_connection): set_anonymous_access(False) @request.addfinalizer def cleanup(): set_anonymous_access(True) @pytest.fixture(scope='module') def enable_auth_plugins(request, init_pyramid_app): """ Return a factory object that when called, allows to control which authentication plugins are enabled. """ enabler = AuthPluginManager() request.addfinalizer(enabler.cleanup) return enabler @pytest.fixture() def fs_repo_only(request, rhodecode_fixtures): def fs_repo_fabric(repo_name, repo_type): rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type) rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False) def cleanup(): rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True) rhodecode_fixtures.destroy_repo_on_filesystem(repo_name) request.addfinalizer(cleanup) return fs_repo_fabric @pytest.fixture() def enable_webhook_push_integration(request): integration = Integration() integration.integration_type = WebhookIntegrationType.key Session().add(integration) settings = dict( url=HTTPBIN_POST, secret_token='secret', username=None, password=None, custom_header_key=None, custom_header_val=None, method_type='post', events=[events.RepoPushEvent.name], log_data=True ) IntegrationModel().update_integration( integration, name='IntegrationWebhookTest', enabled=True, settings=settings, repo=None, repo_group=None, child_repos_only=False, ) Session().commit() integration_id = integration.integration_id @request.addfinalizer def cleanup(): integration = Integration.get(integration_id) Session().delete(integration) Session().commit() @pytest.fixture() def branch_permission_setter(request): """ def my_test(branch_permission_setter) branch_permission_setter(repo_name, username, pattern='*', permission='branch.push') """ rule_id = None write_perm_id = None write_perm = None rule = None def _branch_permissions_setter( repo_name, username, pattern='*', permission='branch.push_force'): global rule_id, write_perm_id global rule, write_perm repo = Repository.get_by_repo_name(repo_name) repo_id = repo.repo_id user = User.get_by_username(username) user_id = user.user_id rule_perm_obj = Permission.get_by_key(permission) # add new entry, based on existing perm entry perm = UserRepoToPerm.query() \ .filter(UserRepoToPerm.repository_id == repo_id) \ .filter(UserRepoToPerm.user_id == user_id) \ .first() if not perm: # such user isn't defined in Permissions for repository # we now on-the-fly add new permission write_perm = UserRepoToPerm() write_perm.permission = Permission.get_by_key('repository.write') write_perm.repository_id = repo_id write_perm.user_id = user_id Session().add(write_perm) Session().flush() perm = write_perm rule = UserToRepoBranchPermission() rule.rule_to_perm_id = perm.repo_to_perm_id rule.branch_pattern = pattern rule.rule_order = 10 rule.permission = rule_perm_obj rule.repository_id = repo_id Session().add(rule) Session().commit() return rule @request.addfinalizer def cleanup(): if rule: Session().delete(rule) Session().commit() if write_perm: Session().delete(write_perm) Session().commit() return _branch_permissions_setter