conftest.py
265 lines
| 9.4 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r5607 | import subprocess | |||
r1 | import os | |||
import sys | ||||
import tempfile | ||||
import pytest | ||||
from sqlalchemy.engine import url | ||||
r5087 | from rhodecode.lib.str_utils import safe_str, safe_bytes | |||
r5607 | from rhodecode.tests.fixtures.rc_fixture import TestINI | |||
r1 | ||||
def _get_dbs_from_metafunc(metafunc): | ||||
r5607 | dbs_mark = metafunc.definition.get_closest_marker("dbs") | |||
r3951 | ||||
if dbs_mark: | ||||
# Supported backends by this test function, created from pytest.mark.dbs | ||||
backends = dbs_mark.args | ||||
r1 | else: | |||
r5607 | backends = metafunc.config.getoption("--dbs") | |||
r1 | return backends | |||
def pytest_generate_tests(metafunc): | ||||
# Support test generation based on --dbs parameter | ||||
r5607 | if "db_backend" in metafunc.fixturenames: | |||
requested_backends = set(metafunc.config.getoption("--dbs")) | ||||
r1 | backends = _get_dbs_from_metafunc(metafunc) | |||
backends = requested_backends.intersection(backends) | ||||
# TODO: johbo: Disabling a backend did not work out with | ||||
# parametrization, find better way to achieve this. | ||||
if not backends: | ||||
metafunc.function._skip = True | ||||
r5607 | metafunc.parametrize("db_backend_name", backends) | |||
r1 | ||||
def pytest_collection_modifyitems(session, config, items): | ||||
r5607 | remaining = [i for i in items if not getattr(i.obj, "_skip", False)] | |||
r1 | items[:] = remaining | |||
r3946 | @pytest.fixture() | |||
r5607 | def db_backend(request, db_backend_name, ini_config, tmpdir_factory): | |||
r1 | basetemp = tmpdir_factory.getbasetemp().strpath | |||
klass = _get_backend(db_backend_name) | ||||
r5607 | option_name = "--{}-connection-string".format(db_backend_name) | |||
r1 | connection_string = request.config.getoption(option_name) or None | |||
r5607 | return klass(config_file=ini_config, basetemp=basetemp, connection_string=connection_string) | |||
r1 | ||||
def _get_backend(backend_type): | ||||
r5607 | return {"sqlite": SQLiteDBBackend, "postgres": PostgresDBBackend, "mysql": MySQLDBBackend, "": EmptyDBBackend}[ | |||
backend_type | ||||
] | ||||
r1 | ||||
class DBBackend(object): | ||||
_store = os.path.dirname(os.path.abspath(__file__)) | ||||
_type = None | ||||
r5607 | _base_ini_config = [{"app:main": {"vcs.start_server": "false", "startup.import_repos": "false"}}] | |||
_db_url = [{"app:main": {"sqlalchemy.db1.url": ""}}] | ||||
_base_db_name = "rhodecode_test_db_backend" | ||||
std_env = {"RC_TEST": "0"} | ||||
r1 | ||||
r5607 | def __init__(self, config_file, db_name=None, basetemp=None, connection_string=None): | |||
r1 | self.fixture_store = os.path.join(self._store, self._type) | |||
self.db_name = db_name or self._base_db_name | ||||
self._base_ini_file = config_file | ||||
r5607 | self.stderr = "" | |||
self.stdout = "" | ||||
r1 | self._basetemp = basetemp or tempfile.gettempdir() | |||
r5607 | self._repos_location = os.path.join(self._basetemp, "rc_test_repos") | |||
r1 | self.connection_string = connection_string | |||
@property | ||||
def connection_string(self): | ||||
return self._connection_string | ||||
@connection_string.setter | ||||
def connection_string(self, new_connection_string): | ||||
if not new_connection_string: | ||||
new_connection_string = self.get_default_connection_string() | ||||
else: | ||||
r5607 | new_connection_string = new_connection_string.format(db_name=self.db_name) | |||
r1 | url_parts = url.make_url(new_connection_string) | |||
self._connection_string = new_connection_string | ||||
self.user = url_parts.username | ||||
self.password = url_parts.password | ||||
self.host = url_parts.host | ||||
def get_default_connection_string(self): | ||||
r5607 | raise NotImplementedError("default connection_string is required.") | |||
r1 | ||||
def execute(self, cmd, env=None, *args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
r5607 | command = cmd + " " + " ".join(args) | |||
sys.stdout.write(f"CMD: {command}") | ||||
r1 | ||||
# Tell Python to use UTF-8 encoding out stdout | ||||
_env = os.environ.copy() | ||||
r5607 | _env["PYTHONIOENCODING"] = "UTF-8" | |||
r5351 | _env.update(self.std_env) | |||
r1 | if env: | |||
_env.update(env) | ||||
r5351 | ||||
r5607 | self.p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=_env) | |||
r1 | self.stdout, self.stderr = self.p.communicate() | |||
r5087 | stdout_str = safe_str(self.stdout) | |||
r5607 | sys.stdout.write(f"COMMAND:{command}\n") | |||
r5087 | sys.stdout.write(stdout_str) | |||
r1 | return self.stdout, self.stderr | |||
def assert_returncode_success(self): | ||||
r5087 | from rich import print as pprint | |||
r5607 | ||||
r1293 | if not self.p.returncode == 0: | |||
r5087 | pprint(safe_str(self.stderr)) | |||
r5607 | raise AssertionError(f"non 0 retcode:{self.p.returncode}") | |||
r1 | ||||
r2653 | def assert_correct_output(self, stdout, version): | |||
r5607 | assert b"UPGRADE FOR STEP %b COMPLETED" % safe_bytes(version) in stdout | |||
r2653 | ||||
r1 | def setup_rhodecode_db(self, ini_params=None, env=None): | |||
if not ini_params: | ||||
ini_params = self._base_ini_config | ||||
ini_params.extend(self._db_url) | ||||
r5607 | with TestINI(self._base_ini_file, ini_params, self._type, destroy=True) as _ini_file: | |||
r1 | if not os.path.isdir(self._repos_location): | |||
os.makedirs(self._repos_location) | ||||
r1562 | ||||
r2653 | return self.execute( | |||
r2341 | "rc-setup-app {0} --user=marcink " | |||
r1 | "--email=marcin@rhodeocode.com --password={1} " | |||
r5607 | "--repos={2} --force-yes".format(_ini_file, "qweqwe", self._repos_location), | |||
env=env, | ||||
) | ||||
r1 | ||||
def upgrade_database(self, ini_params=None): | ||||
if not ini_params: | ||||
ini_params = self._base_ini_config | ||||
ini_params.extend(self._db_url) | ||||
r5607 | test_ini = TestINI(self._base_ini_file, ini_params, self._type, destroy=True) | |||
r1 | with test_ini as ini_file: | |||
if not os.path.isdir(self._repos_location): | ||||
os.makedirs(self._repos_location) | ||||
r2653 | ||||
r5607 | return self.execute("rc-upgrade-db {0} --force-yes".format(ini_file)) | |||
r1 | ||||
def setup_db(self): | ||||
raise NotImplementedError | ||||
def teardown_db(self): | ||||
raise NotImplementedError | ||||
def import_dump(self, dumpname): | ||||
raise NotImplementedError | ||||
class EmptyDBBackend(DBBackend): | ||||
r5607 | _type = "" | |||
r1 | ||||
def setup_db(self): | ||||
pass | ||||
def teardown_db(self): | ||||
pass | ||||
def import_dump(self, dumpname): | ||||
pass | ||||
def assert_returncode_success(self): | ||||
assert True | ||||
class SQLiteDBBackend(DBBackend): | ||||
r5607 | _type = "sqlite" | |||
r1 | ||||
def get_default_connection_string(self): | ||||
r5607 | return "sqlite:///{}/{}.sqlite".format(self._basetemp, self.db_name) | |||
r1 | ||||
def setup_db(self): | ||||
# dump schema for tests | ||||
# cp -v $TEST_DB_NAME | ||||
r5607 | self._db_url = [{"app:main": {"sqlalchemy.db1.url": self.connection_string}}] | |||
r1 | ||||
def import_dump(self, dumpname): | ||||
dump = os.path.join(self.fixture_store, dumpname) | ||||
r5607 | target = os.path.join(self._basetemp, "{0.db_name}.sqlite".format(self)) | |||
return self.execute(f"cp -v {dump} {target}") | ||||
r1 | ||||
def teardown_db(self): | ||||
r5087 | target_db = os.path.join(self._basetemp, self.db_name) | |||
return self.execute(f"rm -rf {target_db}.sqlite") | ||||
r1 | ||||
class MySQLDBBackend(DBBackend): | ||||
r5607 | _type = "mysql" | |||
r1 | ||||
def get_default_connection_string(self): | ||||
r5607 | return "mysql://root:qweqwe@127.0.0.1/{}".format(self.db_name) | |||
r1 | ||||
def setup_db(self): | ||||
# dump schema for tests | ||||
# mysqldump -uroot -pqweqwe $TEST_DB_NAME | ||||
r5607 | self._db_url = [{"app:main": {"sqlalchemy.db1.url": self.connection_string}}] | |||
return self.execute( | ||||
"mysql -v -u{} -p{} -e 'create database '{}';'".format(self.user, self.password, self.db_name) | ||||
) | ||||
r1 | ||||
def import_dump(self, dumpname): | ||||
dump = os.path.join(self.fixture_store, dumpname) | ||||
r5607 | return self.execute("mysql -u{} -p{} {} < {}".format(self.user, self.password, self.db_name, dump)) | |||
r1 | ||||
def teardown_db(self): | ||||
r5607 | return self.execute( | |||
"mysql -v -u{} -p{} -e 'drop database '{}';'".format(self.user, self.password, self.db_name) | ||||
) | ||||
r1 | ||||
class PostgresDBBackend(DBBackend): | ||||
r5607 | _type = "postgres" | |||
r1 | ||||
def get_default_connection_string(self): | ||||
r5607 | return "postgresql://postgres:qweqwe@localhost/{}".format(self.db_name) | |||
r1 | ||||
def setup_db(self): | ||||
# dump schema for tests | ||||
# pg_dump -U postgres -h localhost $TEST_DB_NAME | ||||
r5607 | self._db_url = [{"app:main": {"sqlalchemy.db1.url": self.connection_string}}] | |||
r5087 | cmd = f"PGPASSWORD={self.password} psql -U {self.user} -h localhost -c 'create database '{self.db_name}';'" | |||
return self.execute(cmd) | ||||
r1 | ||||
def teardown_db(self): | ||||
r5087 | cmd = f"PGPASSWORD={self.password} psql -U {self.user} -h localhost -c 'drop database if exists '{self.db_name}';'" | |||
return self.execute(cmd) | ||||
r1 | ||||
def import_dump(self, dumpname): | ||||
dump = os.path.join(self.fixture_store, dumpname) | ||||
r5087 | cmd = f"PGPASSWORD={self.password} psql -U {self.user} -h localhost -d {self.db_name} -1 -f {dump}" | |||
return self.execute(cmd) | ||||