|
|
# Copyright (C) 2010-2024 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
py.test config for test suite for making push/pull operations.
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
|
|
|
import pyramid.paster
|
|
|
import pytest
|
|
|
import logging
|
|
|
import requests
|
|
|
|
|
|
from rhodecode import events
|
|
|
from rhodecode.lib.type_utils import AttributeDict
|
|
|
from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \
|
|
|
UserToRepoBranchPermission, User
|
|
|
from rhodecode.model.integration import IntegrationModel
|
|
|
from rhodecode.model.db import Repository
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.integrations.types.webhook import WebhookIntegrationType
|
|
|
|
|
|
|
|
|
from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
|
|
|
from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST
|
|
|
from rhodecode.tests.fixtures.rc_fixture import Fixture
|
|
|
from rhodecode.tests.fixtures.fixture_utils import backend_base
|
|
|
from rhodecode.tests.utils import set_anonymous_access, AuthPluginManager
|
|
|
from rhodecode.tests import console_printer
|
|
|
|
|
|
REPO_GROUP = 'a_repo_group'
|
|
|
HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
|
|
|
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
|
|
|
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
def check_httpbin_connection():
|
|
|
log.debug('Checking if HTTPBIN_DOMAIN: %s is available', HTTPBIN_DOMAIN)
|
|
|
try:
|
|
|
response = requests.get(HTTPBIN_DOMAIN, timeout=5)
|
|
|
return response.status_code == 200
|
|
|
except Exception as e:
|
|
|
console_printer(e)
|
|
|
|
|
|
return False
|
|
|
|
|
|
#overrides backend_N with init_pyramid_app instead of baseapp
|
|
|
@pytest.fixture()
|
|
|
def vcs_backend_git(request, init_pyramid_app, test_repo):
|
|
|
return backend_base(request, 'git', test_repo)
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def vcs_backend_hg(request, init_pyramid_app, test_repo):
|
|
|
return backend_base(request, 'hg', test_repo)
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def vcs_backend_svn(request, init_pyramid_app, test_repo):
|
|
|
return backend_base(request, 'svn', test_repo)
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
def tmp_storage_location(request, tmpdir_factory):
|
|
|
"""
|
|
|
Defines a module level storage_location, used mostly to define per-test persistent repo storage
|
|
|
shared across vcsserver, rhodecode and celery
|
|
|
"""
|
|
|
|
|
|
dest = tmpdir_factory.mktemp('tmp_storage_location_', numbered=True)
|
|
|
log.info("Creating test TMP directory at %s", dest)
|
|
|
return dest
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
def repo_group_repos(request):
|
|
|
"""Create a copy of each test repo in a repo group."""
|
|
|
|
|
|
fixture = Fixture()
|
|
|
repo_group = fixture.create_repo_group(REPO_GROUP)
|
|
|
repo_group_id = repo_group.group_id
|
|
|
fixture.create_fork(HG_REPO, HG_REPO,
|
|
|
repo_name_full=HG_REPO_WITH_GROUP,
|
|
|
repo_group=repo_group_id)
|
|
|
fixture.create_fork(GIT_REPO, GIT_REPO,
|
|
|
repo_name_full=GIT_REPO_WITH_GROUP,
|
|
|
repo_group=repo_group_id)
|
|
|
fixture.create_fork(SVN_REPO, SVN_REPO,
|
|
|
repo_name_full=SVN_REPO_WITH_GROUP,
|
|
|
repo_group=repo_group_id)
|
|
|
|
|
|
@request.addfinalizer
|
|
|
def cleanup():
|
|
|
fixture.destroy_repo(HG_REPO_WITH_GROUP)
|
|
|
fixture.destroy_repo(GIT_REPO_WITH_GROUP)
|
|
|
fixture.destroy_repo(SVN_REPO_WITH_GROUP)
|
|
|
fixture.destroy_repo_group(repo_group_id)
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
|
def rcstack_vcsserver_factory(vcsserver_factory):
|
|
|
return vcsserver_factory
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
|
def rcstack_celery_factory(celery_factory):
|
|
|
return celery_factory
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
|
def rcstack_rhodecode_factory(rhodecode_factory):
|
|
|
return rhodecode_factory
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
|
def init_pyramid_app(request, available_port_factory, ini_config_factory, rcstack_vcsserver_factory, tmp_storage_location):
|
|
|
from rhodecode.lib.config_utils import get_app_config
|
|
|
from rhodecode.config.middleware import make_pyramid_app
|
|
|
|
|
|
store_dir = tmp_storage_location
|
|
|
port = available_port_factory()
|
|
|
rcstack_vcsserver_factory(
|
|
|
request,
|
|
|
store_dir=store_dir,
|
|
|
port=port,
|
|
|
info_prefix='init-app-'
|
|
|
)
|
|
|
|
|
|
app_ini_config = ini_config_factory(store_dir)
|
|
|
|
|
|
pyramid.paster.setup_logging(app_ini_config)
|
|
|
|
|
|
settings = get_app_config(app_ini_config)
|
|
|
settings['startup.import_repos'] = True
|
|
|
settings['vcs.server'] = f'localhost:{port}'
|
|
|
settings['repo_store.path'] = str(store_dir)
|
|
|
pyramid_app = make_pyramid_app({'__file__': app_ini_config}, **settings)
|
|
|
|
|
|
return pyramid_app
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
|
def rcstack(request, tmp_storage_location, rcextensions, available_port_factory, rcstack_vcsserver_factory, rcstack_celery_factory, rcstack_rhodecode_factory):
|
|
|
"""
|
|
|
Runs minimal rcstack, i.e vcsserver, celery, rhodecode unpacks rcextensions and repos to a shared location
|
|
|
"""
|
|
|
rcstack_data = AttributeDict()
|
|
|
store_dir = tmp_storage_location
|
|
|
|
|
|
vcsserver_port: int = available_port_factory()
|
|
|
vcsserver_log = os.path.join(tmp_storage_location, 'vcsserver.log')
|
|
|
|
|
|
log.info('Using vcsserver test port %s and log %s', vcsserver_port, vcsserver_log) # start vcsserver
|
|
|
_factory = rcstack_vcsserver_factory(
|
|
|
request,
|
|
|
store_dir=store_dir,
|
|
|
port=vcsserver_port,
|
|
|
log_file=vcsserver_log,
|
|
|
overrides=(
|
|
|
{'handler_console': {'level': 'DEBUG'}},
|
|
|
))
|
|
|
rcstack_data.vcsserver_port = vcsserver_port
|
|
|
rcstack_data.vcsserver_log = _factory.log_file
|
|
|
|
|
|
|
|
|
celery_log = os.path.join(tmp_storage_location, 'celery.log')
|
|
|
|
|
|
|
|
|
log.info('Using celery log %s', celery_log)
|
|
|
# start celery
|
|
|
_factory = rcstack_celery_factory(
|
|
|
request,
|
|
|
store_dir=store_dir,
|
|
|
port=None,
|
|
|
log_file=celery_log,
|
|
|
overrides=(
|
|
|
{'handler_console': {'level': 'DEBUG'}},
|
|
|
{'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}},
|
|
|
{'app:main': {'repo_store.path': store_dir}}
|
|
|
))
|
|
|
|
|
|
rcstack_data.celery_log = _factory.log_file
|
|
|
|
|
|
rhodecode_port: int = available_port_factory()
|
|
|
rhodecode_log = os.path.join(tmp_storage_location, 'rhodecode.log')
|
|
|
|
|
|
|
|
|
log.info('Using rhodecode test port %s and log %s', rhodecode_port, rhodecode_port)
|
|
|
|
|
|
# start rhodecode
|
|
|
rc = rcstack_rhodecode_factory(
|
|
|
request,
|
|
|
store_dir=store_dir,
|
|
|
port=rhodecode_port,
|
|
|
log_file=rhodecode_log,
|
|
|
overrides=(
|
|
|
{'handler_console': {'level': 'DEBUG'}},
|
|
|
{'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}},
|
|
|
{'app:main': {'repo_store.path': store_dir}}
|
|
|
))
|
|
|
|
|
|
rcstack_data.rhodecode_port = rhodecode_port
|
|
|
rcstack_data.rhodecode_log = rc.log_file
|
|
|
|
|
|
rc.rcstack_data = rcstack_data
|
|
|
return rc
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def disable_locking(init_pyramid_app):
|
|
|
r = Repository.get_by_repo_name(GIT_REPO)
|
|
|
Repository.unlock(r)
|
|
|
r.enable_locking = False
|
|
|
Session().add(r)
|
|
|
Session().commit()
|
|
|
|
|
|
r = Repository.get_by_repo_name(HG_REPO)
|
|
|
Repository.unlock(r)
|
|
|
r.enable_locking = False
|
|
|
Session().add(r)
|
|
|
Session().commit()
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def disable_anonymous_user(request, init_pyramid_app, db_connection):
|
|
|
set_anonymous_access(False)
|
|
|
|
|
|
@request.addfinalizer
|
|
|
def cleanup():
|
|
|
set_anonymous_access(True)
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
|
def enable_auth_plugins(request, init_pyramid_app):
|
|
|
"""
|
|
|
Return a factory object that when called, allows to control which
|
|
|
authentication plugins are enabled.
|
|
|
"""
|
|
|
|
|
|
enabler = AuthPluginManager()
|
|
|
request.addfinalizer(enabler.cleanup)
|
|
|
|
|
|
return enabler
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def fs_repo_only(request, rhodecode_fixtures):
|
|
|
def fs_repo_fabric(repo_name, repo_type):
|
|
|
rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
|
|
|
rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)
|
|
|
|
|
|
def cleanup():
|
|
|
rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
|
|
|
rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)
|
|
|
|
|
|
request.addfinalizer(cleanup)
|
|
|
|
|
|
return fs_repo_fabric
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def enable_webhook_push_integration(request):
|
|
|
integration = Integration()
|
|
|
integration.integration_type = WebhookIntegrationType.key
|
|
|
Session().add(integration)
|
|
|
|
|
|
settings = dict(
|
|
|
url=HTTPBIN_POST,
|
|
|
secret_token='secret',
|
|
|
username=None,
|
|
|
password=None,
|
|
|
custom_header_key=None,
|
|
|
custom_header_val=None,
|
|
|
method_type='post',
|
|
|
events=[events.RepoPushEvent.name],
|
|
|
log_data=True
|
|
|
)
|
|
|
|
|
|
IntegrationModel().update_integration(
|
|
|
integration,
|
|
|
name='IntegrationWebhookTest',
|
|
|
enabled=True,
|
|
|
settings=settings,
|
|
|
repo=None,
|
|
|
repo_group=None,
|
|
|
child_repos_only=False,
|
|
|
)
|
|
|
Session().commit()
|
|
|
integration_id = integration.integration_id
|
|
|
|
|
|
@request.addfinalizer
|
|
|
def cleanup():
|
|
|
integration = Integration.get(integration_id)
|
|
|
Session().delete(integration)
|
|
|
Session().commit()
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
def branch_permission_setter(request):
|
|
|
"""
|
|
|
|
|
|
def my_test(branch_permission_setter)
|
|
|
branch_permission_setter(repo_name, username, pattern='*', permission='branch.push')
|
|
|
|
|
|
"""
|
|
|
|
|
|
rule_id = None
|
|
|
write_perm_id = None
|
|
|
write_perm = None
|
|
|
rule = None
|
|
|
|
|
|
def _branch_permissions_setter(
|
|
|
repo_name, username, pattern='*', permission='branch.push_force'):
|
|
|
global rule_id, write_perm_id
|
|
|
global rule, write_perm
|
|
|
|
|
|
repo = Repository.get_by_repo_name(repo_name)
|
|
|
repo_id = repo.repo_id
|
|
|
|
|
|
user = User.get_by_username(username)
|
|
|
user_id = user.user_id
|
|
|
|
|
|
rule_perm_obj = Permission.get_by_key(permission)
|
|
|
|
|
|
# add new entry, based on existing perm entry
|
|
|
perm = UserRepoToPerm.query() \
|
|
|
.filter(UserRepoToPerm.repository_id == repo_id) \
|
|
|
.filter(UserRepoToPerm.user_id == user_id) \
|
|
|
.first()
|
|
|
|
|
|
if not perm:
|
|
|
# such user isn't defined in Permissions for repository
|
|
|
# we now on-the-fly add new permission
|
|
|
|
|
|
write_perm = UserRepoToPerm()
|
|
|
write_perm.permission = Permission.get_by_key('repository.write')
|
|
|
write_perm.repository_id = repo_id
|
|
|
write_perm.user_id = user_id
|
|
|
Session().add(write_perm)
|
|
|
Session().flush()
|
|
|
|
|
|
perm = write_perm
|
|
|
|
|
|
rule = UserToRepoBranchPermission()
|
|
|
rule.rule_to_perm_id = perm.repo_to_perm_id
|
|
|
rule.branch_pattern = pattern
|
|
|
rule.rule_order = 10
|
|
|
rule.permission = rule_perm_obj
|
|
|
rule.repository_id = repo_id
|
|
|
Session().add(rule)
|
|
|
Session().commit()
|
|
|
|
|
|
return rule
|
|
|
|
|
|
@request.addfinalizer
|
|
|
def cleanup():
|
|
|
if rule:
|
|
|
Session().delete(rule)
|
|
|
Session().commit()
|
|
|
if write_perm:
|
|
|
Session().delete(write_perm)
|
|
|
Session().commit()
|
|
|
|
|
|
return _branch_permissions_setter
|
|
|
|