##// END OF EJS Templates
added latest changes
added latest changes

File last commit:

r5608:6d33e504 default
r5658:a109f5ac merge default
Show More
server_utils.py
276 lines | 9.2 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import time
import tempfile
import pytest
import subprocess
import logging
import requests
import configparser
from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
from rhodecode.tests.utils import is_url_reachable
from rhodecode.tests import console_printer
log = logging.getLogger(__name__)
def get_port(pyramid_config):
config = configparser.ConfigParser()
config.read(pyramid_config)
return config.get('server:main', 'port')
def get_host_url(pyramid_config):
"""Construct the host url using the port in the test configuration."""
port = get_port(pyramid_config)
return f'127.0.0.1:{port}'
def assert_no_running_instance(url):
if is_url_reachable(url):
console_printer(f"Hint: Usually this means another instance of server "
f"is running in the background at {url}.")
pytest.fail(f"Port is not free at {url}, cannot start server at")
class ServerBase(object):
_args = []
log_file_name = 'NOT_DEFINED.log'
status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
console_marker = " :warning: [green]pytest-setup[/green] "
def __init__(self, config_file, log_file, env):
self.config_file = config_file
config = configparser.ConfigParser()
config.read(config_file)
self._config = {k: v for k, v in config['server:main'].items()}
self._args = []
self.log_file = log_file or os.path.join(
tempfile.gettempdir(), self.log_file_name)
self.env = env
self.process = None
self.server_out = None
log.info(f"Using the {self.__class__.__name__} configuration:{config_file}")
if not os.path.isfile(config_file):
raise RuntimeError(f'Failed to get config at {config_file}')
@property
def command(self):
return ' '.join(self._args)
@property
def bind_addr(self):
return '{host}:{port}'.format(**self._config)
@property
def http_url(self):
template = 'http://{host}:{port}/'
return template.format(**self._config)
def host_url(self):
host = get_host_url(self.config_file)
return f'http://{host}'
def get_rc_log(self):
with open(self.log_file) as f:
return f.read()
def assert_message_in_server_logs(self, message):
server_logs = self.get_rc_log()
assert message in server_logs
def wait_until_ready(self, timeout=30):
host = self._config['host']
port = self._config['port']
status_url = self.status_url_tmpl.format(host=host, port=port)
start = time.time()
while time.time() - start < timeout:
try:
requests.get(status_url)
break
except requests.exceptions.ConnectionError:
time.sleep(0.2)
else:
pytest.fail(
f"Starting the {self.__class__.__name__} failed or took more than {timeout} seconds."
f"cmd: `{self.command}`"
)
log.info(f'Server of {self.__class__.__name__} ready at url {status_url}')
def shutdown(self):
self.process.kill()
self.server_out.flush()
self.server_out.close()
def get_log_file_with_port(self):
log_file = list(self.log_file.partition('.log'))
log_file.insert(1, f'-{get_port(self.config_file)}')
log_file = ''.join(log_file)
return log_file
class RcVCSServer(ServerBase):
"""
Represents a running VCSServer instance.
"""
log_file_name = 'rhodecode-vcsserver.log'
status_url_tmpl = 'http://{host}:{port}/status'
def __init__(self, config_file, log_file=None, workers='3', env=None, info_prefix=''):
super(RcVCSServer, self).__init__(config_file, log_file, env)
self.info_prefix = info_prefix
self._args = [
'gunicorn',
'--bind', self.bind_addr,
'--worker-class', 'sync',
'--threads', '1',
'--backlog', '8',
'--timeout', '300',
'--workers', workers,
'--paste', self.config_file]
def start(self):
env = os.environ.copy()
self.log_file = self.get_log_file_with_port()
self.server_out = open(self.log_file, 'w')
host_url = self.host_url()
assert_no_running_instance(host_url)
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-vcsserver starting at: {host_url}')
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-vcsserver command: {self.command}')
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-vcsserver logfile: {self.log_file}')
console_printer()
self.process = subprocess.Popen(
self._args, bufsize=0, env=env,
stdout=self.server_out, stderr=self.server_out)
class RcWebServer(ServerBase):
"""
Represents a running RCE web server used as a test fixture.
"""
log_file_name = 'rhodecode-ce.log'
status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
def __init__(self, config_file, log_file=None, workers='2', env=None, info_prefix=''):
super(RcWebServer, self).__init__(config_file, log_file, env)
self.info_prefix = info_prefix
self._args = [
'gunicorn',
'--bind', self.bind_addr,
'--worker-class', 'gthread',
'--threads', '4',
'--backlog', '8',
'--timeout', '300',
'--workers', workers,
'--paste', self.config_file]
def start(self):
env = os.environ.copy()
if self.env:
env.update(self.env)
self.log_file = self.get_log_file_with_port()
self.server_out = open(self.log_file, 'w')
host_url = self.host_url()
assert_no_running_instance(host_url)
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-ce starting at: {host_url}')
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-ce command: {self.command}')
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-ce logfile: {self.log_file}')
console_printer()
self.process = subprocess.Popen(
self._args, bufsize=0, env=env,
stdout=self.server_out, stderr=self.server_out)
def repo_clone_url(self, repo_name, **kwargs):
params = {
'user': TEST_USER_ADMIN_LOGIN,
'passwd': TEST_USER_ADMIN_PASS,
'host': get_host_url(self.config_file),
'cloned_repo': repo_name,
}
params.update(**kwargs)
_url = f"http://{params['user']}:{params['passwd']}@{params['host']}/{params['cloned_repo']}"
return _url
def repo_clone_credentials(self, **kwargs):
params = {
'user': TEST_USER_ADMIN_LOGIN,
'passwd': TEST_USER_ADMIN_PASS,
}
params.update(**kwargs)
return params['user'], params['passwd']
class CeleryServer(ServerBase):
log_file_name = 'rhodecode-celery.log'
status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
def __init__(self, config_file, log_file=None, workers='2', env=None, info_prefix=''):
super(CeleryServer, self).__init__(config_file, log_file, env)
self.info_prefix = info_prefix
self._args = \
['celery',
'--no-color',
'--app=rhodecode.lib.celerylib.loader',
'worker',
'--autoscale=4,2',
'--max-tasks-per-child=30',
'--task-events',
'--loglevel=DEBUG',
'--ini=' + self.config_file]
def start(self):
env = os.environ.copy()
env['RC_NO_TEST_ENV'] = '1'
self.log_file = self.get_log_file_with_port()
self.server_out = open(self.log_file, 'w')
host_url = "Celery" #self.host_url()
#assert_no_running_instance(host_url)
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-celery starting at: {host_url}')
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-celery command: {self.command}')
console_printer(f'{self.console_marker}{self.info_prefix}rhodecode-celery logfile: {self.log_file}')
console_printer()
self.process = subprocess.Popen(
self._args, bufsize=0, env=env,
stdout=self.server_out, stderr=self.server_out)
def wait_until_ready(self, timeout=30):
time.sleep(2)