##// END OF EJS Templates
feat(configs): deprecared old hooks protocol and ssh wrapper....
feat(configs): deprecared old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installation are automatically set to new keys. Fallback mode is still available.

File last commit:

r5468:bafb4fbd default
r5496:cab50adf default
Show More
server_utils.py
231 lines | 7.2 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import time
import tempfile
import pytest
import subprocess
import logging
from urllib.request import urlopen
from urllib.error import URLError
import configparser
from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
from rhodecode.tests.utils import is_url_reachable
log = logging.getLogger(__name__)
def get_port(pyramid_config):
config = configparser.ConfigParser()
config.read(pyramid_config)
return config.get('server:main', 'port')
def get_host_url(pyramid_config):
"""Construct the host url using the port in the test configuration."""
port = get_port(pyramid_config)
return f'127.0.0.1:{port}'
def assert_no_running_instance(url):
if is_url_reachable(url):
print(f"Hint: Usually this means another instance of server "
f"is running in the background at {url}.")
pytest.fail(f"Port is not free at {url}, cannot start server at")
class ServerBase(object):
_args = []
log_file_name = 'NOT_DEFINED.log'
status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
def __init__(self, config_file, log_file):
self.config_file = config_file
config = configparser.ConfigParser()
config.read(config_file)
self._config = {k: v for k, v in config['server:main'].items()}
self._args = []
self.log_file = log_file or os.path.join(
tempfile.gettempdir(), self.log_file_name)
self.process = None
self.server_out = None
log.info("Using the {} configuration:{}".format(
self.__class__.__name__, config_file))
if not os.path.isfile(config_file):
raise RuntimeError(f'Failed to get config at {config_file}')
@property
def command(self):
return ' '.join(self._args)
@property
def bind_addr(self):
return '{host}:{port}'.format(**self._config)
@property
def http_url(self):
template = 'http://{host}:{port}/'
return template.format(**self._config)
def host_url(self):
host = get_host_url(self.config_file)
return f'http://{host}'
def get_rc_log(self):
with open(self.log_file) as f:
return f.read()
def assert_message_in_server_logs(self, message):
server_logs = self.get_rc_log()
assert message in server_logs
def wait_until_ready(self, timeout=30):
host = self._config['host']
port = self._config['port']
status_url = self.status_url_tmpl.format(host=host, port=port)
start = time.time()
while time.time() - start < timeout:
try:
urlopen(status_url)
break
except URLError:
time.sleep(0.2)
else:
pytest.fail(
"Starting the {} failed or took more than {} "
"seconds. cmd: `{}`".format(
self.__class__.__name__, timeout, self.command))
log.info('Server of {} ready at url {}'.format(
self.__class__.__name__, status_url))
def shutdown(self):
self.process.kill()
self.server_out.flush()
self.server_out.close()
def get_log_file_with_port(self):
log_file = list(self.log_file.partition('.log'))
log_file.insert(1, get_port(self.config_file))
log_file = ''.join(log_file)
return log_file
class RcVCSServer(ServerBase):
"""
Represents a running VCSServer instance.
"""
log_file_name = 'rc-vcsserver.log'
status_url_tmpl = 'http://{host}:{port}/status'
def __init__(self, config_file, log_file=None, workers='3'):
super(RcVCSServer, self).__init__(config_file, log_file)
self._args = [
'gunicorn',
'--bind', self.bind_addr,
'--worker-class', 'sync',
'--threads', '1',
'--backlog', '8',
'--timeout', '300',
'--workers', workers,
'--paste', self.config_file]
def start(self):
env = os.environ.copy()
self.log_file = self.get_log_file_with_port()
self.server_out = open(self.log_file, 'w')
host_url = self.host_url()
assert_no_running_instance(host_url)
print(f'rhodecode-vcsserver starting at: {host_url}')
print(f'rhodecode-vcsserver command: {self.command}')
print(f'rhodecode-vcsserver logfile: {self.log_file}')
self.process = subprocess.Popen(
self._args, bufsize=0, env=env,
stdout=self.server_out, stderr=self.server_out)
class RcWebServer(ServerBase):
"""
Represents a running RCE web server used as a test fixture.
"""
log_file_name = 'rc-web.log'
status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
def __init__(self, config_file, log_file=None, workers='2'):
super(RcWebServer, self).__init__(config_file, log_file)
self._args = [
'gunicorn',
'--bind', self.bind_addr,
'--worker-class', 'gthread',
'--threads', '4',
'--backlog', '8',
'--timeout', '300',
'--workers', workers,
'--paste', self.config_file]
def start(self):
env = os.environ.copy()
env['RC_NO_TMP_PATH'] = '1'
self.log_file = self.get_log_file_with_port()
self.server_out = open(self.log_file, 'w')
host_url = self.host_url()
assert_no_running_instance(host_url)
print(f'rhodecode-web starting at: {host_url}')
print(f'rhodecode-web command: {self.command}')
print(f'rhodecode-web logfile: {self.log_file}')
self.process = subprocess.Popen(
self._args, bufsize=0, env=env,
stdout=self.server_out, stderr=self.server_out)
def repo_clone_url(self, repo_name, **kwargs):
params = {
'user': TEST_USER_ADMIN_LOGIN,
'passwd': TEST_USER_ADMIN_PASS,
'host': get_host_url(self.config_file),
'cloned_repo': repo_name,
}
params.update(**kwargs)
_url = f"http://{params['user']}:{params['passwd']}@{params['host']}/{params['cloned_repo']}"
return _url
def repo_clone_credentials(self, **kwargs):
params = {
'user': TEST_USER_ADMIN_LOGIN,
'passwd': TEST_USER_ADMIN_PASS,
}
params.update(**kwargs)
return params['user'], params['passwd']