# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import time
import tempfile
import subprocess
import logging
import configparser

import pytest
import requests

from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
from rhodecode.tests.utils import is_url_reachable
from rhodecode.tests import console_printer

log = logging.getLogger(__name__)


def get_port(pyramid_config):
    """Return the ``port`` option of the ``[server:main]`` section.

    :param pyramid_config: path of the ini file to read.
    :returns: the port exactly as written in the file (a string).
    """
    config = configparser.ConfigParser()
    config.read(pyramid_config)
    return config.get('server:main', 'port')


def get_host_url(pyramid_config):
    """Construct the host url using the port in the test configuration."""
    port = get_port(pyramid_config)
    return f'127.0.0.1:{port}'


def assert_no_running_instance(url):
    """Fail the current test when something already answers at ``url``.

    Prints a hint to the console first, then aborts via ``pytest.fail``.
    """
    if is_url_reachable(url):
        console_printer(
            f"Hint: Usually this means another instance of server "
            f"is running in the background at {url}.")
        pytest.fail(f"Port is not free at {url}, cannot start server")


class ServerBase:
    """
    Common state and helpers for servers launched as subprocesses by tests.

    The ``[server:main]`` section of ``config_file`` supplies ``host`` and
    ``port``. Subclasses fill ``self._args`` with the command line and
    provide a ``start()`` method.
    """
    _args = []
    log_file_name = 'NOT_DEFINED.log'
    status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
    console_marker = " :warning: [green]pytest-setup[/green] "
    # Subclasses overwrite this per instance; the default keeps `_launch`
    # usable even when a subclass does not set it.
    info_prefix = ''

    def __init__(self, config_file, log_file, env):
        """
        :param config_file: path of the ini file; must exist.
        :param log_file: path for the process output, or a false value to
            use ``log_file_name`` inside the system temp directory.
        :param env: optional mapping of extra environment variables for
            the child process.
        :raises RuntimeError: when ``config_file`` is not an existing file.
        """
        # Validate first: ConfigParser.read() silently skips missing files,
        # which would otherwise surface as a confusing KeyError below.
        if not os.path.isfile(config_file):
            raise RuntimeError(f'Failed to get config at {config_file}')

        self.config_file = config_file
        config = configparser.ConfigParser()
        config.read(config_file)
        self._config = dict(config['server:main'])

        self._args = []
        self.log_file = log_file or os.path.join(
            tempfile.gettempdir(), self.log_file_name)
        self.env = env
        self.process = None
        self.server_out = None

        log.info(
            "Using the %s configuration:%s",
            self.__class__.__name__, config_file)

    @property
    def command(self):
        """The command line as a single space-joined string (for display)."""
        return ' '.join(self._args)

    @property
    def bind_addr(self):
        """``host:port`` as configured in ``[server:main]``."""
        return '{host}:{port}'.format(**self._config)

    @property
    def http_url(self):
        """``http://host:port/`` as configured in ``[server:main]``."""
        template = 'http://{host}:{port}/'
        return template.format(**self._config)

    def host_url(self):
        """Return ``http://127.0.0.1:<port>`` for the configured port."""
        host = get_host_url(self.config_file)
        return f'http://{host}'

    def get_rc_log(self):
        """Return the full content of the current log file."""
        with open(self.log_file) as f:
            return f.read()

    def assert_message_in_server_logs(self, message):
        """Assert that ``message`` occurs somewhere in the log file."""
        server_logs = self.get_rc_log()
        assert message in server_logs

    def wait_until_ready(self, timeout=30):
        """
        Poll the status url until a request gets through.

        Any HTTP response counts as ready; only connection errors and
        request timeouts are retried. Fails the test via ``pytest.fail``
        once ``timeout`` seconds have elapsed.
        """
        status_url = self.status_url_tmpl.format(
            host=self._config['host'], port=self._config['port'])
        deadline = time.monotonic() + timeout

        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                pytest.fail(
                    f"Starting the {self.__class__.__name__} failed or took "
                    f"more than {timeout} seconds. "
                    f"cmd: `{self.command}`"
                )
            try:
                # Bound each request by the time left so that a connection
                # which is accepted but never answered cannot block forever.
                requests.get(status_url, timeout=remaining)
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout):
                time.sleep(0.2)
            else:
                break

        log.info(
            'Server of %s ready at url %s',
            self.__class__.__name__, status_url)

    def shutdown(self):
        """Kill the child process (if any) and close its output file."""
        if self.process is not None:
            self.process.kill()
            # Reap the child so it does not linger as a zombie.
            self.process.wait()
        if self.server_out is not None and not self.server_out.closed:
            self.server_out.flush()
            self.server_out.close()

    def get_log_file_with_port(self):
        """
        Return the log file path with ``-<port>`` inserted before ``.log``.

        The last ``.log`` in the path is used, so a directory name that
        happens to contain ``.log`` is left intact. When the path has no
        ``.log`` at all, the suffix is appended at the end.
        """
        port_suffix = f'-{get_port(self.config_file)}'
        stem, marker, rest = self.log_file.rpartition('.log')
        if not marker:
            return f'{self.log_file}{port_suffix}'
        return f'{stem}{port_suffix}{marker}{rest}'

    def _process_env(self, **forced):
        """Build the child environment: os.environ, then self.env, then forced."""
        env = os.environ.copy()
        if self.env:
            env.update(self.env)
        env.update(forced)
        return env

    def _launch(self, label, location, env):
        """Open the per-port log file, announce the start and spawn the process."""
        self.log_file = self.get_log_file_with_port()
        self.server_out = open(self.log_file, 'w')

        banner = f'{self.console_marker}{self.info_prefix}{label}'
        console_printer(f'{banner} starting at: {location}')
        console_printer(f'{banner} command: {self.command}')
        console_printer(f'{banner} logfile: {self.log_file}')
        console_printer()

        try:
            self.process = subprocess.Popen(
                self._args,
                bufsize=0, env=env,
                stdout=self.server_out, stderr=self.server_out)
        except Exception:
            # Do not leak the log handle when the executable cannot be run.
            self.server_out.close()
            raise


