diff --git a/configs/gunicorn_config.py b/configs/gunicorn_config.py --- a/configs/gunicorn_config.py +++ b/configs/gunicorn_config.py @@ -3,24 +3,21 @@ Gunicorn config extension and hooks. Thi Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer """ -import contextlib -import dataclasses import gc +import os +import sys import math -import os +import time +import threading +import traceback import random import socket -import sys -import threading -import time -import traceback - +import dataclasses from gunicorn.glogging import Logger def get_workers(): import multiprocessing - return multiprocessing.cpu_count() * 2 + 1 @@ -28,10 +25,10 @@ bind = "127.0.0.1:10010" # Error logging output for gunicorn (-) is stdout -errorlog = "-" +errorlog = '-' # Access logging output for gunicorn (-) is stdout -accesslog = "-" +accesslog = '-' # SERVER MECHANICS @@ -41,39 +38,50 @@ worker_tmp_dir = None tmp_upload_dir = None # use re-use port logic -# reuse_port = True +#reuse_port = True # Custom log format -# access_log_format = ( +#access_log_format = ( # '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"') # loki format for easier parsing in grafana -access_log_format = 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"' - - -# Sets the number of process workers. More workers means more concurrent connections -# RhodeCode can handle at the same time. Each additional worker also it increases -# memory usage as each has it's own set of caches. -# Recommended value is (2 * NUMBER_OF_CPUS + 1), eg 2CPU = 5 workers, but no more -# than 8-10 unless for huge deployments .e.g 700-1000 users. -# `instance_id = *` must be set in the [app:main] section below (which is the default) -# when using more than 1 worker. 
-workers = 6 +access_log_format = ( + 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"') # self adjust workers based on CPU count, to use maximum of CPU and not overquota the resources # workers = get_workers() # Gunicorn access log level -loglevel = "info" +loglevel = 'info' # Process name visible in a process list proc_name = "rhodecode_vcsserver" -# Type of worker class, one of `sync`, `gevent` -# currently `sync` is the only option allowed. -worker_class = "sync" +# Type of worker class, one of `sync`, `gevent` or `gthread` +# currently `sync` is the only option allowed for vcsserver; for rhodecode all 3 are allowed +# gevent: +# In this case, the maximum number of concurrent requests is (N workers * X worker_connections) +# e.g. workers=3 worker_connections=10 = 3*10, 30 concurrent requests can be handled +# gthread: +# In this case, the maximum number of concurrent requests is (N workers * X threads) +# e.g. workers=3 threads=3 = 3*3, 9 concurrent requests can be handled +worker_class = 'sync' + +# Sets the number of process workers. More workers means more concurrent connections +# RhodeCode can handle at the same time. Each additional worker also increases +# memory usage as each has its own set of caches. +# The recommended value is (2 * NUMBER_OF_CPUS + 1), e.g. 2 CPUs = 5 workers, but no more +# than 8-10 unless for huge deployments e.g. 700-1000 users. +# `instance_id = *` must be set in the [app:main] section below (which is the default) +# when using more than 1 worker. +workers = 2 + +# Number of threads for worker class gthread +threads = 1 # The maximum number of simultaneous clients.
Valid only for gevent +# In this case, the maximum number of concurrent requests is (N workers * X worker_connections) +# e.g. workers=3 worker_connections=10 = 3*10, 30 concurrent requests can be handled worker_connections = 10 # Max number of requests that worker will handle before being gracefully restarted. @@ -138,27 +146,26 @@ class MemoryCheckConfig: def _get_process_rss(pid=None): - with contextlib.suppress(Exception): + try: import psutil - if pid: proc = psutil.Process(pid) else: proc = psutil.Process() return proc.memory_info().rss - - return None + except Exception: + return None def _get_config(ini_path): import configparser - with contextlib.suppress(Exception): + try: config = configparser.RawConfigParser() config.read(ini_path) return config - - return None + except Exception: + return None def get_memory_usage_params(config=None): @@ -171,30 +178,30 @@ def get_memory_usage_params(config=None) ini_path = os.path.abspath(config) conf = _get_config(ini_path) - section = "server:main" + section = 'server:main' if conf and conf.has_section(section): - if conf.has_option(section, "memory_max_usage"): - _memory_max_usage = conf.getint(section, "memory_max_usage") - if conf.has_option(section, "memory_usage_check_interval"): - _memory_usage_check_interval = conf.getint(section, "memory_usage_check_interval") + if conf.has_option(section, 'memory_max_usage'): + _memory_max_usage = conf.getint(section, 'memory_max_usage') + + if conf.has_option(section, 'memory_usage_check_interval'): + _memory_usage_check_interval = conf.getint(section, 'memory_usage_check_interval') - if conf.has_option(section, "memory_usage_recovery_threshold"): - _memory_usage_recovery_threshold = conf.getfloat(section, "memory_usage_recovery_threshold") + if conf.has_option(section, 'memory_usage_recovery_threshold'): + _memory_usage_recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold') - _memory_max_usage = int(os.environ.get("RC_GUNICORN_MEMORY_MAX_USAGE",
"") or _memory_max_usage) - _memory_usage_check_interval = int( - os.environ.get("RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL", "") or _memory_usage_check_interval - ) - _memory_usage_recovery_threshold = float( - os.environ.get("RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD", "") or _memory_usage_recovery_threshold - ) + _memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '') + or _memory_max_usage) + _memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') + or _memory_usage_check_interval) + _memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') + or _memory_usage_recovery_threshold) return MemoryCheckConfig(_memory_max_usage, _memory_usage_check_interval, _memory_usage_recovery_threshold) def _time_with_offset(check_interval): - return time.time() - random.randint(0, check_interval / 2.0) + return time.time() - random.randint(0, int(check_interval / 2.0)) def pre_fork(server, worker): @@ -202,27 +209,26 @@ def pre_fork(server, worker): def post_fork(server, worker): + memory_conf = get_memory_usage_params() _memory_max_usage = memory_conf.max_usage _memory_usage_check_interval = memory_conf.check_interval _memory_usage_recovery_threshold = memory_conf.recovery_threshold - worker._memory_max_usage = int(os.environ.get("RC_GUNICORN_MEMORY_MAX_USAGE", "") or _memory_max_usage) - worker._memory_usage_check_interval = int( - os.environ.get("RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL", "") or _memory_usage_check_interval - ) - worker._memory_usage_recovery_threshold = float( - os.environ.get("RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD", "") or _memory_usage_recovery_threshold - ) + worker._memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '') + or _memory_max_usage) + worker._memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') + or _memory_usage_check_interval) + worker._memory_usage_recovery_threshold =
float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') + or _memory_usage_recovery_threshold) # register memory last check time, with some random offset so we don't recycle all # at once worker._last_memory_check_time = _time_with_offset(_memory_usage_check_interval) if _memory_max_usage: - server.log.info( - "pid=[%-10s] WORKER spawned with max memory set at %s", worker.pid, _format_data_size(_memory_max_usage) - ) + server.log.info("pid=[%-10s] WORKER spawned with max memory set at %s", worker.pid, + _format_data_size(_memory_max_usage)) else: server.log.info("pid=[%-10s] WORKER spawned", worker.pid) @@ -232,9 +238,9 @@ def pre_exec(server): def on_starting(server): - server_lbl = "{} {}".format(server.proc_name, server.address) + server_lbl = '{} {}'.format(server.proc_name, server.address) server.log.info("Server %s is starting.", server_lbl) - server.log.info("Config:") + server.log.info('Config:') server.log.info(f"\n{server.cfg}") server.log.info(get_memory_usage_params()) @@ -268,10 +274,10 @@ def _format_data_size(size, unit="B", pr if not binary: base = 1000 - multiples = ("", "k", "M", "G", "T", "P", "E", "Z", "Y") + multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') else: base = 1024 - multiples = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi") + multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi') sign = "" if size > 0: @@ -286,13 +292,13 @@ def _format_data_size(size, unit="B", pr m = 8 if m == 0: - precision = "%.0f" + precision = '%.0f' else: - precision = "%%.%df" % precision + precision = '%%.%df' % precision size = precision % (size / math.pow(base, m)) - return "%s%s %s%s" % (sign, size.strip(), multiples[m], unit) + return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit) def _check_memory_usage(worker): @@ -308,8 +314,8 @@ def _check_memory_usage(worker): mem_usage = _get_process_rss() if mem_usage and mem_usage > _memory_max_usage: worker.log.info( - "memory usage %s > %s, forcing gc", 
_format_data_size(mem_usage), _format_data_size(_memory_max_usage) - ) + "memory usage %s > %s, forcing gc", + _format_data_size(mem_usage), _format_data_size(_memory_max_usage)) # Try to clean it up by forcing a full collection. gc.collect() mem_usage = _get_process_rss() @@ -317,9 +323,7 @@ def _check_memory_usage(worker): # Didn't clean up enough, we'll have to terminate. worker.log.warning( "memory usage %s > %s after gc, quitting", - _format_data_size(mem_usage), - _format_data_size(_memory_max_usage), - ) + _format_data_size(mem_usage), _format_data_size(_memory_max_usage)) # This will cause worker to auto-restart itself worker.alive = False worker._last_memory_check_time = time.time() @@ -335,7 +339,8 @@ def worker_int(worker): code = [] for thread_id, stack in sys._current_frames().items(): # noqa - code.append("\n# Thread: %s(%d)" % (get_thread_id(thread_id), thread_id)) + code.append( + "\n# Thread: %s(%d)" % (get_thread_id(thread_id), thread_id)) for fname, lineno, name, line in traceback.extract_stack(stack): code.append('File: "%s", line %d, in %s' % (fname, lineno, name)) if line: @@ -357,21 +362,17 @@ def child_exit(server, worker): def pre_request(worker, req): worker.start_time = time.time() - worker.log.debug("GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path) + worker.log.debug( + "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path) def post_request(worker, req, environ, resp): total_time = time.time() - worker.start_time # Gunicorn sometimes has problems with reading the status_code - status_code = getattr(resp, "status_code", "") + status_code = getattr(resp, 'status_code', '') worker.log.debug( "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs", - worker.nr, - req.method, - req.path, - status_code, - total_time, - ) + worker.nr, req.method, req.path, status_code, total_time) _check_memory_usage(worker) @@ -384,8 +385,8 @@ def _filter_proxy(ip): :param ip: ip string from headers """ - if "," in ip: - 
_ips = ip.split(",") + if ',' in ip: + _ips = ip.split(',') _first_ip = _ips[0].strip() return _first_ip return ip @@ -401,9 +402,8 @@ def _filter_port(ip): :param ip: """ - def is_ipv6(ip_addr): - if hasattr(socket, "inet_pton"): + if hasattr(socket, 'inet_pton'): try: socket.inet_pton(socket.AF_INET6, ip_addr) except socket.error: @@ -412,24 +412,24 @@ def _filter_port(ip): return False return True - if ":" not in ip: # must be ipv4 pure ip + if ':' not in ip: # must be ipv4 pure ip return ip - if "[" in ip and "]" in ip: # ipv6 with port - return ip.split("]")[0][1:].lower() + if '[' in ip and ']' in ip: # ipv6 with port + return ip.split(']')[0][1:].lower() # must be ipv6 or ipv4 with port if is_ipv6(ip): return ip else: - ip, _port = ip.split(":")[:2] # means ipv4+port + ip, _port = ip.split(':')[:2] # means ipv4+port return ip def get_ip_addr(environ): - proxy_key = "HTTP_X_REAL_IP" - proxy_key2 = "HTTP_X_FORWARDED_FOR" - def_key = "REMOTE_ADDR" + proxy_key = 'HTTP_X_REAL_IP' + proxy_key2 = 'HTTP_X_FORWARDED_FOR' + def_key = 'REMOTE_ADDR' def _filters(x): return _filter_port(_filter_proxy(x)) @@ -442,7 +442,7 @@ def get_ip_addr(environ): if ip: return _filters(ip) - ip = environ.get(def_key, "0.0.0.0") + ip = environ.get(def_key, '0.0.0.0') return _filters(ip) @@ -457,40 +457,43 @@ class RhodeCodeLogger(Logger): Logger.__init__(self, cfg) def now(self): - """return date in RhodeCode Log format""" + """ return date in RhodeCode Log format """ now = time.time() msecs = int((now - int(now)) * 1000) - return time.strftime(self.datefmt, time.localtime(now)) + ".{0:03d}".format(msecs) + return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs) def atoms(self, resp, req, environ, request_time): - """Gets atoms for log formatting.""" + """ Gets atoms for log formatting. 
+ """ status = resp.status if isinstance(status, str): status = status.split(None, 1)[0] atoms = { - "h": get_ip_addr(environ), - "l": "-", - "u": self._get_user(environ) or "-", - "t": self.now(), - "r": "%s %s %s" % (environ["REQUEST_METHOD"], environ["RAW_URI"], environ["SERVER_PROTOCOL"]), - "s": status, - "m": environ.get("REQUEST_METHOD"), - "U": environ.get("PATH_INFO"), - "q": environ.get("QUERY_STRING"), - "H": environ.get("SERVER_PROTOCOL"), - "b": getattr(resp, "sent", None) is not None and str(resp.sent) or "-", - "B": getattr(resp, "sent", None), - "f": environ.get("HTTP_REFERER", "-"), - "a": environ.get("HTTP_USER_AGENT", "-"), - "T": request_time.seconds, - "D": (request_time.seconds * 1000000) + request_time.microseconds, - "M": (request_time.seconds * 1000) + int(request_time.microseconds / 1000), - "L": "%d.%06d" % (request_time.seconds, request_time.microseconds), - "p": "<%s>" % os.getpid(), + 'h': get_ip_addr(environ), + 'l': '-', + 'u': self._get_user(environ) or '-', + 't': self.now(), + 'r': "%s %s %s" % (environ['REQUEST_METHOD'], + environ['RAW_URI'], + environ["SERVER_PROTOCOL"]), + 's': status, + 'm': environ.get('REQUEST_METHOD'), + 'U': environ.get('PATH_INFO'), + 'q': environ.get('QUERY_STRING'), + 'H': environ.get('SERVER_PROTOCOL'), + 'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-', + 'B': getattr(resp, 'sent', None), + 'f': environ.get('HTTP_REFERER', '-'), + 'a': environ.get('HTTP_USER_AGENT', '-'), + 'T': request_time.seconds, + 'D': (request_time.seconds * 1000000) + request_time.microseconds, + 'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000), + 'L': "%d.%06d" % (request_time.seconds, request_time.microseconds), + 'p': "<%s>" % os.getpid() } # add request headers - if hasattr(req, "headers"): + if hasattr(req, 'headers'): req_headers = req.headers else: req_headers = req