diff --git a/rhodecode/config/middleware.py b/rhodecode/config/middleware.py
--- a/rhodecode/config/middleware.py
+++ b/rhodecode/config/middleware.py
@@ -340,6 +340,10 @@ def includeme(config, auth_resources=Non
         'rhodecode.lib.request_counter.get_request_counter',
         'request_count')
 
+    config.add_request_method(
+        'rhodecode.lib._vendor.statsd.get_statsd_client',
+        'statsd', reify=True)
+
     # Set the authorization policy.
     authz_policy = ACLAuthorizationPolicy()
     config.set_authorization_policy(authz_policy)
diff --git a/rhodecode/lib/_vendor/statsd/__init__.py b/rhodecode/lib/_vendor/statsd/__init__.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/_vendor/statsd/__init__.py
@@ -0,0 +1,56 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import logging
+
+from .stream import TCPStatsClient, UnixSocketStatsClient  # noqa
+from .udp import StatsClient  # noqa
+
+HOST = 'localhost'
+PORT = 8125
+IPV6 = False
+PREFIX = None
+MAXUDPSIZE = 512
+
+log = logging.getLogger('rhodecode.statsd')
+
+
+def statsd_config(config, prefix='statsd.'):
+    """Return the `prefix`-ed entries of `config` with the prefix stripped."""
+    _config = {}
+    for key in config.keys():
+        if key.startswith(prefix):
+            _config[key[len(prefix):]] = config[key]
+    return _config
+
+
+def client_from_config(configuration, prefix='statsd.', **kwargs):
+    """
+    Build a UDP `StatsClient` from the `prefix`-ed keys of `configuration`.
+
+    Returns `None` when the `enabled` option is missing or false.
+    """
+    from pyramid.settings import asbool
+
+    _config = statsd_config(configuration, prefix)
+    statsd_enabled = asbool(_config.pop('enabled', False))
+    if not statsd_enabled:
+        log.debug('statsd client not enabled by statsd.enabled = flag, skipping...')
+        return
+
+    host = _config.pop('statsd_host', HOST)
+    port = _config.pop('statsd_port', PORT)
+    # Values read from settings may be strings (hence asbool); Pipeline
+    # compares maxudpsize with integer lengths, so coerce it to int.
+    stat_prefix = _config.pop('statsd_prefix', PREFIX)
+    maxudpsize = int(_config.pop('statsd_maxudpsize', MAXUDPSIZE))
+    ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
+    log.debug('configured statsd client %s:%s', host, port)
+
+    return StatsClient(
+        host=host, port=port, prefix=stat_prefix, maxudpsize=maxudpsize,
+        ipv6=ipv6)
+
+
+def get_statsd_client(request):
+    """Return a statsd client built from the request's registry settings."""
+    return client_from_config(request.registry.settings)
diff --git a/rhodecode/lib/_vendor/statsd/base.py b/rhodecode/lib/_vendor/statsd/base.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/_vendor/statsd/base.py
@@ -0,0 +1,107 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import random
+from collections import deque
+from datetime import timedelta
+
+from .timer import Timer
+
+
+class StatsClientBase(object):
+    """A Base class for various statsd clients."""
+
+    def close(self):
+        """Used to close and clean up any underlying resources."""
+        raise NotImplementedError()
+
+    def _send(self):
+        raise NotImplementedError()
+
+    def pipeline(self):
+        raise NotImplementedError()
+
+    def timer(self, stat, rate=1):
+        return Timer(self, stat, rate)
+
+    def timing(self, stat, delta, rate=1):
+        """
+        Send new timing information.
+
+        `delta` can be either a number of milliseconds or a timedelta.
+        """
+        if isinstance(delta, timedelta):
+            # Convert timedelta to number of milliseconds.
+            delta = delta.total_seconds() * 1000.
+        self._send_stat(stat, '%0.6f|ms' % delta, rate)
+
+    def incr(self, stat, count=1, rate=1):
+        """Increment a stat by `count`."""
+        self._send_stat(stat, '%s|c' % count, rate)
+
+    def decr(self, stat, count=1, rate=1):
+        """Decrement a stat by `count`."""
+        self.incr(stat, -count, rate)
+
+    def gauge(self, stat, value, rate=1, delta=False):
+        """Set a gauge value."""
+        if value < 0 and not delta:
+            if rate < 1:
+                if random.random() > rate:
+                    return
+            with self.pipeline() as pipe:
+                pipe._send_stat(stat, '0|g', 1)
+                pipe._send_stat(stat, '%s|g' % value, 1)
+        else:
+            prefix = '+' if delta and value >= 0 else ''
+            self._send_stat(stat, '%s%s|g' % (prefix, value), rate)
+
+    def set(self, stat, value, rate=1):
+        """Set a set value."""
+        self._send_stat(stat, '%s|s' % value, rate)
+
+    def _send_stat(self, stat, value, rate):
+        self._after(self._prepare(stat, value, rate))
+
+    def _prepare(self, stat, value, rate):
+        if rate < 1:
+            if random.random() > rate:
+                return
+            value = '%s|@%s' % (value, rate)
+
+        if self._prefix:
+            stat = '%s.%s' % (self._prefix, stat)
+
+        return '%s:%s' % (stat, value)
+
+    def _after(self, data):
+        if data:
+            self._send(data)
+
+
+class PipelineBase(StatsClientBase):
+
+    def __init__(self, client):
+        self._client = client
+        self._prefix = client._prefix
+        self._stats = deque()
+
+    def _send(self):
+        raise NotImplementedError()
+
+    def _after(self, data):
+        if data is not None:
+            self._stats.append(data)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, typ, value, tb):
+        self.send()
+
+    def send(self):
+        if not self._stats:
+            return
+        self._send()
+
+    def pipeline(self):
+        return self.__class__(self)
diff --git a/rhodecode/lib/_vendor/statsd/stream.py b/rhodecode/lib/_vendor/statsd/stream.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/_vendor/statsd/stream.py
@@ -0,0 +1,75 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import socket
+
+from .base import StatsClientBase, PipelineBase
+
+
+class StreamPipeline(PipelineBase):
+    def _send(self):
+        self._client._after('\n'.join(self._stats))
+        self._stats.clear()
+
+
+class StreamClientBase(StatsClientBase):
+    def connect(self):
+        raise NotImplementedError()
+
+    def close(self):
+        if self._sock and hasattr(self._sock, 'close'):
+            self._sock.close()
+        self._sock = None
+
+    def reconnect(self):
+        self.close()
+        self.connect()
+
+    def pipeline(self):
+        return StreamPipeline(self)
+
+    def _send(self, data):
+        """Send data to statsd."""
+        if not self._sock:
+            self.connect()
+        self._do_send(data)
+
+    def _do_send(self, data):
+        self._sock.sendall(data.encode('ascii') + b'\n')
+
+
+class TCPStatsClient(StreamClientBase):
+    """TCP version of StatsClient."""
+
+    def __init__(self, host='localhost', port=8125, prefix=None,
+                 timeout=None, ipv6=False):
+        """Create a new client."""
+        self._host = host
+        self._port = port
+        self._ipv6 = ipv6
+        self._timeout = timeout
+        self._prefix = prefix
+        self._sock = None
+
+    def connect(self):
+        fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
+        family, _, _, _, addr = socket.getaddrinfo(
+            self._host, self._port, fam, socket.SOCK_STREAM)[0]
+        self._sock = socket.socket(family, socket.SOCK_STREAM)
+        self._sock.settimeout(self._timeout)
+        self._sock.connect(addr)
+
+
+class UnixSocketStatsClient(StreamClientBase):
+    """Unix domain socket version of StatsClient."""
+
+    def __init__(self, socket_path, prefix=None, timeout=None):
+        """Create a new client."""
+        self._socket_path = socket_path
+        self._timeout = timeout
+        self._prefix = prefix
+        self._sock = None
+
+    def connect(self):
+        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self._sock.settimeout(self._timeout)
+        self._sock.connect(self._socket_path)
diff --git a/rhodecode/lib/_vendor/statsd/timer.py b/rhodecode/lib/_vendor/statsd/timer.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/_vendor/statsd/timer.py
@@ -0,0 +1,71 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import functools
+
+# Use timer that's not susceptible to time of day adjustments.
+try:
+    # perf_counter is only present on Py3.3+
+    from time import perf_counter as time_now
+except ImportError:
+    # fall back to using time
+    from time import time as time_now
+
+
+def safe_wraps(wrapper, *args, **kwargs):
+    """Safely wraps partial functions."""
+    while isinstance(wrapper, functools.partial):
+        wrapper = wrapper.func
+    return functools.wraps(wrapper, *args, **kwargs)
+
+
+class Timer(object):
+    """A context manager/decorator for statsd.timing()."""
+
+    def __init__(self, client, stat, rate=1):
+        self.client = client
+        self.stat = stat
+        self.rate = rate
+        self.ms = None
+        self._sent = False
+        self._start_time = None
+
+    def __call__(self, f):
+        """Thread-safe timing function decorator."""
+        @safe_wraps(f)
+        def _wrapped(*args, **kwargs):
+            start_time = time_now()
+            try:
+                return f(*args, **kwargs)
+            finally:
+                elapsed_time_ms = 1000.0 * (time_now() - start_time)
+                self.client.timing(self.stat, elapsed_time_ms, self.rate)
+        return _wrapped
+
+    def __enter__(self):
+        return self.start()
+
+    def __exit__(self, typ, value, tb):
+        self.stop()
+
+    def start(self):
+        self.ms = None
+        self._sent = False
+        self._start_time = time_now()
+        return self
+
+    def stop(self, send=True):
+        if self._start_time is None:
+            raise RuntimeError('Timer has not started.')
+        dt = time_now() - self._start_time
+        self.ms = 1000.0 * dt  # Convert to milliseconds.
+        if send:
+            self.send()
+        return self
+
+    def send(self):
+        if self.ms is None:
+            raise RuntimeError('No data recorded.')
+        if self._sent:
+            raise RuntimeError('Already sent data.')
+        self._sent = True
+        self.client.timing(self.stat, self.ms, self.rate)
diff --git a/rhodecode/lib/_vendor/statsd/udp.py b/rhodecode/lib/_vendor/statsd/udp.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/_vendor/statsd/udp.py
@@ -0,0 +1,55 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import socket
+
+from .base import StatsClientBase, PipelineBase
+
+
+class Pipeline(PipelineBase):
+
+    def __init__(self, client):
+        super(Pipeline, self).__init__(client)
+        self._maxudpsize = client._maxudpsize
+
+    def _send(self):
+        data = self._stats.popleft()
+        while self._stats:
+            # Use popleft to preserve the order of the stats.
+            stat = self._stats.popleft()
+            if len(stat) + len(data) + 1 >= self._maxudpsize:
+                self._client._after(data)
+                data = stat
+            else:
+                data += '\n' + stat
+        self._client._after(data)
+
+
+class StatsClient(StatsClientBase):
+    """A client for statsd."""
+
+    def __init__(self, host='localhost', port=8125, prefix=None,
+                 maxudpsize=512, ipv6=False):
+        """Create a new client."""
+        fam = socket.AF_INET6 if ipv6 else socket.AF_INET
+        family, _, _, _, addr = socket.getaddrinfo(
+            host, port, fam, socket.SOCK_DGRAM)[0]
+        self._addr = addr
+        self._sock = socket.socket(family, socket.SOCK_DGRAM)
+        self._prefix = prefix
+        self._maxudpsize = maxudpsize
+
+    def _send(self, data):
+        """Send data to statsd."""
+        try:
+            self._sock.sendto(data.encode('ascii'), self._addr)
+        except (socket.error, RuntimeError):
+            # No time for love, Dr. Jones!
+            pass
+
+    def close(self):
+        if self._sock and hasattr(self._sock, 'close'):
+            self._sock.close()
+        self._sock = None
+
+    def pipeline(self):
+        return Pipeline(self)
diff --git a/rhodecode/lib/middleware/request_wrapper.py b/rhodecode/lib/middleware/request_wrapper.py
--- a/rhodecode/lib/middleware/request_wrapper.py
+++ b/rhodecode/lib/middleware/request_wrapper.py
@@ -37,21 +37,35 @@ class RequestWrapperTween(object):
 
         # one-time configuration code goes here
 
