##// END OF EJS Templates
feat(ssh-wrapper): added pre/post pull hooks on top of git for ssh backend....
feat(ssh-wrapper): added pre/post pull hooks on top of git for the ssh backend. This makes it consistent with the http backend and adds audit logging to the ssh pulls for git.

File last commit:

r5088:8f6d1ed6 default
r5302:399d1dbe default
Show More
server_utils.py
221 lines | 6.9 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import time
import tempfile
import pytest
import subprocess
import logging
from urllib.request import urlopen
from urllib.error import URLError
import configparser
from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
from rhodecode.tests.utils import is_url_reachable
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def get_port(pyramid_config):
    """Read the ``port`` option from the ``[server:main]`` section.

    :param pyramid_config: path of the ini file to parse.
    :return: the port value as stored in the file (a string).
    """
    parser = configparser.ConfigParser()
    parser.read(pyramid_config)
    section, option = 'server:main', 'port'
    return parser.get(section, option)
def get_host_url(pyramid_config):
    """Return ``127.0.0.1:<port>`` using the port from the given ini file.

    :param pyramid_config: path of the ini file holding the port.
    :return: loopback address joined with the configured port.
    """
    return '127.0.0.1:{}'.format(get_port(pyramid_config))
def assert_no_running_instance(url):
    """Fail the current test when ``is_url_reachable`` reports *url* as reachable.

    A hint is printed first, then ``pytest.fail`` is called.

    :param url: address the server is about to be started on.
    """
    if not is_url_reachable(url):
        return

    print(f"Hint: Usually this means another instance of server "
          f"is running in the background at {url}.")
    # The original message ended with a dangling "at"; the url is already
    # part of the sentence, so the trailing word is dropped.
    pytest.fail(f"Port is not free at {url}, cannot start server")
class ServerBase(object):
    """
    Shared behaviour for server processes started from an ini file.

    The ``[server:main]`` section of the ini file supplies ``host`` and
    ``port``. Code using this class is expected to fill ``self._args`` and to
    set ``self.process`` / ``self.server_out`` before ``shutdown()`` has
    anything to clean up.
    """
    # Command line as a list of arguments; replaced per instance in __init__.
    _args = []
    # File name used inside the temp directory when no log file is given.
    log_file_name = 'NOT_DEFINED.log'
    # URL polled by wait_until_ready().
    status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'

    def __init__(self, config_file, log_file):
        """
        :param config_file: path of an ini file with a ``[server:main]``
            section.
        :param log_file: path for the server output; when falsy,
            ``log_file_name`` inside the system temp directory is used.
        :raises RuntimeError: when ``config_file`` is not an existing file.
        """
        # Validate up front: ConfigParser.read() silently skips files it
        # cannot open, so a missing file would otherwise surface as a
        # KeyError on the section lookup below instead of this error.
        if not os.path.isfile(config_file):
            raise RuntimeError(f'Failed to get config at {config_file}')

        self.config_file = config_file
        config = configparser.ConfigParser()
        config.read(config_file)
        self._config = dict(config['server:main'])

        self._args = []
        self.log_file = log_file or os.path.join(
            tempfile.gettempdir(), self.log_file_name)
        self.process = None
        self.server_out = None
        log.info(
            "Using the %s configuration:%s",
            self.__class__.__name__, config_file)

    @property
    def command(self):
        """The command line as a single space-joined string."""
        return ' '.join(self._args)

    @property
    def bind_addr(self):
        """``host:port`` taken from the ``[server:main]`` section."""
        return '{host}:{port}'.format(**self._config)

    @property
    def http_url(self):
        """``http://host:port/`` taken from the ``[server:main]`` section."""
        template = 'http://{host}:{port}/'
        return template.format(**self._config)

    def host_url(self):
        """Return ``http://127.0.0.1:<port>`` for the configured port."""
        host = get_host_url(self.config_file)
        return f'http://{host}'

    def get_rc_log(self):
        """Return the whole content of the current log file."""
        with open(self.log_file) as f:
            return f.read()

    def assert_message_in_server_logs(self, message):
        """Assert that *message* occurs somewhere in the log file."""
        server_logs = self.get_rc_log()
        assert message in server_logs

    def wait_until_ready(self, timeout=30):
        """
        Poll the status URL until it answers or *timeout* seconds elapse.

        Calls ``pytest.fail`` when the server does not answer in time.

        :param timeout: maximum number of seconds to wait.
        """
        host = self._config['host']
        port = self._config['port']
        status_url = self.status_url_tmpl.format(host=host, port=port)

        deadline = time.monotonic() + timeout
        remaining = timeout
        while remaining > 0:
            try:
                # The per-request timeout keeps a connection that is accepted
                # but never answered from blocking past the deadline; the
                # ``with`` closes the response instead of leaking it.
                with urlopen(status_url, timeout=remaining):
                    break
            except OSError:
                # URLError is a subclass of OSError; connection resets and
                # read timeouts raised outside of URLError are retried too.
                time.sleep(0.2)
            remaining = deadline - time.monotonic()
        else:
            pytest.fail(
                "Starting the {} failed or took more than {} "
                "seconds. cmd: `{}`".format(
                    self.__class__.__name__, timeout, self.command))

        log.info(
            'Server of %s ready at url %s',
            self.__class__.__name__, status_url)

    def shutdown(self):
        """
        Kill the server process and close its log file handle.

        Safe to call when the process was never started, and safe to call
        more than once.
        """
        try:
            if self.process is not None:
                self.process.kill()
                # Reap the child so it does not linger as a zombie.
                self.process.wait()
        finally:
            if self.server_out is not None:
                # close() flushes pending data and is a no-op when the file
                # is already closed.
                self.server_out.close()

    def get_log_file_with_port(self):
        """
        Return the log file path with the configured port inserted in front
        of the first ``.log`` occurrence (appended when there is none).
        """
        head, sep, tail = self.log_file.partition('.log')
        return f'{head}{get_port(self.config_file)}{sep}{tail}'
