Show More
@@ -0,0 +1,46 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import logging | |
|
4 | ||
|
5 | from .stream import TCPStatsClient, UnixSocketStatsClient # noqa | |
|
6 | from .udp import StatsClient # noqa | |
|
7 | ||
|
8 | HOST = 'localhost' | |
|
9 | PORT = 8125 | |
|
10 | IPV6 = False | |
|
11 | PREFIX = None | |
|
12 | MAXUDPSIZE = 512 | |
|
13 | ||
|
14 | log = logging.getLogger('rhodecode.statsd') | |
|
15 | ||
|
16 | ||
|
17 | def statsd_config(config, prefix='statsd.'): | |
|
18 | _config = {} | |
|
19 | for key in config.keys(): | |
|
20 | if key.startswith(prefix): | |
|
21 | _config[key[len(prefix):]] = config[key] | |
|
22 | return _config | |
|
23 | ||
|
24 | ||
|
25 | def client_from_config(configuration, prefix='statsd.', **kwargs): | |
|
26 | from pyramid.settings import asbool | |
|
27 | ||
|
28 | _config = statsd_config(configuration, prefix) | |
|
29 | statsd_enabled = asbool(_config.pop('enabled', False)) | |
|
30 | if not statsd_enabled: | |
|
31 | log.debug('statsd client not enabled by statsd.enabled = flag, skipping...') | |
|
32 | return | |
|
33 | ||
|
34 | host = _config.pop('statsd_host', HOST) | |
|
35 | port = _config.pop('statsd_port', PORT) | |
|
36 | prefix = _config.pop('statsd_prefix', PREFIX) | |
|
37 | maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE) | |
|
38 | ipv6 = asbool(_config.pop('statsd_ipv6', IPV6)) | |
|
39 | log.debug('configured statsd client %s:%s', host, port) | |
|
40 | ||
|
41 | return StatsClient( | |
|
42 | host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6) | |
|
43 | ||
|
44 | ||
|
45 | def get_statsd_client(request): | |
|
46 | return client_from_config(request.registry.settings) |
@@ -0,0 +1,107 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import random | |
|
4 | from collections import deque | |
|
5 | from datetime import timedelta | |
|
6 | ||
|
7 | from .timer import Timer | |
|
8 | ||
|
9 | ||
|
10 | class StatsClientBase(object): | |
|
11 | """A Base class for various statsd clients.""" | |
|
12 | ||
|
13 | def close(self): | |
|
14 | """Used to close and clean up any underlying resources.""" | |
|
15 | raise NotImplementedError() | |
|
16 | ||
|
17 | def _send(self): | |
|
18 | raise NotImplementedError() | |
|
19 | ||
|
20 | def pipeline(self): | |
|
21 | raise NotImplementedError() | |
|
22 | ||
|
23 | def timer(self, stat, rate=1): | |
|
24 | return Timer(self, stat, rate) | |
|
25 | ||
|
26 | def timing(self, stat, delta, rate=1): | |
|
27 | """ | |
|
28 | Send new timing information. | |
|
29 | ||
|
30 | `delta` can be either a number of milliseconds or a timedelta. | |
|
31 | """ | |
|
32 | if isinstance(delta, timedelta): | |
|
33 | # Convert timedelta to number of milliseconds. | |
|
34 | delta = delta.total_seconds() * 1000. | |
|
35 | self._send_stat(stat, '%0.6f|ms' % delta, rate) | |
|
36 | ||
|
37 | def incr(self, stat, count=1, rate=1): | |
|
38 | """Increment a stat by `count`.""" | |
|
39 | self._send_stat(stat, '%s|c' % count, rate) | |
|
40 | ||
|
41 | def decr(self, stat, count=1, rate=1): | |
|
42 | """Decrement a stat by `count`.""" | |
|
43 | self.incr(stat, -count, rate) | |
|
44 | ||
|
45 | def gauge(self, stat, value, rate=1, delta=False): | |
|
46 | """Set a gauge value.""" | |
|
47 | if value < 0 and not delta: | |
|
48 | if rate < 1: | |
|
49 | if random.random() > rate: | |
|
50 | return | |
|
51 | with self.pipeline() as pipe: | |
|
52 | pipe._send_stat(stat, '0|g', 1) | |
|
53 | pipe._send_stat(stat, '%s|g' % value, 1) | |
|
54 | else: | |
|
55 | prefix = '+' if delta and value >= 0 else '' | |
|
56 | self._send_stat(stat, '%s%s|g' % (prefix, value), rate) | |
|
57 | ||
|
58 | def set(self, stat, value, rate=1): | |
|
59 | """Set a set value.""" | |
|
60 | self._send_stat(stat, '%s|s' % value, rate) | |
|
61 | ||
|
62 | def _send_stat(self, stat, value, rate): | |
|
63 | self._after(self._prepare(stat, value, rate)) | |
|
64 | ||
|
65 | def _prepare(self, stat, value, rate): | |
|
66 | if rate < 1: | |
|
67 | if random.random() > rate: | |
|
68 | return | |
|
69 | value = '%s|@%s' % (value, rate) | |
|
70 | ||
|
71 | if self._prefix: | |
|
72 | stat = '%s.%s' % (self._prefix, stat) | |
|
73 | ||
|
74 | return '%s:%s' % (stat, value) | |
|
75 | ||
|
76 | def _after(self, data): | |
|
77 | if data: | |
|
78 | self._send(data) | |
|
79 | ||
|
80 | ||
|
81 | class PipelineBase(StatsClientBase): | |
|
82 | ||
|
83 | def __init__(self, client): | |
|
84 | self._client = client | |
|
85 | self._prefix = client._prefix | |
|
86 | self._stats = deque() | |
|
87 | ||
|
88 | def _send(self): | |
|
89 | raise NotImplementedError() | |
|
90 | ||
|
91 | def _after(self, data): | |
|
92 | if data is not None: | |
|
93 | self._stats.append(data) | |
|
94 | ||
|
95 | def __enter__(self): | |
|
96 | return self | |
|
97 | ||
|
98 | def __exit__(self, typ, value, tb): | |
|
99 | self.send() | |
|
100 | ||
|
101 | def send(self): | |
|
102 | if not self._stats: | |
|
103 | return | |
|
104 | self._send() | |
|
105 | ||
|
106 | def pipeline(self): | |
|
107 | return self.__class__(self) |
@@ -0,0 +1,75 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import socket | |
|
4 | ||
|
5 | from .base import StatsClientBase, PipelineBase | |
|
6 | ||
|
7 | ||
|
8 | class StreamPipeline(PipelineBase): | |
|
9 | def _send(self): | |
|
10 | self._client._after('\n'.join(self._stats)) | |
|
11 | self._stats.clear() | |
|
12 | ||
|
13 | ||
|
14 | class StreamClientBase(StatsClientBase): | |
|
15 | def connect(self): | |
|
16 | raise NotImplementedError() | |
|
17 | ||
|
18 | def close(self): | |
|
19 | if self._sock and hasattr(self._sock, 'close'): | |
|
20 | self._sock.close() | |
|
21 | self._sock = None | |
|
22 | ||
|
23 | def reconnect(self): | |
|
24 | self.close() | |
|
25 | self.connect() | |
|
26 | ||
|
27 | def pipeline(self): | |
|
28 | return StreamPipeline(self) | |
|
29 | ||
|
30 | def _send(self, data): | |
|
31 | """Send data to statsd.""" | |
|
32 | if not self._sock: | |
|
33 | self.connect() | |
|
34 | self._do_send(data) | |
|
35 | ||
|
36 | def _do_send(self, data): | |
|
37 | self._sock.sendall(data.encode('ascii') + b'\n') | |
|
38 | ||
|
39 | ||
|
40 | class TCPStatsClient(StreamClientBase): | |
|
41 | """TCP version of StatsClient.""" | |
|
42 | ||
|
43 | def __init__(self, host='localhost', port=8125, prefix=None, | |
|
44 | timeout=None, ipv6=False): | |
|
45 | """Create a new client.""" | |
|
46 | self._host = host | |
|
47 | self._port = port | |
|
48 | self._ipv6 = ipv6 | |
|
49 | self._timeout = timeout | |
|
50 | self._prefix = prefix | |
|
51 | self._sock = None | |
|
52 | ||
|
53 | def connect(self): | |
|
54 | fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET | |
|
55 | family, _, _, _, addr = socket.getaddrinfo( | |
|
56 | self._host, self._port, fam, socket.SOCK_STREAM)[0] | |
|
57 | self._sock = socket.socket(family, socket.SOCK_STREAM) | |
|
58 | self._sock.settimeout(self._timeout) | |
|
59 | self._sock.connect(addr) | |
|
60 | ||
|
61 | ||
|
62 | class UnixSocketStatsClient(StreamClientBase): | |
|
63 | """Unix domain socket version of StatsClient.""" | |
|
64 | ||
|
65 | def __init__(self, socket_path, prefix=None, timeout=None): | |
|
66 | """Create a new client.""" | |
|
67 | self._socket_path = socket_path | |
|
68 | self._timeout = timeout | |
|
69 | self._prefix = prefix | |
|
70 | self._sock = None | |
|
71 | ||
|
72 | def connect(self): | |
|
73 | self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
|
74 | self._sock.settimeout(self._timeout) | |
|
75 | self._sock.connect(self._socket_path) |
@@ -0,0 +1,71 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import functools | |
|
4 | ||
|
5 | # Use timer that's not susceptible to time of day adjustments. | |
|
6 | try: | |
|
7 | # perf_counter is only present on Py3.3+ | |
|
8 | from time import perf_counter as time_now | |
|
9 | except ImportError: | |
|
10 | # fall back to using time | |
|
11 | from time import time as time_now | |
|
12 | ||
|
13 | ||
|
14 | def safe_wraps(wrapper, *args, **kwargs): | |
|
15 | """Safely wraps partial functions.""" | |
|
16 | while isinstance(wrapper, functools.partial): | |
|
17 | wrapper = wrapper.func | |
|
18 | return functools.wraps(wrapper, *args, **kwargs) | |
|
19 | ||
|
20 | ||
|
21 | class Timer(object): | |
|
22 | """A context manager/decorator for statsd.timing().""" | |
|
23 | ||
|
24 | def __init__(self, client, stat, rate=1): | |
|
25 | self.client = client | |
|
26 | self.stat = stat | |
|
27 | self.rate = rate | |
|
28 | self.ms = None | |
|
29 | self._sent = False | |
|
30 | self._start_time = None | |
|
31 | ||
|
32 | def __call__(self, f): | |
|
33 | """Thread-safe timing function decorator.""" | |
|
34 | @safe_wraps(f) | |
|
35 | def _wrapped(*args, **kwargs): | |
|
36 | start_time = time_now() | |
|
37 | try: | |
|
38 | return f(*args, **kwargs) | |
|
39 | finally: | |
|
40 | elapsed_time_ms = 1000.0 * (time_now() - start_time) | |
|
41 | self.client.timing(self.stat, elapsed_time_ms, self.rate) | |
|
42 | return _wrapped | |
|
43 | ||
|
44 | def __enter__(self): | |
|
45 | return self.start() | |
|
46 | ||
|
47 | def __exit__(self, typ, value, tb): | |
|
48 | self.stop() | |
|
49 | ||
|
50 | def start(self): | |
|
51 | self.ms = None | |
|
52 | self._sent = False | |
|
53 | self._start_time = time_now() | |
|
54 | return self | |
|
55 | ||
|
56 | def stop(self, send=True): | |
|
57 | if self._start_time is None: | |
|
58 | raise RuntimeError('Timer has not started.') | |
|
59 | dt = time_now() - self._start_time | |
|
60 | self.ms = 1000.0 * dt # Convert to milliseconds. | |
|
61 | if send: | |
|
62 | self.send() | |
|
63 | return self | |
|
64 | ||
|
65 | def send(self): | |
|
66 | if self.ms is None: | |
|
67 | raise RuntimeError('No data recorded.') | |
|
68 | if self._sent: | |
|
69 | raise RuntimeError('Already sent data.') | |
|
70 | self._sent = True | |
|
71 | self.client.timing(self.stat, self.ms, self.rate) |
@@ -0,0 +1,55 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import socket | |
|
4 | ||
|
5 | from .base import StatsClientBase, PipelineBase | |
|
6 | ||
|
7 | ||
|
8 | class Pipeline(PipelineBase): | |
|
9 | ||
|
10 | def __init__(self, client): | |
|
11 | super(Pipeline, self).__init__(client) | |
|
12 | self._maxudpsize = client._maxudpsize | |
|
13 | ||
|
14 | def _send(self): | |
|
15 | data = self._stats.popleft() | |
|
16 | while self._stats: | |
|
17 | # Use popleft to preserve the order of the stats. | |
|
18 | stat = self._stats.popleft() | |
|
19 | if len(stat) + len(data) + 1 >= self._maxudpsize: | |
|
20 | self._client._after(data) | |
|
21 | data = stat | |
|
22 | else: | |
|
23 | data += '\n' + stat | |
|
24 | self._client._after(data) | |
|
25 | ||
|
26 | ||
|
27 | class StatsClient(StatsClientBase): | |
|
28 | """A client for statsd.""" | |
|
29 | ||
|
30 | def __init__(self, host='localhost', port=8125, prefix=None, | |
|
31 | maxudpsize=512, ipv6=False): | |
|
32 | """Create a new client.""" | |
|
33 | fam = socket.AF_INET6 if ipv6 else socket.AF_INET | |
|
34 | family, _, _, _, addr = socket.getaddrinfo( | |
|
35 | host, port, fam, socket.SOCK_DGRAM)[0] | |
|
36 | self._addr = addr | |
|
37 | self._sock = socket.socket(family, socket.SOCK_DGRAM) | |
|
38 | self._prefix = prefix | |
|
39 | self._maxudpsize = maxudpsize | |
|
40 | ||
|
41 | def _send(self, data): | |
|
42 | """Send data to statsd.""" | |
|
43 | try: | |
|
44 | self._sock.sendto(data.encode('ascii'), self._addr) | |
|
45 | except (socket.error, RuntimeError): | |
|
46 | # No time for love, Dr. Jones! | |
|
47 | pass | |
|
48 | ||
|
49 | def close(self): | |
|
50 | if self._sock and hasattr(self._sock, 'close'): | |
|
51 | self._sock.close() | |
|
52 | self._sock = None | |
|
53 | ||
|
54 | def pipeline(self): | |
|
55 | return Pipeline(self) |
@@ -340,6 +340,10 b' def includeme(config, auth_resources=Non' | |||
|
340 | 340 | 'rhodecode.lib.request_counter.get_request_counter', |
|
341 | 341 | 'request_count') |
|
342 | 342 | |
|
343 | config.add_request_method( | |
|
344 | 'rhodecode.lib._vendor.statsd.get_statsd_client', | |
|
345 | 'statsd', reify=True) | |
|
346 | ||
|
343 | 347 | # Set the authorization policy. |
|
344 | 348 | authz_policy = ACLAuthorizationPolicy() |
|
345 | 349 | config.set_authorization_policy(authz_policy) |
@@ -37,21 +37,29 b' class RequestWrapperTween(object):' | |||
|
37 | 37 | |
|
38 | 38 | # one-time configuration code goes here |
|
39 | 39 | |
|
40 | def _get_user_info(self, request): | |
|
41 | user = get_current_rhodecode_user(request) | |
|
42 | if not user: | |
|
43 | user = AuthUser.repr_user(ip=get_ip_addr(request.environ)) | |
|
44 | return user | |
|
45 | ||
|
40 | 46 | def __call__(self, request): |
|
41 | 47 | start = time.time() |
|
42 | 48 | log.debug('Starting request time measurement') |
|
43 | 49 | try: |
|
44 | 50 | response = self.handler(request) |
|
45 | 51 | finally: |
|
46 | end = time.time() | |
|
47 | total = end - start | |
|
48 | 52 | count = request.request_count() |
|
49 | 53 | _ver_ = rhodecode.__version__ |
|
50 | default_user_info = AuthUser.repr_user(ip=get_ip_addr(request.environ)) | |
|
51 | user_info = get_current_rhodecode_user(request) or default_user_info | |
|
54 | statsd = request.statsd | |
|
55 | total = time.time() - start | |
|
56 | if statsd: | |
|
57 | statsd.timing('rhodecode.req.timing', total) | |
|
58 | statsd.incr('rhodecode.req.count') | |
|
59 | ||
|
52 | 60 | log.info( |
|
53 | 61 | 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s', |
|
54 | count, user_info, request.environ.get('REQUEST_METHOD'), | |
|
62 | count, self._get_user_info(request), request.environ.get('REQUEST_METHOD'), | |
|
55 | 63 | safe_str(get_access_path(request.environ)), total, |
|
56 | 64 | get_user_agent(request. environ), _ver_ |
|
57 | 65 | ) |
General Comments 0
You need to be logged in to leave comments.
Login now