class RcVCSServer(ServerBase):
    """
    Represents a running VCSServer instance.
    """

    log_file_name = 'rhodecode-vcsserver.log'
    status_url_tmpl = 'http://{host}:{port}/status'

    def __init__(self, config_file, log_file=None, workers='3', env=None,
                 info_prefix=''):
        """
        :param workers: value for gunicorn's ``--workers`` option.
        :param info_prefix: text inserted into the console start-up messages.
        """
        super().__init__(config_file, log_file, env)
        self.info_prefix = info_prefix
        self._args = [
            'gunicorn',
            '--bind', self.bind_addr,
            '--worker-class', 'sync',
            '--threads', '1',
            '--backlog', '8',
            '--timeout', '300',
            # str() so that an integer worker count is accepted as well.
            '--workers', str(workers),
            '--paste', self.config_file]

    def start(self):
        """Launch gunicorn after verifying nothing answers at the host url."""
        host_url = self.host_url()
        # Check before touching the log file: opening it with 'w' would
        # truncate the log of an instance that is already running.
        assert_no_running_instance(host_url)
        self._launch('rhodecode-vcsserver', host_url, self._process_env())


class RcWebServer(ServerBase):
    """
    Represents a running RCE web server used as a test fixture.
    """

    log_file_name = 'rhodecode-ce.log'
    status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'

    def __init__(self, config_file, log_file=None, workers='2', env=None,
                 info_prefix=''):
        """
        :param workers: value for gunicorn's ``--workers`` option.
        :param info_prefix: text inserted into the console start-up messages.
        """
        super().__init__(config_file, log_file, env)
        self.info_prefix = info_prefix
        self._args = [
            'gunicorn',
            '--bind', self.bind_addr,
            '--worker-class', 'gthread',
            '--threads', '4',
            '--backlog', '8',
            '--timeout', '300',
            # str() so that an integer worker count is accepted as well.
            '--workers', str(workers),
            '--paste', self.config_file]

    def start(self):
        """Launch gunicorn after verifying nothing answers at the host url."""
        host_url = self.host_url()
        # Check before touching the log file: opening it with 'w' would
        # truncate the log of an instance that is already running.
        assert_no_running_instance(host_url)
        self._launch('rhodecode-ce', host_url, self._process_env())

    def repo_clone_url(self, repo_name, **kwargs):
        """
        Build an ``http://user:passwd@host/repo`` url for ``repo_name``.

        ``user``, ``passwd``, ``host`` and ``cloned_repo`` may be overridden
        through keyword arguments; the defaults are the test admin
        credentials and the host url derived from the config file.
        """
        params = {
            'user': TEST_USER_ADMIN_LOGIN,
            'passwd': TEST_USER_ADMIN_PASS,
            'host': get_host_url(self.config_file),
            'cloned_repo': repo_name,
        }
        params.update(**kwargs)
        _url = (
            f"http://{params['user']}:{params['passwd']}"
            f"@{params['host']}/{params['cloned_repo']}")
        return _url

    def repo_clone_credentials(self, **kwargs):
        """Return ``(user, passwd)``, defaulting to the test admin account."""
        params = {
            'user': TEST_USER_ADMIN_LOGIN,
            'passwd': TEST_USER_ADMIN_PASS,
        }
        params.update(**kwargs)
        return params['user'], params['passwd']


class CeleryServer(ServerBase):
    """
    Represents a Celery worker process used as a test fixture.
    """

    log_file_name = 'rhodecode-celery.log'
    status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'

    def __init__(self, config_file, log_file=None, workers='2', env=None,
                 info_prefix=''):
        """
        :param workers: accepted for signature parity with the other
            servers; it is not part of the celery command line.
        :param info_prefix: text inserted into the console start-up messages.
        """
        super().__init__(config_file, log_file, env)
        self.info_prefix = info_prefix
        self._args = [
            'celery',
            '--no-color',
            '--app=rhodecode.lib.celerylib.loader',
            'worker',
            '--autoscale=4,2',
            '--max-tasks-per-child=30',
            '--task-events',
            '--loglevel=DEBUG',
            '--ini=' + self.config_file]

    def start(self):
        """Launch the celery worker with ``RC_NO_TEST_ENV=1`` set."""
        # No reachability check is done here, so the "starting at" line
        # shows a fixed label instead of a url.
        self._launch(
            'rhodecode-celery', 'Celery',
            self._process_env(RC_NO_TEST_ENV='1'))

    def wait_until_ready(self, timeout=30):
        """Sleep for two seconds; ``timeout`` is accepted but not used."""
        time.sleep(2)