conftest.py
311 lines
| 9.6 KiB
| text/x-python
|
PythonLexer
r2456 | ||||
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r2456 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
py.test config for test suite for making push/pull operations. | ||||
.. important:: | ||||
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started | ||||
to redirect things to stderr instead of stdout. | ||||
""" | ||||
import os | ||||
import tempfile | ||||
import textwrap | ||||
import pytest | ||||
r4879 | import logging | |||
r5087 | import requests | |||
r2456 | ||||
r2457 | from rhodecode import events | |||
r4992 | from rhodecode.lib.str_utils import safe_bytes | |||
r2979 | from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \ | |||
UserToRepoBranchPermission, User | ||||
r2457 | from rhodecode.model.integration import IntegrationModel | |||
r2456 | from rhodecode.model.db import Repository | |||
from rhodecode.model.meta import Session | ||||
r2457 | from rhodecode.integrations.types.webhook import WebhookIntegrationType | |||
r5459 | from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO | |||
r5154 | from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST | |||
r2456 | from rhodecode.tests.fixture import Fixture | |||
r2457 | from rhodecode.tests.server_utils import RcWebServer | |||
r2456 | ||||
r5087 | ||||
# Repository group that receives a copy of each test repository.
REPO_GROUP = 'a_repo_group'

# Full "<group>/<repo>" names of the test repository copies inside REPO_GROUP.
HG_REPO_WITH_GROUP = '{}/{}'.format(REPO_GROUP, HG_REPO)
GIT_REPO_WITH_GROUP = '{}/{}'.format(REPO_GROUP, GIT_REPO)
SVN_REPO_WITH_GROUP = '{}/{}'.format(REPO_GROUP, SVN_REPO)

# Module-level logger shared by the helpers and fixtures in this file.
log = logging.getLogger(__name__)
r5087 | ||||
def check_httpbin_connection():
    """
    Probe ``HTTPBIN_DOMAIN`` and report whether it answered with HTTP 200.

    The probe is best-effort: a failure while performing the request is
    logged through the module logger and reported as "not available"
    instead of being raised to the caller.

    :return: ``True`` when the GET request returned status code 200,
        ``False`` for any other status code or when the request failed.
    """
    log.debug('Checking if HTTPBIN_DOMAIN: %s is available', HTTPBIN_DOMAIN)
    try:
        response = requests.get(HTTPBIN_DOMAIN, timeout=5)
    except Exception:
        # Deliberately broad: whatever goes wrong while probing simply means
        # the service cannot be used. Log with traceback instead of printing
        # so the reason shows up in the captured test logs.
        log.warning(
            'HTTPBIN_DOMAIN: %s is not available', HTTPBIN_DOMAIN,
            exc_info=True)
        return False
    return response.status_code == 200
r2456 | ||||
@pytest.fixture(scope="module")
def rcextensions(request, db_connection, tmpdir_factory):
    """
    Provide a throw-away ``rcextensions`` package for the test module.

    An ``rcextensions/__init__.py`` that star-imports the example
    rcextensions is written below the pytest base temp directory, and the
    ``rcextensions`` directory is removed again on teardown. The fixture
    fails when that directory already exists.
    """
    # Note: rcextensions are looked up based on the path of the ini file
    package_dir = tmpdir_factory.getbasetemp().join('rcextensions')

    if package_dir.check():
        # pytest.fail() raises, so nothing below runs for a stale directory
        pytest.fail(
            "Path for rcextensions already exists, please clean up before "
            "test run this path: %s" % (package_dir, ))

    request.addfinalizer(package_dir.remove)

    package_init = textwrap.dedent("""
        # Forward import the example rcextensions to make it
        # active for our tests.
        from rhodecode.tests.other.example_rcextensions import *
    """)
    package_dir.join('__init__.py').write_binary(
        safe_bytes(package_init), ensure=True)
r2456 | ||||
@pytest.fixture(scope="module")
def repos(request, db_connection):
    """
    Fork the hg, git and svn test repositories into the ``REPO_GROUP`` group.

    The forks and the repository group are destroyed again on teardown.
    """
    fixture = Fixture()
    group_id = fixture.create_repo_group(REPO_GROUP).group_id

    # (source repo, full name of its fork inside the group)
    grouped_forks = (
        (HG_REPO, HG_REPO_WITH_GROUP),
        (GIT_REPO, GIT_REPO_WITH_GROUP),
        (SVN_REPO, SVN_REPO_WITH_GROUP),
    )
    for source_name, grouped_name in grouped_forks:
        fixture.create_fork(
            source_name, source_name,
            repo_name_full=grouped_name,
            repo_group=group_id)

    def cleanup():
        # forks first, the group itself last
        for _, grouped_name in grouped_forks:
            fixture.destroy_repo(grouped_name)
        fixture.destroy_repo_group(group_id)

    request.addfinalizer(cleanup)
@pytest.fixture(scope="module")
def rc_web_server_config_modification():
    """
    Extra config entries appended by `rc_web_server_config_factory`.

    Empty by default.
    """
    no_modifications = []
    return no_modifications
r2456 | ||||
@pytest.fixture(scope="module")
def rc_web_server_config_factory(testini_factory, rc_web_server_config_modification):
    """
    Return a callable that builds the configuration used by `rc_web_server`.

    The callable takes the web server port and the vcsserver port, and
    passes its base parameters followed by the entries of
    `rc_web_server_config_modification` to `testini_factory`.
    """

    def build_config(rcweb_port, vcsserver_port):
        vcsserver_address = 'localhost:%s' % vcsserver_port
        base_params = [
            {'handler_console': {'level': 'DEBUG'}},
            {'server:main': {'port': rcweb_port}},
            {'app:main': {'vcs.server': vcsserver_address}},
        ]
        # base parameters first, per-module modifications after them
        return testini_factory(
            [*base_params, *rc_web_server_config_modification])

    return build_config
r2456 | ||||
@pytest.fixture(scope="module")
def rc_web_server(
        request, vcsserver_factory, available_port_factory,
        rc_web_server_config_factory, repos, rcextensions):
    """
    Run the web server as a subprocess, with its own instance of vcsserver.

    Both servers get a port from `available_port_factory` and log into
    files inside the system temp directory. The web server is shut down on
    teardown.
    """
    log_dir = tempfile.gettempdir()

    rcweb_port: int = available_port_factory()
    log.info('Using rcweb ops test port %s', rcweb_port)

    vcsserver_port: int = available_port_factory()
    log.info('Using vcsserver ops test port %s', vcsserver_port)

    vcsserver_overrides = (
        {'server:main': {'workers': 2}},
        {'server:main': {'graceful_timeout': 10}},
    )
    vcsserver_factory(
        request,
        vcsserver_port=vcsserver_port,
        log_file=os.path.join(log_dir, 'rc_op_vcs.log'),
        overrides=vcsserver_overrides)

    web_config = rc_web_server_config_factory(
        rcweb_port=rcweb_port,
        vcsserver_port=vcsserver_port)
    web_server = RcWebServer(
        web_config, log_file=os.path.join(log_dir, 'rc_op_web.log'))
    web_server.start()

    # Registered before waiting for readiness, so the server is also shut
    # down when waiting raises.
    request.addfinalizer(web_server.shutdown)

    web_server.wait_until_ready()
    return web_server
r2456 | ||||
@pytest.fixture()
def disable_locking(baseapp):
    """
    Unlock the git and hg test repositories and turn their locking off.

    Each repository is committed on its own, git first, then hg.
    """
    for repo_name in (GIT_REPO, HG_REPO):
        repo = Repository.get_by_repo_name(repo_name)
        Repository.unlock(repo)
        repo.enable_locking = False
        Session().add(repo)
        Session().commit()
@pytest.fixture()
def fs_repo_only(request, rhodecode_fixtures):
    """
    Return a factory ``(repo_name, repo_type)`` for test repositories.

    The factory creates the repository and immediately destroys it again
    with ``fs_remove=False``. On teardown the repository is destroyed with
    ``fs_remove=True`` and additionally removed from the filesystem.
    """

    def make_fs_only_repo(repo_name, repo_type):
        rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
        rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)

        # one finalizer per created repository
        @request.addfinalizer
        def remove_leftovers():
            rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
            rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)

    return make_fs_only_repo
r2457 | ||||
@pytest.fixture()
def enable_webhook_push_integration(request):
    """
    Create an enabled webhook integration subscribed to repo push events.

    The integration is not bound to a repository or repository group and
    is configured with ``HTTPBIN_POST`` as its url. It is deleted again on
    teardown.
    """
    webhook = Integration()
    webhook.integration_type = WebhookIntegrationType.key
    Session().add(webhook)

    webhook_settings = {
        'url': HTTPBIN_POST,
        'secret_token': 'secret',
        'username': None,
        'password': None,
        'custom_header_key': None,
        'custom_header_val': None,
        'method_type': 'post',
        'events': [events.RepoPushEvent.name],
        'log_data': True,
    }
    IntegrationModel().update_integration(
        webhook,
        name='IntegrationWebhookTest',
        enabled=True,
        settings=webhook_settings,
        repo=None,
        repo_group=None,
        child_repos_only=False,
    )
    Session().commit()

    # remember only the id; the row is looked up again at teardown
    webhook_id = webhook.integration_id

    def cleanup():
        Session().delete(Integration.get(webhook_id))
        Session().commit()

    request.addfinalizer(cleanup)
r2979 | ||||
@pytest.fixture()
def branch_permission_setter(request):
    """
    Return a callable that adds a branch permission rule for a user on a repo.

    Usage::

        def my_test(branch_permission_setter):
            branch_permission_setter(
                repo_name, username, pattern='*', permission='branch.push')

    The callable returns the created ``UserToRepoBranchPermission`` rule.
    Every rule created through it, and every ``repository.write`` entry it
    had to add because the user had no permission entry on the repository,
    is deleted again on teardown.
    """
    # State shared between the setter and the finalizer. The lists are only
    # mutated, never rebound, so the nested functions reach them through the
    # closure. Rebinding via ``global`` would write module globals that the
    # finalizer never reads, and nothing would be cleaned up.
    created_rules = []
    created_write_perms = []

    def _branch_permissions_setter(
            repo_name, username, pattern='*', permission='branch.push_force'):
        repo = Repository.get_by_repo_name(repo_name)
        repo_id = repo.repo_id

        user = User.get_by_username(username)
        user_id = user.user_id

        rule_perm_obj = Permission.get_by_key(permission)

        # add new entry, based on existing perm entry
        perm = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == repo_id) \
            .filter(UserRepoToPerm.user_id == user_id) \
            .first()

        if not perm:
            # such user isn't defined in Permissions for repository
            # we now on-the-fly add new permission
            write_perm = UserRepoToPerm()
            write_perm.permission = Permission.get_by_key('repository.write')
            write_perm.repository_id = repo_id
            write_perm.user_id = user_id
            Session().add(write_perm)
            # presumably needed so repo_to_perm_id is populated before it
            # is read for the rule below
            Session().flush()
            created_write_perms.append(write_perm)
            perm = write_perm

        rule = UserToRepoBranchPermission()
        rule.rule_to_perm_id = perm.repo_to_perm_id
        rule.branch_pattern = pattern
        rule.rule_order = 10
        rule.permission = rule_perm_obj
        rule.repository_id = repo_id
        Session().add(rule)
        Session().commit()
        created_rules.append(rule)

        return rule

    @request.addfinalizer
    def cleanup():
        # Rules reference the permission entries through rule_to_perm_id,
        # so rules are removed before the entries added on the fly.
        for rule in reversed(created_rules):
            Session().delete(rule)
            Session().commit()
        for write_perm in reversed(created_write_perms):
            Session().delete(write_perm)
            Session().commit()

    return _branch_permissions_setter