##// END OF EJS Templates
celery: improve errors handling and logging
celery: improve errors handling and logging

File last commit:

r4306:09801de9 default
r4868:862a2784 default
Show More
server_utils.py
197 lines | 6.3 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import time
import tempfile
import pytest
import subprocess32
import configobj
from urllib2 import urlopen, URLError
from pyramid.compat import configparser
from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
from rhodecode.tests.utils import is_url_reachable
def get_port(pyramid_config):
    """
    Read the `port` option of the `[server:main]` section.

    :param pyramid_config: path to the ini file to parse
    :return: the configured port, exactly as written in the ini file
    """
    section, option = 'server:main', 'port'
    parser = configparser.ConfigParser()
    parser.read(pyramid_config)
    return parser.get(section, option)
def get_host_url(pyramid_config):
    """
    Build the `host:port` string of the test server.

    The host is always the loopback address `127.0.0.1`; only the port is
    taken from the given ini file (via `get_port`).
    """
    port = get_port(pyramid_config)
    return '127.0.0.1:{}'.format(port)
def assert_no_running_instance(url):
    """
    Fail the current test if something already answers at `url`.

    Prints a hint about a probable left-over background server and then
    calls `pytest.fail`. Does nothing when the url is not reachable.

    :param url: url of the server which is about to be started
    """
    if not is_url_reachable(url):
        return

    print("Hint: Usually this means another instance of server "
          "is running in the background at %s." % url)
    # The message used to end with a dangling "at"; the url is already
    # named in the first half of the sentence.
    pytest.fail(
        "Port is not free at %s, cannot start server" % url)
class ServerBase(object):
    """
    Common base for test fixtures which run a server in a subprocess.

    The methods here rely on `self._args` holding the command line and on
    `self.process` / `self.server_out` being set once the server has been
    started; this class itself only initialises them to empty values.
    """
    # Command line of the server process; replaced per instance in __init__.
    _args = []
    # Log file name used when no explicit log file is passed to __init__.
    log_file_name = 'NOT_DEFINED.log'
    # Url polled by wait_until_ready(); formatted with `host` and `port`.
    status_url_tmpl = 'http://{host}:{port}'

    def __init__(self, config_file, log_file):
        """
        :param config_file: path to an ini file with a `server:main` section
        :param log_file: path of the log file; when falsy, a file named
            `log_file_name` inside the system temp directory is used
        :raises RuntimeError: if `config_file` is not an existing file
        """
        print("Using the {} configuration:{}".format(
            self.__class__.__name__, config_file))
        # Validate the path before parsing it. Previously this check ran
        # last, so a missing file failed on the `server:main` lookup below
        # and never reached this, more helpful, error.
        if not os.path.isfile(config_file):
            raise RuntimeError('Failed to get config at {}'.format(config_file))

        self.config_file = config_file
        config_data = configobj.ConfigObj(config_file)
        self._config = config_data['server:main']

        self._args = []
        self.log_file = log_file or os.path.join(
            tempfile.gettempdir(), self.log_file_name)
        self.process = None
        self.server_out = None

    @property
    def command(self):
        """The command line as a single space-joined string."""
        return ' '.join(self._args)

    @property
    def http_url(self):
        """Root url built from `host` and `port` of the parsed config."""
        template = 'http://{host}:{port}/'
        return template.format(**self._config)

    def host_url(self):
        """Url of the server on 127.0.0.1, port re-read from the config."""
        return 'http://' + get_host_url(self.config_file)

    def get_rc_log(self):
        """Return the whole content of the log file."""
        with open(self.log_file) as f:
            return f.read()

    def wait_until_ready(self, timeout=30):
        """
        Poll the status url until it can be opened.

        Retries every 0.2 seconds and fails the test via `pytest.fail` when
        no request succeeded within `timeout` seconds.

        :param timeout: maximum number of seconds to wait
        """
        host = self._config['host']
        port = self._config['port']
        status_url = self.status_url_tmpl.format(host=host, port=port)
        start = time.time()

        while time.time() - start < timeout:
            try:
                response = urlopen(status_url)
            except URLError:
                time.sleep(0.2)
            else:
                # Only reachability matters; release the connection.
                response.close()
                break
        else:
            # Reached only when the loop ran out of time without `break`.
            pytest.fail(
                "Starting the {} failed or took more than {} "
                "seconds. cmd: `{}`".format(
                    self.__class__.__name__, timeout, self.command))

        print('Server of {} ready at url {}'.format(
            self.__class__.__name__, status_url))

    def shutdown(self):
        """
        Kill the server process and close its log file handle.

        Safe to call when the server was never started and safe to call
        more than once.
        """
        if self.process is not None:
            # poll() also notices a process which already died on its own,
            # in which case there is nothing left to kill.
            if self.process.poll() is None:
                self.process.kill()
            # Reap the child so that no zombie process is left behind.
            self.process.wait()

        if self.server_out is not None and not self.server_out.closed:
            self.server_out.flush()
            self.server_out.close()

    def get_log_file_with_port(self):
        """
        Return the log file path with the configured port inserted in
        front of the first `.log`, or appended when there is no `.log`.
        """
        head, sep, tail = self.log_file.partition('.log')
        return ''.join([head, get_port(self.config_file), sep, tail])