+    def _get_user_info(self, request):
+        """
+        Return the current rhodecode user for `request`, falling back to
+        `AuthUser.repr_user` built from the request IP address.
+        """
+        user = get_current_rhodecode_user(request)
+        if not user:
+            user = AuthUser.repr_user(ip=get_ip_addr(request.environ))
+        return user
+
     def __call__(self, request):
         start = time.time()
         log.debug('Starting request time measurement')
         try:
             response = self.handler(request)
         finally:
-            end = time.time()
-            total = end - start
             count = request.request_count()
             _ver_ = rhodecode.__version__
-            default_user_info = AuthUser.repr_user(ip=get_ip_addr(request.environ))
-            user_info = get_current_rhodecode_user(request) or default_user_info
+            statsd = request.statsd
+            total = time.time() - start
+            if statsd:
+                # `total` is in seconds; statsd timing() expects a number
+                # of milliseconds (or a timedelta).
+                statsd.timing('rhodecode.req.timing', total * 1000.0)
+                statsd.incr('rhodecode.req.count')
+
             log.info(
                 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s',
-                count, user_info, request.environ.get('REQUEST_METHOD'),
+                count, self._get_user_info(request), request.environ.get('REQUEST_METHOD'),
                 safe_str(get_access_path(request.environ)), total,
                 get_user_agent(request. environ), _ver_
             )