conftest.py
346 lines
| 10.5 KiB
| text/x-python
|
PythonLexer
r2456 | # -*- coding: utf-8 -*- | |||
r4306 | # Copyright (C) 2010-2020 RhodeCode GmbH | |||
r2456 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
py.test config for test suite for making push/pull operations. | ||||
.. important:: | ||||
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started | ||||
to redirect things to stderr instead of stdout. | ||||
""" | ||||
import os | ||||
import tempfile | ||||
import textwrap | ||||
import pytest | ||||
r4879 | import logging | |||
r2456 | ||||
r2457 | from rhodecode import events | |||
r4992 | from rhodecode.lib.str_utils import safe_bytes | |||
r2979 | from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \ | |||
UserToRepoBranchPermission, User | ||||
r2457 | from rhodecode.model.integration import IntegrationModel | |||
r2456 | from rhodecode.model.db import Repository | |||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.settings import SettingsModel | ||||
r2457 | from rhodecode.integrations.types.webhook import WebhookIntegrationType | |||
from rhodecode.tests import GIT_REPO, HG_REPO | ||||
r2456 | from rhodecode.tests.fixture import Fixture | |||
r2457 | from rhodecode.tests.server_utils import RcWebServer | |||
r2456 | ||||
REPO_GROUP = 'a_repo_group' | ||||
HG_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, HG_REPO) | ||||
GIT_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, GIT_REPO) | ||||
r4879 | log = logging.getLogger(__name__) | |||
r2456 | ||||
@pytest.fixture(scope="module") | ||||
r2457 | def rcextensions(request, db_connection, tmpdir_factory): | |||
r2456 | """ | |||
Installs a testing rcextensions pack to ensure they work as expected. | ||||
""" | ||||
init_content = textwrap.dedent(""" | ||||
# Forward import the example rcextensions to make it | ||||
# active for our tests. | ||||
from rhodecode.tests.other.example_rcextensions import * | ||||
""") | ||||
# Note: rcextensions are looked up based on the path of the ini file | ||||
root_path = tmpdir_factory.getbasetemp() | ||||
rcextensions_path = root_path.join('rcextensions') | ||||
init_path = rcextensions_path.join('__init__.py') | ||||
if rcextensions_path.check(): | ||||
pytest.fail( | ||||
"Path for rcextensions already exists, please clean up before " | ||||
"test run this path: %s" % (rcextensions_path, )) | ||||
return | ||||
request.addfinalizer(rcextensions_path.remove) | ||||
r4992 | init_path.write_binary(safe_bytes(init_content), ensure=True) | |||
r2456 | ||||
@pytest.fixture(scope="module") | ||||
r2457 | def repos(request, db_connection): | |||
r2456 | """Create a copy of each test repo in a repo group.""" | |||
fixture = Fixture() | ||||
repo_group = fixture.create_repo_group(REPO_GROUP) | ||||
repo_group_id = repo_group.group_id | ||||
fixture.create_fork(HG_REPO, HG_REPO, | ||||
repo_name_full=HG_REPO_WITH_GROUP, | ||||
repo_group=repo_group_id) | ||||
fixture.create_fork(GIT_REPO, GIT_REPO, | ||||
repo_name_full=GIT_REPO_WITH_GROUP, | ||||
repo_group=repo_group_id) | ||||
@request.addfinalizer | ||||
def cleanup(): | ||||
fixture.destroy_repo(HG_REPO_WITH_GROUP) | ||||
fixture.destroy_repo(GIT_REPO_WITH_GROUP) | ||||
fixture.destroy_repo_group(repo_group_id) | ||||
r2457 | @pytest.fixture(scope="module") | |||
def rc_web_server_config_modification(): | ||||
return [] | ||||
r2456 | ||||
@pytest.fixture(scope="module") | ||||
r2457 | def rc_web_server_config_factory(testini_factory, rc_web_server_config_modification): | |||
r2456 | """ | |||
Configuration file used for the fixture `rc_web_server`. | ||||
""" | ||||
r2457 | ||||
r2614 | def factory(rcweb_port, vcsserver_port): | |||
r2457 | custom_params = [ | |||
{'handler_console': {'level': 'DEBUG'}}, | ||||
r2614 | {'server:main': {'port': rcweb_port}}, | |||
r2457 | {'app:main': {'vcs.server': 'localhost:%s' % vcsserver_port}} | |||
] | ||||
custom_params.extend(rc_web_server_config_modification) | ||||
return testini_factory(custom_params) | ||||
return factory | ||||
r2456 | ||||
@pytest.fixture(scope="module") | ||||
def rc_web_server( | ||||
r2457 | request, vcsserver_factory, available_port_factory, | |||
rc_web_server_config_factory, repos, rcextensions): | ||||
r2456 | """ | |||
r2457 | Run the web server as a subprocess. with it's own instance of vcsserver | |||
""" | ||||
r2614 | rcweb_port = available_port_factory() | |||
r4879 | log.info('Using rcweb ops test port {}'.format(rcweb_port)) | |||
r2456 | ||||
r2457 | vcsserver_port = available_port_factory() | |||
r4879 | log.info('Using vcsserver ops test port {}'.format(vcsserver_port)) | |||
r2456 | ||||
r2457 | vcs_log = os.path.join(tempfile.gettempdir(), 'rc_op_vcs.log') | |||
vcsserver_factory( | ||||
request, vcsserver_port=vcsserver_port, | ||||
log_file=vcs_log, | ||||
r2462 | overrides=( | |||
{'server:main': {'workers': 2}}, | ||||
{'server:main': {'graceful_timeout': 10}}, | ||||
)) | ||||
r2456 | ||||
r2457 | rc_log = os.path.join(tempfile.gettempdir(), 'rc_op_web.log') | |||
rc_web_server_config = rc_web_server_config_factory( | ||||
r2614 | rcweb_port=rcweb_port, | |||
r2457 | vcsserver_port=vcsserver_port) | |||
server = RcWebServer(rc_web_server_config, log_file=rc_log) | ||||
server.start() | ||||
r2456 | ||||
@request.addfinalizer | ||||
r2457 | def cleanup(): | |||
server.shutdown() | ||||
r2456 | ||||
r2457 | server.wait_until_ready() | |||
return server | ||||
r2456 | ||||
r3946 | @pytest.fixture() | |||
r2456 | def disable_locking(baseapp): | |||
r = Repository.get_by_repo_name(GIT_REPO) | ||||
Repository.unlock(r) | ||||
r.enable_locking = False | ||||
Session().add(r) | ||||
Session().commit() | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
Repository.unlock(r) | ||||
r.enable_locking = False | ||||
Session().add(r) | ||||
Session().commit() | ||||
r3946 | @pytest.fixture() | |||
r2456 | def enable_auth_plugins(request, baseapp, csrf_token): | |||
""" | ||||
Return a factory object that when called, allows to control which | ||||
authentication plugins are enabled. | ||||
""" | ||||
def _enable_plugins(plugins_list, override=None): | ||||
override = override or {} | ||||
params = { | ||||
'auth_plugins': ','.join(plugins_list), | ||||
} | ||||
# helper translate some names to others | ||||
name_map = { | ||||
'token': 'authtoken' | ||||
} | ||||
for module in plugins_list: | ||||
plugin_name = module.partition('#')[-1] | ||||
if plugin_name in name_map: | ||||
plugin_name = name_map[plugin_name] | ||||
enabled_plugin = 'auth_%s_enabled' % plugin_name | ||||
cache_ttl = 'auth_%s_cache_ttl' % plugin_name | ||||
# default params that are needed for each plugin, | ||||
# `enabled` and `cache_ttl` | ||||
params.update({ | ||||
enabled_plugin: True, | ||||
cache_ttl: 0 | ||||
}) | ||||
if override.get: | ||||
params.update(override.get(module, {})) | ||||
validated_params = params | ||||
for k, v in validated_params.items(): | ||||
setting = SettingsModel().create_or_update_setting(k, v) | ||||
Session().add(setting) | ||||
Session().commit() | ||||
r4220 | SettingsModel().invalidate_settings_cache() | |||
r2456 | def cleanup(): | |||
_enable_plugins(['egg:rhodecode-enterprise-ce#rhodecode']) | ||||
request.addfinalizer(cleanup) | ||||
return _enable_plugins | ||||
r3946 | @pytest.fixture() | |||
r2456 | def fs_repo_only(request, rhodecode_fixtures): | |||
def fs_repo_fabric(repo_name, repo_type): | ||||
rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type) | ||||
rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False) | ||||
def cleanup(): | ||||
rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True) | ||||
rhodecode_fixtures.destroy_repo_on_filesystem(repo_name) | ||||
request.addfinalizer(cleanup) | ||||
return fs_repo_fabric | ||||
r2457 | ||||
r3946 | @pytest.fixture() | |||
r2457 | def enable_webhook_push_integration(request): | |||
integration = Integration() | ||||
integration.integration_type = WebhookIntegrationType.key | ||||
Session().add(integration) | ||||
settings = dict( | ||||
r2832 | url='http://httpbin.org/post', | |||
r2457 | secret_token='secret', | |||
username=None, | ||||
password=None, | ||||
custom_header_key=None, | ||||
custom_header_val=None, | ||||
r2832 | method_type='post', | |||
r2457 | events=[events.RepoPushEvent.name], | |||
log_data=True | ||||
) | ||||
IntegrationModel().update_integration( | ||||
integration, | ||||
name='IntegrationWebhookTest', | ||||
enabled=True, | ||||
settings=settings, | ||||
repo=None, | ||||
repo_group=None, | ||||
child_repos_only=False, | ||||
) | ||||
Session().commit() | ||||
integration_id = integration.integration_id | ||||
@request.addfinalizer | ||||
def cleanup(): | ||||
integration = Integration.get(integration_id) | ||||
Session().delete(integration) | ||||
Session().commit() | ||||
r2979 | ||||
r3946 | @pytest.fixture() | |||
r2979 | def branch_permission_setter(request): | |||
""" | ||||
def my_test(branch_permission_setter) | ||||
branch_permission_setter(repo_name, username, pattern='*', permission='branch.push') | ||||
""" | ||||
rule_id = None | ||||
write_perm_id = None | ||||
r4009 | write_perm = None | |||
rule = None | ||||
r2979 | ||||
def _branch_permissions_setter( | ||||
repo_name, username, pattern='*', permission='branch.push_force'): | ||||
global rule_id, write_perm_id | ||||
r4009 | global rule, write_perm | |||
r2979 | ||||
repo = Repository.get_by_repo_name(repo_name) | ||||
repo_id = repo.repo_id | ||||
user = User.get_by_username(username) | ||||
user_id = user.user_id | ||||
rule_perm_obj = Permission.get_by_key(permission) | ||||
# add new entry, based on existing perm entry | ||||
perm = UserRepoToPerm.query() \ | ||||
.filter(UserRepoToPerm.repository_id == repo_id) \ | ||||
.filter(UserRepoToPerm.user_id == user_id) \ | ||||
.first() | ||||
if not perm: | ||||
# such user isn't defined in Permissions for repository | ||||
# we now on-the-fly add new permission | ||||
write_perm = UserRepoToPerm() | ||||
write_perm.permission = Permission.get_by_key('repository.write') | ||||
write_perm.repository_id = repo_id | ||||
write_perm.user_id = user_id | ||||
Session().add(write_perm) | ||||
Session().flush() | ||||
perm = write_perm | ||||
rule = UserToRepoBranchPermission() | ||||
rule.rule_to_perm_id = perm.repo_to_perm_id | ||||
rule.branch_pattern = pattern | ||||
rule.rule_order = 10 | ||||
rule.permission = rule_perm_obj | ||||
rule.repository_id = repo_id | ||||
Session().add(rule) | ||||
Session().commit() | ||||
return rule | ||||
@request.addfinalizer | ||||
def cleanup(): | ||||
if rule: | ||||
Session().delete(rule) | ||||
Session().commit() | ||||
if write_perm: | ||||
Session().delete(write_perm) | ||||
Session().commit() | ||||
return _branch_permissions_setter | ||||