class RcVCSServer(ServerBase):
    """
    Represents a running VCSServer instance.
    """
    log_file_name = 'rc-vcsserver.log'
    status_url_tmpl = 'http://{host}:{port}/status'

    def __init__(self, config_file, log_file=None):
        """
        :param config_file: path to the ini file passed to `gunicorn --paste`
        :param log_file: optional log file path, see `ServerBase.__init__`
        """
        super(RcVCSServer, self).__init__(config_file, log_file)
        self._args = ['gunicorn', '--paste', self.config_file]

    def start(self):
        """
        Start gunicorn as a subprocess writing stdout/stderr to the log file.

        Fails the test (via `assert_no_running_instance`) when something is
        already reachable at the configured url.
        """
        env = os.environ.copy()
        host_url = self.host_url()
        # Check for a running instance *before* touching the log file:
        # opening it with 'w' truncates it, which would wipe the log of the
        # very instance that occupies the port and leak the handle.
        assert_no_running_instance(host_url)

        self.log_file = self.get_log_file_with_port()
        print('rhodecode-vcsserver start command: {}'.format(self.command))
        print('rhodecode-vcsserver starting at: {}'.format(host_url))
        print('rhodecode-vcsserver command: {}'.format(self.command))
        print('rhodecode-vcsserver logfile: {}'.format(self.log_file))

        self.server_out = open(self.log_file, 'w')
        try:
            self.process = subprocess32.Popen(
                self._args, bufsize=0, env=env,
                stdout=self.server_out, stderr=self.server_out)
        except Exception:
            # Do not leak the log handle when the process cannot be spawned.
            self.server_out.close()
            self.server_out = None
            raise
class RcWebServer(ServerBase):
    """
    Represents a running RCE web server used as a test fixture.
    """
    log_file_name = 'rc-web.log'
    status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'

    def __init__(self, config_file, log_file=None):
        """
        :param config_file: path to the ini file passed to `gunicorn --paste`
        :param log_file: optional log file path, see `ServerBase.__init__`
        """
        super(RcWebServer, self).__init__(config_file, log_file)
        self._args = [
            'gunicorn', '--worker-class', 'gevent', '--paste', config_file]

    def start(self):
        """
        Start gunicorn as a subprocess writing stdout/stderr to the log file.

        Fails the test (via `assert_no_running_instance`) when something is
        already reachable at the configured url.
        """
        env = os.environ.copy()
        # NOTE(review): read by the spawned server; its meaning is not
        # visible in this module.
        env['RC_NO_TMP_PATH'] = '1'

        host_url = self.host_url()
        # Check for a running instance *before* touching the log file:
        # opening it with 'w' truncates it, which would wipe the log of the
        # very instance that occupies the port and leak the handle.
        assert_no_running_instance(host_url)

        self.log_file = self.get_log_file_with_port()
        print('rhodecode-web starting at: {}'.format(host_url))
        print('rhodecode-web command: {}'.format(self.command))
        print('rhodecode-web logfile: {}'.format(self.log_file))

        self.server_out = open(self.log_file, 'w')
        try:
            self.process = subprocess32.Popen(
                self._args, bufsize=0, env=env,
                stdout=self.server_out, stderr=self.server_out)
        except Exception:
            # Do not leak the log handle when the process cannot be spawned.
            self.server_out.close()
            self.server_out = None
            raise

    def repo_clone_url(self, repo_name, **kwargs):
        """
        Build an http clone url with embedded credentials for `repo_name`.

        Defaults to the test admin user and the host/port from the config;
        any of `user`, `passwd`, `host`, `cloned_repo` can be overridden
        through keyword arguments.
        """
        params = {
            'user': TEST_USER_ADMIN_LOGIN,
            'passwd': TEST_USER_ADMIN_PASS,
            'host': get_host_url(self.config_file),
            'cloned_repo': repo_name,
        }
        params.update(**kwargs)
        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s' % params
        return _url