# HG changeset patch # User Milka Kuzminski # Date 2021-02-03 07:06:48 # Node ID 1ce348496a4b98f7f2aa2b66b2773ad2346d1cbd # Parent 648a96970c23c056b81a1243b3b2f6c66a9840f1 application: added statsd client for sending usage statistics. diff --git a/configs/development.ini b/configs/development.ini --- a/configs/development.ini +++ b/configs/development.ini @@ -191,6 +191,12 @@ rc_cache.repo_object.expiration_time = 2 ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends #rc_cache.repo_object.arguments.distributed_lock = true +; Statsd client config +#statsd.enabled = false +#statsd.statsd_host = 0.0.0.0 +#statsd.statsd_port = 8125 +#statsd.statsd_prefix = +#statsd.statsd_ipv6 = false ; ##################### ; LOGGING CONFIGURATION diff --git a/configs/production.ini b/configs/production.ini --- a/configs/production.ini +++ b/configs/production.ini @@ -154,6 +154,12 @@ rc_cache.repo_object.expiration_time = 2 ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends #rc_cache.repo_object.arguments.distributed_lock = true +; Statsd client config +#statsd.enabled = false +#statsd.statsd_host = 0.0.0.0 +#statsd.statsd_port = 8125 +#statsd.statsd_prefix = +#statsd.statsd_ipv6 = false ; ##################### ; LOGGING CONFIGURATION diff --git a/vcsserver/http_main.py b/vcsserver/http_main.py --- a/vcsserver/http_main.py +++ b/vcsserver/http_main.py @@ -24,7 +24,6 @@ import uuid import wsgiref.util import traceback import tempfile -import resource import psutil from itertools import chain from cStringIO import StringIO @@ -261,6 +260,7 @@ class HTTPApplication(object): self.remote_wsgi = remote_wsgi_stub self._configure_settings(global_config, settings) + self._configure() def _configure_settings(self, global_config, app_settings): @@ -359,6 +359,10 @@ class HTTPApplication(object): 'vcsserver.lib.request_counter.get_request_counter', 'request_count') + self.config.add_request_method( + 
'vcsserver.lib._vendor.statsd.get_statsd_client', + 'statsd', reify=True) + def wsgi_app(self): return self.config.make_wsgi_app() diff --git a/vcsserver/lib/_vendor/__init__.py b/vcsserver/lib/_vendor/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/__init__.py @@ -0,0 +1,26 @@ +# RhodeCode VCSServer provides access to different vcs backends via network. +# Copyright (C) 2014-2020 RhodeCode GmbH +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +# This package contains non rhodecode licensed packages that are +# vendored for various reasons + +import os +import sys + +vendor_dir = os.path.abspath(os.path.dirname(__file__)) + +sys.path.append(vendor_dir) diff --git a/vcsserver/lib/_vendor/statsd/__init__.py b/vcsserver/lib/_vendor/statsd/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/__init__.py @@ -0,0 +1,46 @@ +from __future__ import absolute_import, division, unicode_literals + +import logging + +from .stream import TCPStatsClient, UnixSocketStatsClient # noqa +from .udp import StatsClient # noqa + +HOST = 'localhost' +PORT = 8125 +IPV6 = False +PREFIX = None +MAXUDPSIZE = 512 + +log = logging.getLogger('rhodecode.statsd') + + +def statsd_config(config, prefix='statsd.'): + _config = {} + for key in config.keys(): + if key.startswith(prefix): + 
_config[key[len(prefix):]] = config[key] + return _config + + +def client_from_config(configuration, prefix='statsd.', **kwargs): + from pyramid.settings import asbool + + _config = statsd_config(configuration, prefix) + statsd_enabled = asbool(_config.pop('enabled', False)) + if not statsd_enabled: + log.debug('statsd client not enabled by statsd.enabled = flag, skipping...') + return + + host = _config.pop('statsd_host', HOST) + port = _config.pop('statsd_port', PORT) + prefix = _config.pop('statsd_prefix', PREFIX) + maxudpsize = int(_config.pop('statsd_maxudpsize', MAXUDPSIZE)) # ini values are strings; Pipeline compares this against len() + ipv6 = asbool(_config.pop('statsd_ipv6', IPV6)) + log.debug('configured statsd client %s:%s', host, port) + + return StatsClient( + host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6) + + +def get_statsd_client(request): + return client_from_config(request.registry.settings) diff --git a/vcsserver/lib/_vendor/statsd/base.py b/vcsserver/lib/_vendor/statsd/base.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/base.py @@ -0,0 +1,107 @@ +from __future__ import absolute_import, division, unicode_literals + +import random +from collections import deque +from datetime import timedelta + +from .timer import Timer + + +class StatsClientBase(object): + """A Base class for various statsd clients.""" + + def close(self): + """Used to close and clean up any underlying resources.""" + raise NotImplementedError() + + def _send(self): + raise NotImplementedError() + + def pipeline(self): + raise NotImplementedError() + + def timer(self, stat, rate=1): + return Timer(self, stat, rate) + + def timing(self, stat, delta, rate=1): + """ + Send new timing information. + + `delta` can be either a number of milliseconds or a timedelta. + """ + if isinstance(delta, timedelta): + # Convert timedelta to number of milliseconds. + delta = delta.total_seconds() * 1000. 
+ self._send_stat(stat, '%0.6f|ms' % delta, rate) + + def incr(self, stat, count=1, rate=1): + """Increment a stat by `count`.""" + self._send_stat(stat, '%s|c' % count, rate) + + def decr(self, stat, count=1, rate=1): + """Decrement a stat by `count`.""" + self.incr(stat, -count, rate) + + def gauge(self, stat, value, rate=1, delta=False): + """Set a gauge value.""" + if value < 0 and not delta: + if rate < 1: + if random.random() > rate: + return + with self.pipeline() as pipe: + pipe._send_stat(stat, '0|g', 1) + pipe._send_stat(stat, '%s|g' % value, 1) + else: + prefix = '+' if delta and value >= 0 else '' + self._send_stat(stat, '%s%s|g' % (prefix, value), rate) + + def set(self, stat, value, rate=1): + """Set a set value.""" + self._send_stat(stat, '%s|s' % value, rate) + + def _send_stat(self, stat, value, rate): + self._after(self._prepare(stat, value, rate)) + + def _prepare(self, stat, value, rate): + if rate < 1: + if random.random() > rate: + return + value = '%s|@%s' % (value, rate) + + if self._prefix: + stat = '%s.%s' % (self._prefix, stat) + + return '%s:%s' % (stat, value) + + def _after(self, data): + if data: + self._send(data) + + +class PipelineBase(StatsClientBase): + + def __init__(self, client): + self._client = client + self._prefix = client._prefix + self._stats = deque() + + def _send(self): + raise NotImplementedError() + + def _after(self, data): + if data is not None: + self._stats.append(data) + + def __enter__(self): + return self + + def __exit__(self, typ, value, tb): + self.send() + + def send(self): + if not self._stats: + return + self._send() + + def pipeline(self): + return self.__class__(self) diff --git a/vcsserver/lib/_vendor/statsd/stream.py b/vcsserver/lib/_vendor/statsd/stream.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/stream.py @@ -0,0 +1,75 @@ +from __future__ import absolute_import, division, unicode_literals + +import socket + +from .base import StatsClientBase, PipelineBase + + +class 
StreamPipeline(PipelineBase): + def _send(self): + self._client._after('\n'.join(self._stats)) + self._stats.clear() + + +class StreamClientBase(StatsClientBase): + def connect(self): + raise NotImplementedError() + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def reconnect(self): + self.close() + self.connect() + + def pipeline(self): + return StreamPipeline(self) + + def _send(self, data): + """Send data to statsd.""" + if not self._sock: + self.connect() + self._do_send(data) + + def _do_send(self, data): + self._sock.sendall(data.encode('ascii') + b'\n') + + +class TCPStatsClient(StreamClientBase): + """TCP version of StatsClient.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + timeout=None, ipv6=False): + """Create a new client.""" + self._host = host + self._port = port + self._ipv6 = ipv6 + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def connect(self): + fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET + family, _, _, _, addr = socket.getaddrinfo( + self._host, self._port, fam, socket.SOCK_STREAM)[0] + self._sock = socket.socket(family, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(addr) + + +class UnixSocketStatsClient(StreamClientBase): + """Unix domain socket version of StatsClient.""" + + def __init__(self, socket_path, prefix=None, timeout=None): + """Create a new client.""" + self._socket_path = socket_path + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def connect(self): + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(self._socket_path) diff --git a/vcsserver/lib/_vendor/statsd/timer.py b/vcsserver/lib/_vendor/statsd/timer.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/timer.py @@ -0,0 +1,71 @@ +from __future__ import absolute_import, division, unicode_literals + 
+import functools + +# Use timer that's not susceptible to time of day adjustments. +try: + # perf_counter is only present on Py3.3+ + from time import perf_counter as time_now +except ImportError: + # fall back to using time + from time import time as time_now + + +def safe_wraps(wrapper, *args, **kwargs): + """Safely wraps partial functions.""" + while isinstance(wrapper, functools.partial): + wrapper = wrapper.func + return functools.wraps(wrapper, *args, **kwargs) + + +class Timer(object): + """A context manager/decorator for statsd.timing().""" + + def __init__(self, client, stat, rate=1): + self.client = client + self.stat = stat + self.rate = rate + self.ms = None + self._sent = False + self._start_time = None + + def __call__(self, f): + """Thread-safe timing function decorator.""" + @safe_wraps(f) + def _wrapped(*args, **kwargs): + start_time = time_now() + try: + return f(*args, **kwargs) + finally: + elapsed_time_ms = 1000.0 * (time_now() - start_time) + self.client.timing(self.stat, elapsed_time_ms, self.rate) + return _wrapped + + def __enter__(self): + return self.start() + + def __exit__(self, typ, value, tb): + self.stop() + + def start(self): + self.ms = None + self._sent = False + self._start_time = time_now() + return self + + def stop(self, send=True): + if self._start_time is None: + raise RuntimeError('Timer has not started.') + dt = time_now() - self._start_time + self.ms = 1000.0 * dt # Convert to milliseconds. 
+ if send: + self.send() + return self + + def send(self): + if self.ms is None: + raise RuntimeError('No data recorded.') + if self._sent: + raise RuntimeError('Already sent data.') + self._sent = True + self.client.timing(self.stat, self.ms, self.rate) diff --git a/vcsserver/lib/_vendor/statsd/udp.py b/vcsserver/lib/_vendor/statsd/udp.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/udp.py @@ -0,0 +1,55 @@ +from __future__ import absolute_import, division, unicode_literals + +import socket + +from .base import StatsClientBase, PipelineBase + + +class Pipeline(PipelineBase): + + def __init__(self, client): + super(Pipeline, self).__init__(client) + self._maxudpsize = client._maxudpsize + + def _send(self): + data = self._stats.popleft() + while self._stats: + # Use popleft to preserve the order of the stats. + stat = self._stats.popleft() + if len(stat) + len(data) + 1 >= self._maxudpsize: + self._client._after(data) + data = stat + else: + data += '\n' + stat + self._client._after(data) + + +class StatsClient(StatsClientBase): + """A client for statsd.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + maxudpsize=512, ipv6=False): + """Create a new client.""" + fam = socket.AF_INET6 if ipv6 else socket.AF_INET + family, _, _, _, addr = socket.getaddrinfo( + host, port, fam, socket.SOCK_DGRAM)[0] + self._addr = addr + self._sock = socket.socket(family, socket.SOCK_DGRAM) + self._prefix = prefix + self._maxudpsize = maxudpsize + + def _send(self, data): + """Send data to statsd.""" + try: + self._sock.sendto(data.encode('ascii'), self._addr) + except (socket.error, RuntimeError): + # No time for love, Dr. Jones! 
+ pass + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def pipeline(self): + return Pipeline(self) diff --git a/vcsserver/tweens/request_wrapper.py b/vcsserver/tweens/request_wrapper.py --- a/vcsserver/tweens/request_wrapper.py +++ b/vcsserver/tweens/request_wrapper.py @@ -43,17 +43,23 @@ class RequestWrapperTween(object): def __call__(self, request): start = time.time() + log.debug('Starting request time measurement') try: response = self.handler(request) finally: - end = time.time() - total = end - start count = request.request_count() _ver_ = vcsserver.__version__ + statsd = request.statsd + total = time.time() - start + if statsd: + statsd.timing('vcsserver.req.timing', total * 1000.0) # timing() expects milliseconds; total is in seconds + statsd.incr('vcsserver.req.count') log.info( 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s', count, '127.0.0.1', request.environ.get('REQUEST_METHOD'), - safe_str(get_access_path(request)), total, get_user_agent(request.environ), _ver_) + safe_str(get_access_path(request)), total, + get_user_agent(request.environ), _ver_ + ) return response