class RcVCSServer(ServerBase):
    """
    Represents a running VCSServer instance.
    """
    log_file_name = 'rc-vcsserver.log'
    status_url_tmpl = 'http://{host}:{port}/status'

    def __init__(self, config_file, log_file=None, workers='2'):
        """
        :param config_file: ini file passed to gunicorn via ``--paste``.
        :param log_file: optional path for the server output.
        :param workers: value for gunicorn's ``--workers`` option.
        """
        super(RcVCSServer, self).__init__(config_file, log_file)
        self._args = [
            'gunicorn',
            '--bind', self.bind_addr,
            '--worker-class', 'gevent',
            '--backlog', '16',
            '--timeout', '300',
            # Popen and ``command`` need strings; accept ints as well.
            '--workers', str(workers),
            '--paste', self.config_file]

    def start(self):
        """
        Launch gunicorn with stdout/stderr redirected to the log file.

        Fails the test through ``assert_no_running_instance`` when the
        address is already in use.
        """
        env = os.environ.copy()
        host_url = self.host_url()
        # Check before touching the log file: opening with 'w' truncates it,
        # which would wipe the log of an already running instance that
        # writes to the same file, and would leave an open handle behind.
        assert_no_running_instance(host_url)

        self.log_file = self.get_log_file_with_port()
        self.server_out = open(self.log_file, 'w')

        print(f'rhodecode-vcsserver starting at: {host_url}')
        print(f'rhodecode-vcsserver command: {self.command}')
        print(f'rhodecode-vcsserver logfile: {self.log_file}')
        try:
            self.process = subprocess.Popen(
                self._args, bufsize=0, env=env,
                stdout=self.server_out, stderr=self.server_out)
        except Exception:
            # Do not leak the log handle when the process cannot be spawned.
            self.server_out.close()
            raise
class RcWebServer(ServerBase):
    """
    Represents a running RCE web server used as a test fixture.
    """
    log_file_name = 'rc-web.log'
    status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'

    def __init__(self, config_file, log_file=None, workers='1'):
        """
        :param config_file: ini file passed to gunicorn via ``--paste``.
        :param log_file: optional path for the server output.
        :param workers: value for gunicorn's ``--workers`` option.
        """
        super(RcWebServer, self).__init__(config_file, log_file)
        self._args = [
            'gunicorn',
            '--bind', self.bind_addr,
            '--worker-class', 'gevent',
            '--backlog', '16',
            '--timeout', '300',
            # Popen and ``command`` need strings; accept ints as well.
            '--workers', str(workers),
            '--paste', self.config_file]

    def start(self):
        """
        Launch gunicorn with stdout/stderr redirected to the log file.

        Fails the test through ``assert_no_running_instance`` when the
        address is already in use.
        """
        env = os.environ.copy()
        # NOTE(review): RC_NO_TMP_PATH is consumed by the launched
        # application; its exact effect is not visible in this module.
        env['RC_NO_TMP_PATH'] = '1'

        host_url = self.host_url()
        # Check before touching the log file: opening with 'w' truncates it,
        # which would wipe the log of an already running instance that
        # writes to the same file, and would leave an open handle behind.
        assert_no_running_instance(host_url)

        self.log_file = self.get_log_file_with_port()
        self.server_out = open(self.log_file, 'w')

        print(f'rhodecode-web starting at: {host_url}')
        print(f'rhodecode-web command: {self.command}')
        print(f'rhodecode-web logfile: {self.log_file}')
        try:
            self.process = subprocess.Popen(
                self._args, bufsize=0, env=env,
                stdout=self.server_out, stderr=self.server_out)
        except Exception:
            # Do not leak the log handle when the process cannot be spawned.
            self.server_out.close()
            raise

    def repo_clone_url(self, repo_name, **kwargs):
        """
        Return an http clone URL for *repo_name* with credentials embedded.

        :param repo_name: repository name placed in the URL path.
        :param kwargs: optional overrides for the ``user``, ``passwd``,
            ``host`` and ``cloned_repo`` parts of the URL.
        """
        params = {
            'user': TEST_USER_ADMIN_LOGIN,
            'passwd': TEST_USER_ADMIN_PASS,
            'host': get_host_url(self.config_file),
            'cloned_repo': repo_name,
        }
        params.update(kwargs)
        return 'http://{user}:{passwd}@{host}/{cloned_repo}'.format(**params)