|
|
|
|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
py.test config for test suite for making push/pull operations.
|
|
|
|
|
|
.. important::
|
|
|
|
|
|
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
|
|
|
to redirect things to stderr instead of stdout.
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import tempfile
|
|
|
import textwrap
|
|
|
import pytest
|
|
|
import logging
|
|
|
import requests
|
|
|
|
|
|
from rhodecode import events
|
|
|
from rhodecode.lib.str_utils import safe_bytes
|
|
|
from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \
|
|
|
UserToRepoBranchPermission, User
|
|
|
from rhodecode.model.integration import IntegrationModel
|
|
|
from rhodecode.model.db import Repository
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.integrations.types.webhook import WebhookIntegrationType
|
|
|
|
|
|
from rhodecode.tests import GIT_REPO, HG_REPO
|
|
|
from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST
|
|
|
from rhodecode.tests.fixture import Fixture
|
|
|
from rhodecode.tests.server_utils import RcWebServer
|
|
|
|
|
|
|
|
|
# Name of the repository group used by the `repos` fixture.
REPO_GROUP = 'a_repo_group'

# Group-qualified names of the hg/git forks placed inside REPO_GROUP.
HG_REPO_WITH_GROUP = '{}/{}'.format(REPO_GROUP, HG_REPO)
GIT_REPO_WITH_GROUP = '{}/{}'.format(REPO_GROUP, GIT_REPO)

