##// END OF EJS Templates
fix: startup, report nicer error in case DB is damaged or empty when starting services
fix: startup, report nicer error in case DB is damaged or empty when starting services

File last commit:

r5459:7f730862 default
r5462:6ef12567 default
Show More
conftest.py
311 lines | 9.6 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
py.test config for test suite for making push/pull operations.
.. important::
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
to redirect things to stderr instead of stdout.
"""
import os
import tempfile
import textwrap
import pytest
import logging
import requests
from rhodecode import events
from rhodecode.lib.str_utils import safe_bytes
from rhodecode.model.db import Integration, UserRepoToPerm, Permission, \
UserToRepoBranchPermission, User
from rhodecode.model.integration import IntegrationModel
from rhodecode.model.db import Repository
from rhodecode.model.meta import Session
from rhodecode.integrations.types.webhook import WebhookIntegrationType
from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
from rhodecode.tests.conftest import HTTPBIN_DOMAIN, HTTPBIN_POST
from rhodecode.tests.fixture import Fixture
from rhodecode.tests.server_utils import RcWebServer
REPO_GROUP = 'a_repo_group'
HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
log = logging.getLogger(__name__)
def check_httpbin_connection():
log.debug('Checking if HTTPBIN_DOMAIN: %s is available', HTTPBIN_DOMAIN)
try:
response = requests.get(HTTPBIN_DOMAIN, timeout=5)
return response.status_code == 200
except Exception as e:
print(e)
return False
@pytest.fixture(scope="module")
def rcextensions(request, db_connection, tmpdir_factory):
"""
Installs a testing rcextensions pack to ensure they work as expected.
"""
init_content = textwrap.dedent("""
# Forward import the example rcextensions to make it
# active for our tests.
from rhodecode.tests.other.example_rcextensions import *
""")
# Note: rcextensions are looked up based on the path of the ini file
root_path = tmpdir_factory.getbasetemp()
rcextensions_path = root_path.join('rcextensions')
init_path = rcextensions_path.join('__init__.py')
if rcextensions_path.check():
pytest.fail(
"Path for rcextensions already exists, please clean up before "
"test run this path: %s" % (rcextensions_path, ))
else:
request.addfinalizer(rcextensions_path.remove)
init_path.write_binary(safe_bytes(init_content), ensure=True)
@pytest.fixture(scope="module")
def repos(request, db_connection):
"""Create a copy of each test repo in a repo group."""
fixture = Fixture()
repo_group = fixture.create_repo_group(REPO_GROUP)
repo_group_id = repo_group.group_id
fixture.create_fork(HG_REPO, HG_REPO,
repo_name_full=HG_REPO_WITH_GROUP,
repo_group=repo_group_id)
fixture.create_fork(GIT_REPO, GIT_REPO,
repo_name_full=GIT_REPO_WITH_GROUP,
repo_group=repo_group_id)
fixture.create_fork(SVN_REPO, SVN_REPO,
repo_name_full=SVN_REPO_WITH_GROUP,
repo_group=repo_group_id)
@request.addfinalizer
def cleanup():
fixture.destroy_repo(HG_REPO_WITH_GROUP)
fixture.destroy_repo(GIT_REPO_WITH_GROUP)
fixture.destroy_repo(SVN_REPO_WITH_GROUP)
fixture.destroy_repo_group(repo_group_id)
@pytest.fixture(scope="module")
def rc_web_server_config_modification():
return []
@pytest.fixture(scope="module")
def rc_web_server_config_factory(testini_factory, rc_web_server_config_modification):
"""
Configuration file used for the fixture `rc_web_server`.
"""
def factory(rcweb_port, vcsserver_port):
custom_params = [
{'handler_console': {'level': 'DEBUG'}},
{'server:main': {'port': rcweb_port}},
{'app:main': {'vcs.server': 'localhost:%s' % vcsserver_port}}
]
custom_params.extend(rc_web_server_config_modification)
return testini_factory(custom_params)
return factory
@pytest.fixture(scope="module")
def rc_web_server(
request, vcsserver_factory, available_port_factory,
rc_web_server_config_factory, repos, rcextensions):
"""
Run the web server as a subprocess. with its own instance of vcsserver
"""
rcweb_port: int = available_port_factory()
log.info('Using rcweb ops test port %s', rcweb_port)
vcsserver_port: int = available_port_factory()
log.info('Using vcsserver ops test port %s', vcsserver_port)
vcs_log = os.path.join(tempfile.gettempdir(), 'rc_op_vcs.log')
vcsserver_factory(
request, vcsserver_port=vcsserver_port,
log_file=vcs_log,
overrides=(
{'server:main': {'workers': 2}},
{'server:main': {'graceful_timeout': 10}},
))
rc_log = os.path.join(tempfile.gettempdir(), 'rc_op_web.log')
rc_web_server_config = rc_web_server_config_factory(
rcweb_port=rcweb_port,
vcsserver_port=vcsserver_port)
server = RcWebServer(rc_web_server_config, log_file=rc_log)
server.start()
@request.addfinalizer
def cleanup():
server.shutdown()
server.wait_until_ready()
return server
@pytest.fixture()
def disable_locking(baseapp):
r = Repository.get_by_repo_name(GIT_REPO)
Repository.unlock(r)
r.enable_locking = False
Session().add(r)
Session().commit()
r = Repository.get_by_repo_name(HG_REPO)
Repository.unlock(r)
r.enable_locking = False
Session().add(r)
Session().commit()
@pytest.fixture()
def fs_repo_only(request, rhodecode_fixtures):
def fs_repo_fabric(repo_name, repo_type):
rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)
def cleanup():
rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)
request.addfinalizer(cleanup)
return fs_repo_fabric
@pytest.fixture()
def enable_webhook_push_integration(request):
integration = Integration()
integration.integration_type = WebhookIntegrationType.key
Session().add(integration)
settings = dict(
url=HTTPBIN_POST,
secret_token='secret',
username=None,
password=None,
custom_header_key=None,
custom_header_val=None,
method_type='post',
events=[events.RepoPushEvent.name],
log_data=True
)
IntegrationModel().update_integration(
integration,
name='IntegrationWebhookTest',
enabled=True,
settings=settings,
repo=None,
repo_group=None,
child_repos_only=False,
)
Session().commit()
integration_id = integration.integration_id
@request.addfinalizer
def cleanup():
integration = Integration.get(integration_id)
Session().delete(integration)
Session().commit()
@pytest.fixture()
def branch_permission_setter(request):
"""
def my_test(branch_permission_setter)
branch_permission_setter(repo_name, username, pattern='*', permission='branch.push')
"""
rule_id = None
write_perm_id = None
write_perm = None
rule = None
def _branch_permissions_setter(
repo_name, username, pattern='*', permission='branch.push_force'):
global rule_id, write_perm_id
global rule, write_perm
repo = Repository.get_by_repo_name(repo_name)
repo_id = repo.repo_id
user = User.get_by_username(username)
user_id = user.user_id
rule_perm_obj = Permission.get_by_key(permission)
# add new entry, based on existing perm entry
perm = UserRepoToPerm.query() \
.filter(UserRepoToPerm.repository_id == repo_id) \
.filter(UserRepoToPerm.user_id == user_id) \
.first()
if not perm:
# such user isn't defined in Permissions for repository
# we now on-the-fly add new permission
write_perm = UserRepoToPerm()
write_perm.permission = Permission.get_by_key('repository.write')
write_perm.repository_id = repo_id
write_perm.user_id = user_id
Session().add(write_perm)
Session().flush()
perm = write_perm
rule = UserToRepoBranchPermission()
rule.rule_to_perm_id = perm.repo_to_perm_id
rule.branch_pattern = pattern
rule.rule_order = 10
rule.permission = rule_perm_obj
rule.repository_id = repo_id
Session().add(rule)
Session().commit()
return rule
@request.addfinalizer
def cleanup():
if rule:
Session().delete(rule)
Session().commit()
if write_perm:
Session().delete(write_perm)
Session().commit()
return _branch_permissions_setter