##// END OF EJS Templates
deps: bumped psycopg2==2.10.0 for python 3.13 compat
deps: bumped psycopg2==2.10.0 for python 3.13 compat

File last commit:

r5607:39b20522 default
r5621:c6f79eaa default
Show More
conftest.py
382 lines | 11.6 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
pytest configuration for the test suite that exercises push/pull operations.
"""
import os
import pyramid.paster
import pytest
import logging
import requests
from rhodecode import events
from rhodecode.lib.type_utils import AttributeDict
from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \
UserToRepoBranchPermission, User
from rhodecode.model.integration import IntegrationModel
from rhodecode.model.db import Repository
from rhodecode.model.meta import Session
from rhodecode.integrations.types.webhook import WebhookIntegrationType
from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST
from rhodecode.tests.fixtures.rc_fixture import Fixture
from rhodecode.tests.fixtures.fixture_utils import backend_base
from rhodecode.tests.utils import set_anonymous_access, AuthPluginManager
from rhodecode.tests import console_printer
# Name of the repository group that the grouped test repositories live in.
REPO_GROUP = 'a_repo_group'
# Full "<group>/<repo>" names of the per-VCS test repositories inside REPO_GROUP.
HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
# Module-level logger.
log = logging.getLogger(__name__)
def check_httpbin_connection():
    """
    Probe HTTPBIN_DOMAIN with a GET request (5 second timeout).

    Returns True only when the response status code is 200. Any exception
    raised while probing is handed to ``console_printer`` and the result
    is False.
    """
    log.debug('Checking if HTTPBIN_DOMAIN: %s is available', HTTPBIN_DOMAIN)
    is_available = False
    try:
        is_available = requests.get(HTTPBIN_DOMAIN, timeout=5).status_code == 200
    except Exception as exc:
        console_printer(exc)
    return is_available
# Overrides backend_N with init_pyramid_app instead of baseapp
@pytest.fixture()
def vcs_backend_git(request, init_pyramid_app, test_repo):
    """
    Build the 'git' backend through ``backend_base``.

    ``init_pyramid_app`` is declared as a dependency only; its value is
    not used here.
    """
    git_backend = backend_base(request, 'git', test_repo)
    return git_backend
@pytest.fixture()
def vcs_backend_hg(request, init_pyramid_app, test_repo):
    """
    Build the 'hg' backend through ``backend_base``.

    ``init_pyramid_app`` is declared as a dependency only; its value is
    not used here.
    """
    hg_backend = backend_base(request, 'hg', test_repo)
    return hg_backend
@pytest.fixture()
def vcs_backend_svn(request, init_pyramid_app, test_repo):
    """
    Build the 'svn' backend through ``backend_base``.

    ``init_pyramid_app`` is declared as a dependency only; its value is
    not used here.
    """
    svn_backend = backend_base(request, 'svn', test_repo)
    return svn_backend
@pytest.fixture(scope="module")
def tmp_storage_location(request, tmpdir_factory):
    """
    Module-scoped temporary directory used as the shared storage location.

    The directory is created through ``tmpdir_factory.mktemp`` with a
    numbered ``tmp_storage_location_`` prefix and is meant to be shared by
    vcsserver, rhodecode and celery as persistent repo storage.
    """
    storage_dir = tmpdir_factory.mktemp('tmp_storage_location_', numbered=True)
    log.info("Creating test TMP directory at %s", storage_dir)
    return storage_dir
@pytest.fixture(scope="module")
def repo_group_repos(request):
    """
    Fork each test repo (hg, git, svn) into the REPO_GROUP repository group.

    A finalizer destroys the three forks and then the group itself.
    The fixture itself yields no value.
    """
    fixture = Fixture()
    group_id = fixture.create_repo_group(REPO_GROUP).group_id

    # (source repo, full name of its fork inside the group), in creation order
    forks = (
        (HG_REPO, HG_REPO_WITH_GROUP),
        (GIT_REPO, GIT_REPO_WITH_GROUP),
        (SVN_REPO, SVN_REPO_WITH_GROUP),
    )
    for source_name, grouped_name in forks:
        fixture.create_fork(
            source_name, source_name,
            repo_name_full=grouped_name,
            repo_group=group_id)

    def _teardown():
        for _source_name, grouped_name in forks:
            fixture.destroy_repo(grouped_name)
        fixture.destroy_repo_group(group_id)

    request.addfinalizer(_teardown)
@pytest.fixture(scope='module')
def rcstack_vcsserver_factory(vcsserver_factory):
    """Module-scoped alias handing back the ``vcsserver_factory`` fixture unchanged."""
    factory = vcsserver_factory
    return factory
@pytest.fixture(scope='module')
def rcstack_celery_factory(celery_factory):
    """Module-scoped alias handing back the ``celery_factory`` fixture unchanged."""
    factory = celery_factory
    return factory
@pytest.fixture(scope='module')
def rcstack_rhodecode_factory(rhodecode_factory):
    """Module-scoped alias handing back the ``rhodecode_factory`` fixture unchanged."""
    factory = rhodecode_factory
    return factory
@pytest.fixture(scope='module')
def init_pyramid_app(
        request, available_port_factory, ini_config_factory,
        rcstack_vcsserver_factory, tmp_storage_location):
    """
    Build a pyramid app wired to a vcsserver on a freshly allocated port.

    The ini file comes from ``ini_config_factory`` for the shared storage
    location. Its settings are loaded and then overridden with
    ``startup.import_repos``, ``vcs.server`` and ``repo_store.path`` before
    being passed to ``make_pyramid_app``. Returns the created app.
    """
    from rhodecode.lib.config_utils import get_app_config
    from rhodecode.config.middleware import make_pyramid_app

    vcsserver_port = available_port_factory()
    # Invoke the vcsserver factory for the allocated port; its return value is unused.
    rcstack_vcsserver_factory(
        request,
        store_dir=tmp_storage_location,
        port=vcsserver_port,
        info_prefix='init-app-',
    )

    ini_path = ini_config_factory(tmp_storage_location)
    pyramid.paster.setup_logging(ini_path)

    app_settings = get_app_config(ini_path)
    setting_overrides = (
        ('startup.import_repos', True),
        ('vcs.server', f'localhost:{vcsserver_port}'),
        ('repo_store.path', str(tmp_storage_location)),
    )
    for key, value in setting_overrides:
        app_settings[key] = value

    return make_pyramid_app({'__file__': ini_path}, **app_settings)
@pytest.fixture(scope='module')
def rcstack(request, tmp_storage_location, rcextensions, available_port_factory,
            rcstack_vcsserver_factory, rcstack_celery_factory, rcstack_rhodecode_factory):
    """
    Run a minimal rcstack: vcsserver, celery and rhodecode.

    All three are created through their factories with
    ``tmp_storage_location`` as the shared ``store_dir`` and with log files
    placed inside that directory. ``rcextensions`` is requested as a
    dependency only; its value is not used here.

    Returns the object produced by ``rcstack_rhodecode_factory`` with an
    extra ``rcstack_data`` attribute (an ``AttributeDict``) holding
    ``vcsserver_port``, ``vcsserver_log``, ``celery_log``,
    ``rhodecode_port`` and ``rhodecode_log``.
    """
    rcstack_data = AttributeDict()

    store_dir = tmp_storage_location

    vcsserver_port: int = available_port_factory()
    vcsserver_log = os.path.join(tmp_storage_location, 'vcsserver.log')

    log.info('Using vcsserver test port %s and log %s', vcsserver_port, vcsserver_log)
    # start vcsserver
    _factory = rcstack_vcsserver_factory(
        request,
        store_dir=store_dir,
        port=vcsserver_port,
        log_file=vcsserver_log,
        overrides=(
            {'handler_console': {'level': 'DEBUG'}},
        ))
    rcstack_data.vcsserver_port = vcsserver_port
    rcstack_data.vcsserver_log = _factory.log_file

    celery_log = os.path.join(tmp_storage_location, 'celery.log')

    log.info('Using celery log %s', celery_log)
    # start celery
    _factory = rcstack_celery_factory(
        request,
        store_dir=store_dir,
        port=None,
        log_file=celery_log,
        overrides=(
            {'handler_console': {'level': 'DEBUG'}},
            {'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}},
            {'app:main': {'repo_store.path': store_dir}}
        ))
    rcstack_data.celery_log = _factory.log_file

    rhodecode_port: int = available_port_factory()
    rhodecode_log = os.path.join(tmp_storage_location, 'rhodecode.log')

    # Report the log path as the second value; it used to repeat the port.
    log.info('Using rhodecode test port %s and log %s', rhodecode_port, rhodecode_log)

    # start rhodecode
    rc = rcstack_rhodecode_factory(
        request,
        store_dir=store_dir,
        port=rhodecode_port,
        log_file=rhodecode_log,
        overrides=(
            {'handler_console': {'level': 'DEBUG'}},
            {'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}},
            {'app:main': {'repo_store.path': store_dir}}
        ))

    rcstack_data.rhodecode_port = rhodecode_port
    rcstack_data.rhodecode_log = rc.log_file

    # Expose the collected ports and log paths on the returned object.
    rc.rcstack_data = rcstack_data
    return rc
@pytest.fixture()
def disable_locking(init_pyramid_app):
    """
    Unlock the GIT_REPO and HG_REPO test repositories and switch off
    ``enable_locking`` on each, committing after every repository.
    """
    for repo_name in (GIT_REPO, HG_REPO):
        repo = Repository.get_by_repo_name(repo_name)
        Repository.unlock(repo)
        repo.enable_locking = False
        Session().add(repo)
        Session().commit()
@pytest.fixture()
def disable_anonymous_user(request, init_pyramid_app, db_connection):
    """
    Turn anonymous access off for the test and turn it back on in a finalizer.
    """
    set_anonymous_access(False)

    def _restore_anonymous_access():
        set_anonymous_access(True)

    request.addfinalizer(_restore_anonymous_access)
@pytest.fixture(scope='module')
def enable_auth_plugins(request, init_pyramid_app):
    """
    Provide an ``AuthPluginManager`` that, when called, controls which
    authentication plugins are enabled.

    The manager's ``cleanup`` method is registered as a finalizer.
    """
    plugin_manager = AuthPluginManager()
    request.addfinalizer(plugin_manager.cleanup)
    return plugin_manager
@pytest.fixture()
def fs_repo_only(request, rhodecode_fixtures):
    """
    Return a callable ``fs_repo_fabric(repo_name, repo_type)``.

    The callable creates the repo and immediately destroys it with
    ``fs_remove=False``. It also registers a finalizer that destroys the
    repo again with ``fs_remove=True`` and then calls
    ``destroy_repo_on_filesystem``.
    """
    def _purge(name):
        rhodecode_fixtures.destroy_repo(name, fs_remove=True)
        rhodecode_fixtures.destroy_repo_on_filesystem(name)

    def fs_repo_fabric(repo_name, repo_type):
        rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
        rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)
        request.addfinalizer(lambda: _purge(repo_name))

    return fs_repo_fabric
@pytest.fixture()
def enable_webhook_push_integration(request):
    """
    Create and enable a webhook integration named 'IntegrationWebhookTest'.

    The integration posts to HTTPBIN_POST on ``RepoPushEvent`` and is not
    bound to a repo or repo group. A finalizer looks it up again by id and
    deletes it.
    """
    webhook = Integration()
    webhook.integration_type = WebhookIntegrationType.key
    Session().add(webhook)

    webhook_settings = {
        'url': HTTPBIN_POST,
        'secret_token': 'secret',
        'username': None,
        'password': None,
        'custom_header_key': None,
        'custom_header_val': None,
        'method_type': 'post',
        'events': [events.RepoPushEvent.name],
        'log_data': True,
    }

    IntegrationModel().update_integration(
        webhook,
        name='IntegrationWebhookTest',
        enabled=True,
        settings=webhook_settings,
        repo=None,
        repo_group=None,
        child_repos_only=False,
    )
    Session().commit()

    # Keep only the id; the finalizer re-fetches the row before deleting it.
    webhook_id = webhook.integration_id

    def _remove_integration():
        stored = Integration.get(webhook_id)
        Session().delete(stored)
        Session().commit()

    request.addfinalizer(_remove_integration)
@pytest.fixture()
def branch_permission_setter(request):
    """
    Return a callable that adds a branch permission rule for a user on a repo.

    Usage::

        def my_test(branch_permission_setter):
            branch_permission_setter(repo_name, username, pattern='*', permission='branch.push')

    The callable signature is
    ``(repo_name, username, pattern='*', permission='branch.push_force')``
    and it returns the created ``UserToRepoBranchPermission`` rule. If the
    user has no ``UserRepoToPerm`` entry for the repo, a 'repository.write'
    entry is created on the fly and the rule is attached to it.

    A finalizer deletes every rule and every on-the-fly permission entry
    created through the callable during the test.
    """
    # Objects created by the setter are tracked in closure lists so that the
    # finalizer sees them. The previous ``global`` declarations wrote to
    # module globals while cleanup read the enclosing (always None) locals,
    # so nothing was ever cleaned up.
    created_rules = []
    created_perms = []

    def _branch_permissions_setter(
            repo_name, username, pattern='*', permission='branch.push_force'):
        repo = Repository.get_by_repo_name(repo_name)
        repo_id = repo.repo_id

        user = User.get_by_username(username)
        user_id = user.user_id

        rule_perm_obj = Permission.get_by_key(permission)

        # add new entry, based on existing perm entry
        perm = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == repo_id) \
            .filter(UserRepoToPerm.user_id == user_id) \
            .first()

        write_perm = None
        if not perm:
            # such user isn't defined in Permissions for repository
            # we now on-the-fly add new permission
            write_perm = UserRepoToPerm()
            write_perm.permission = Permission.get_by_key('repository.write')
            write_perm.repository_id = repo_id
            write_perm.user_id = user_id
            Session().add(write_perm)
            # flush so that repo_to_perm_id is populated before it is used below
            Session().flush()

            perm = write_perm

        rule = UserToRepoBranchPermission()
        rule.rule_to_perm_id = perm.repo_to_perm_id
        rule.branch_pattern = pattern
        rule.rule_order = 10
        rule.permission = rule_perm_obj
        rule.repository_id = repo_id
        Session().add(rule)
        Session().commit()

        # Track only objects that were actually committed.
        created_rules.append(rule)
        if write_perm is not None:
            created_perms.append(write_perm)

        return rule

    @request.addfinalizer
    def cleanup():
        # Rules reference the permission entries, so remove the rules first.
        if created_rules:
            for rule in created_rules:
                Session().delete(rule)
            Session().commit()

        if created_perms:
            for write_perm in created_perms:
                Session().delete(write_perm)
            Session().commit()

    return _branch_permissions_setter