# Module-level logger.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
def check_httpbin_connection(timeout=10):
    """
    Tell whether the httpbin service at ``HTTPBIN_DOMAIN`` can be reached.

    :param timeout: seconds to wait for the service before giving up. Without
        a timeout an unresponsive host would block the caller indefinitely.
    :returns: ``True`` only when a GET request is answered with HTTP 200,
        ``False`` for any other status code or when the request raises.
    """
    try:
        response = requests.get(HTTPBIN_DOMAIN, timeout=timeout)
    except Exception as e:
        # Best-effort probe: any failure simply means "not available".
        print(e)
        return False

    return response.status_code == 200
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def rcextensions(request, db_connection, tmpdir_factory):
    """
    Provide a testing ``rcextensions`` package for the duration of a module.

    An ``rcextensions/__init__.py`` that star-imports the example
    rcextensions is written below the base temp directory, and the directory
    is removed again on teardown. The fixture fails when the directory is
    already present.
    """
    # Note: rcextensions are looked up based on the path of the ini file
    package_dir = tmpdir_factory.getbasetemp().join('rcextensions')

    # `pytest.fail` raises, so everything below only runs for a fresh path.
    if package_dir.check():
        pytest.fail(
            "Path for rcextensions already exists, please clean up before "
            "test run this path: %s" % (package_dir, ))

    # The removal is registered before the package is written.
    request.addfinalizer(package_dir.remove)

    package_source = textwrap.dedent("""
        # Forward import the example rcextensions to make it
        # active for our tests.
        from rhodecode.tests.other.example_rcextensions import *
    """)
    package_dir.join('__init__.py').write_binary(
        safe_bytes(package_source), ensure=True)
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def repos(request, db_connection):
    """
    Fork both test repositories into the ``REPO_GROUP`` repository group.

    The forks and the group are destroyed again when the module finishes.
    """
    fixture = Fixture()
    group_id = fixture.create_repo_group(REPO_GROUP).group_id

    # (source repository, group-qualified fork name)
    forks = (
        (HG_REPO, HG_REPO_WITH_GROUP),
        (GIT_REPO, GIT_REPO_WITH_GROUP),
    )
    for source_name, fork_name in forks:
        fixture.create_fork(
            source_name, source_name,
            repo_name_full=fork_name,
            repo_group=group_id)

    def _teardown():
        for _, fork_name in forks:
            fixture.destroy_repo(fork_name)
        fixture.destroy_repo_group(group_id)

    request.addfinalizer(_teardown)
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def rc_web_server_config_modification():
    """
    Extra ini overrides appended by `rc_web_server_config_factory` to its
    own parameters; empty by default.
    """
    extra_params = []
    return extra_params
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def rc_web_server_config_factory(testini_factory, rc_web_server_config_modification):
    """
    Return a callable building the configuration used by `rc_web_server`.

    The callable takes the web server port and the vcsserver port and
    returns whatever `testini_factory` produces for the combined overrides.
    """

    def factory(rcweb_port, vcsserver_port):
        console_override = {'handler_console': {'level': 'DEBUG'}}
        port_override = {'server:main': {'port': rcweb_port}}
        vcs_override = {
            'app:main': {'vcs.server': f'localhost:{vcsserver_port}'}}

        overrides = [console_override, port_override, vcs_override]
        # Extra overrides go last, after the defaults above.
        overrides += rc_web_server_config_modification
        return testini_factory(overrides)

    return factory
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def rc_web_server(
        request, vcsserver_factory, available_port_factory,
        rc_web_server_config_factory, repos, rcextensions):
    """
    Run the web server as a subprocess, with its own instance of vcsserver.

    Both servers get a port from `available_port_factory` and log into the
    system temp directory. The started server is returned once it reports
    ready, and it is shut down on teardown.
    """
    log_dir = tempfile.gettempdir()

    rcweb_port = available_port_factory()
    log.info(f'Using rcweb ops test port {rcweb_port}')

    vcsserver_port = available_port_factory()
    log.info(f'Using vcsserver ops test port {vcsserver_port}')

    vcsserver_overrides = (
        {'server:main': {'workers': 2}},
        {'server:main': {'graceful_timeout': 10}},
    )
    vcsserver_factory(
        request,
        vcsserver_port=vcsserver_port,
        log_file=os.path.join(log_dir, 'rc_op_vcs.log'),
        overrides=vcsserver_overrides)

    web_config = rc_web_server_config_factory(
        rcweb_port=rcweb_port, vcsserver_port=vcsserver_port)
    server = RcWebServer(
        web_config, log_file=os.path.join(log_dir, 'rc_op_web.log'))
    server.start()

    # Registered before waiting, so a server that never becomes ready is
    # still shut down.
    request.addfinalizer(server.shutdown)

    server.wait_until_ready()
    return server
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def disable_locking(baseapp):
    """
    Unlock the git and hg test repositories and switch their locking off.
    """
    for repo_name in (GIT_REPO, HG_REPO):
        repo = Repository.get_by_repo_name(repo_name)
        Repository.unlock(repo)
        repo.enable_locking = False

        # Each repository is committed on its own.
        Session().add(repo)
        Session().commit()
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def fs_repo_only(request, rhodecode_fixtures):
    """
    Return a callable that creates a repository and destroys it again with
    ``fs_remove=False``.

    Presumably this leaves only the on-disk repository behind, as the
    fixture name suggests. Each call registers a teardown that destroys the
    repository with ``fs_remove=True`` and removes it from the filesystem.
    """

    def fs_repo_fabric(repo_name, repo_type):
        def _purge():
            rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
            rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)

        rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
        rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)
        request.addfinalizer(_purge)

    return fs_repo_fabric
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def enable_webhook_push_integration(request):
    """
    Register an enabled webhook integration for repo push events.

    The integration posts to ``HTTPBIN_POST`` and is not bound to a
    repository or repository group. It is deleted again on teardown.
    """
    webhook = Integration()
    webhook.integration_type = WebhookIntegrationType.key
    Session().add(webhook)

    webhook_settings = {
        'url': HTTPBIN_POST,
        'secret_token': 'secret',
        'username': None,
        'password': None,
        'custom_header_key': None,
        'custom_header_val': None,
        'method_type': 'post',
        'events': [events.RepoPushEvent.name],
        'log_data': True,
    }

    IntegrationModel().update_integration(
        webhook,
        name='IntegrationWebhookTest',
        enabled=True,
        settings=webhook_settings,
        repo=None,
        repo_group=None,
        child_repos_only=False,
    )
    Session().commit()

    # Only the id is kept; teardown loads the row again before deleting it.
    integration_id = webhook.integration_id

    def _remove_integration():
        Session().delete(Integration.get(integration_id))
        Session().commit()

    request.addfinalizer(_remove_integration)
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def branch_permission_setter(request):
    """
    Return a callable that adds a branch permission rule for a user on a repo.

    Usage::

        def my_test(branch_permission_setter):
            branch_permission_setter(repo_name, username, pattern='*', permission='branch.push')

    The callable returns the created ``UserToRepoBranchPermission`` rule.
    Every rule it created, and every ``repository.write`` permission it had
    to add on the fly, is deleted again on teardown.
    """
    # Objects created by the setter. The nested functions only mutate these
    # lists, so state is shared with `cleanup` without rebinding any name.
    # (`global` declarations here would write to module globals and leave
    # `cleanup` with nothing to delete.)
    created_rules = []
    created_write_perms = []

    def _branch_permissions_setter(
            repo_name, username, pattern='*', permission='branch.push_force'):
        repo = Repository.get_by_repo_name(repo_name)
        repo_id = repo.repo_id

        user = User.get_by_username(username)
        user_id = user.user_id

        rule_perm_obj = Permission.get_by_key(permission)

        # add new entry, based on existing perm entry
        perm = UserRepoToPerm.query() \
            .filter(UserRepoToPerm.repository_id == repo_id) \
            .filter(UserRepoToPerm.user_id == user_id) \
            .first()

        new_write_perm = None
        if not perm:
            # such user isn't defined in Permissions for repository
            # we now on-the-fly add new permission
            new_write_perm = UserRepoToPerm()
            new_write_perm.permission = Permission.get_by_key('repository.write')
            new_write_perm.repository_id = repo_id
            new_write_perm.user_id = user_id
            Session().add(new_write_perm)
            # flush so that `repo_to_perm_id` is available for the rule below
            Session().flush()

            perm = new_write_perm

        rule = UserToRepoBranchPermission()
        rule.rule_to_perm_id = perm.repo_to_perm_id
        rule.branch_pattern = pattern
        rule.rule_order = 10
        rule.permission = rule_perm_obj
        rule.repository_id = repo_id
        Session().add(rule)
        Session().commit()

        # Track objects only once they are committed, so teardown never
        # tries to delete something that was not stored.
        created_rules.append(rule)
        if new_write_perm is not None:
            created_write_perms.append(new_write_perm)

        return rule

    @request.addfinalizer
    def cleanup():
        # Rules point at the repo permission entries via `rule_to_perm_id`,
        # so they are removed (and committed) before those entries.
        if created_rules:
            for rule in created_rules:
                Session().delete(rule)
            Session().commit()
        if created_write_perms:
            for write_perm in created_write_perms:
                Session().delete(write_perm)
            Session().commit()

    return _branch_permissions_setter
|
|
|
|
|
|
|
|
|
|