Show More
@@ -0,0 +1,46 b'' | |||||
|
1 | from __future__ import absolute_import, division, unicode_literals | |||
|
2 | ||||
|
3 | import logging | |||
|
4 | ||||
|
5 | from .stream import TCPStatsClient, UnixSocketStatsClient # noqa | |||
|
6 | from .udp import StatsClient # noqa | |||
|
7 | ||||
|
8 | HOST = 'localhost' | |||
|
9 | PORT = 8125 | |||
|
10 | IPV6 = False | |||
|
11 | PREFIX = None | |||
|
12 | MAXUDPSIZE = 512 | |||
|
13 | ||||
|
14 | log = logging.getLogger('rhodecode.statsd') | |||
|
15 | ||||
|
16 | ||||
|
17 | def statsd_config(config, prefix='statsd.'): | |||
|
18 | _config = {} | |||
|
19 | for key in config.keys(): | |||
|
20 | if key.startswith(prefix): | |||
|
21 | _config[key[len(prefix):]] = config[key] | |||
|
22 | return _config | |||
|
23 | ||||
|
24 | ||||
|
25 | def client_from_config(configuration, prefix='statsd.', **kwargs): | |||
|
26 | from pyramid.settings import asbool | |||
|
27 | ||||
|
28 | _config = statsd_config(configuration, prefix) | |||
|
29 | statsd_enabled = asbool(_config.pop('enabled', False)) | |||
|
30 | if not statsd_enabled: | |||
|
31 | log.debug('statsd client not enabled by statsd.enabled = flag, skipping...') | |||
|
32 | return | |||
|
33 | ||||
|
34 | host = _config.pop('statsd_host', HOST) | |||
|
35 | port = _config.pop('statsd_port', PORT) | |||
|
36 | prefix = _config.pop('statsd_prefix', PREFIX) | |||
|
37 | maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE) | |||
|
38 | ipv6 = asbool(_config.pop('statsd_ipv6', IPV6)) | |||
|
39 | log.debug('configured statsd client %s:%s', host, port) | |||
|
40 | ||||
|
41 | return StatsClient( | |||
|
42 | host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6) | |||
|
43 | ||||
|
44 | ||||
|
45 | def get_statsd_client(request): | |||
|
46 | return client_from_config(request.registry.settings) |
@@ -0,0 +1,107 b'' | |||||
|
1 | from __future__ import absolute_import, division, unicode_literals | |||
|
2 | ||||
|
3 | import random | |||
|
4 | from collections import deque | |||
|
5 | from datetime import timedelta | |||
|
6 | ||||
|
7 | from .timer import Timer | |||
|
8 | ||||
|
9 | ||||
|
10 | class StatsClientBase(object): | |||
|
11 | """A Base class for various statsd clients.""" | |||
|
12 | ||||
|
13 | def close(self): | |||
|
14 | """Used to close and clean up any underlying resources.""" | |||
|
15 | raise NotImplementedError() | |||
|
16 | ||||
|
17 | def _send(self): | |||
|
18 | raise NotImplementedError() | |||
|
19 | ||||
|
20 | def pipeline(self): | |||
|
21 | raise NotImplementedError() | |||
|
22 | ||||
|
23 | def timer(self, stat, rate=1): | |||
|
24 | return Timer(self, stat, rate) | |||
|
25 | ||||
|
26 | def timing(self, stat, delta, rate=1): | |||
|
27 | """ | |||
|
28 | Send new timing information. | |||
|
29 | ||||
|
30 | `delta` can be either a number of milliseconds or a timedelta. | |||
|
31 | """ | |||
|
32 | if isinstance(delta, timedelta): | |||
|
33 | # Convert timedelta to number of milliseconds. | |||
|
34 | delta = delta.total_seconds() * 1000. | |||
|
35 | self._send_stat(stat, '%0.6f|ms' % delta, rate) | |||
|
36 | ||||
|
37 | def incr(self, stat, count=1, rate=1): | |||
|
38 | """Increment a stat by `count`.""" | |||
|
39 | self._send_stat(stat, '%s|c' % count, rate) | |||
|
40 | ||||
|
41 | def decr(self, stat, count=1, rate=1): | |||
|
42 | """Decrement a stat by `count`.""" | |||
|
43 | self.incr(stat, -count, rate) | |||
|
44 | ||||
|
45 | def gauge(self, stat, value, rate=1, delta=False): | |||
|
46 | """Set a gauge value.""" | |||
|
47 | if value < 0 and not delta: | |||
|
48 | if rate < 1: | |||
|
49 | if random.random() > rate: | |||
|
50 | return | |||
|
51 | with self.pipeline() as pipe: | |||
|
52 | pipe._send_stat(stat, '0|g', 1) | |||
|
53 | pipe._send_stat(stat, '%s|g' % value, 1) | |||
|
54 | else: | |||
|
55 | prefix = '+' if delta and value >= 0 else '' | |||
|
56 | self._send_stat(stat, '%s%s|g' % (prefix, value), rate) | |||
|
57 | ||||
|
58 | def set(self, stat, value, rate=1): | |||
|
59 | """Set a set value.""" | |||
|
60 | self._send_stat(stat, '%s|s' % value, rate) | |||
|
61 | ||||
|
62 | def _send_stat(self, stat, value, rate): | |||
|
63 | self._after(self._prepare(stat, value, rate)) | |||
|
64 | ||||
|
65 | def _prepare(self, stat, value, rate): | |||
|
66 | if rate < 1: | |||
|
67 | if random.random() > rate: | |||
|
68 | return | |||
|
69 | value = '%s|@%s' % (value, rate) | |||
|
70 | ||||
|
71 | if self._prefix: | |||
|
72 | stat = '%s.%s' % (self._prefix, stat) | |||
|
73 | ||||
|
74 | return '%s:%s' % (stat, value) | |||
|
75 | ||||
|
76 | def _after(self, data): | |||
|
77 | if data: | |||
|
78 | self._send(data) | |||
|
79 | ||||
|
80 | ||||
|
81 | class PipelineBase(StatsClientBase): | |||
|
82 | ||||
|
83 | def __init__(self, client): | |||
|
84 | self._client = client | |||
|
85 | self._prefix = client._prefix | |||
|
86 | self._stats = deque() | |||
|
87 | ||||
|
88 | def _send(self): | |||
|
89 | raise NotImplementedError() | |||
|
90 | ||||
|
91 | def _after(self, data): | |||
|
92 | if data is not None: | |||
|
93 | self._stats.append(data) | |||
|
94 | ||||
|
95 | def __enter__(self): | |||
|
96 | return self | |||
|
97 | ||||
|
98 | def __exit__(self, typ, value, tb): | |||
|
99 | self.send() | |||
|
100 | ||||
|
101 | def send(self): | |||
|
102 | if not self._stats: | |||
|
103 | return | |||
|
104 | self._send() | |||
|
105 | ||||
|
106 | def pipeline(self): | |||
|
107 | return self.__class__(self) |
@@ -0,0 +1,75 b'' | |||||
|
1 | from __future__ import absolute_import, division, unicode_literals | |||
|
2 | ||||
|
3 | import socket | |||
|
4 | ||||
|
5 | from .base import StatsClientBase, PipelineBase | |||
|
6 | ||||
|
7 | ||||
|
8 | class StreamPipeline(PipelineBase): | |||
|
9 | def _send(self): | |||
|
10 | self._client._after('\n'.join(self._stats)) | |||
|
11 | self._stats.clear() | |||
|
12 | ||||
|
13 | ||||
|
14 | class StreamClientBase(StatsClientBase): | |||
|
15 | def connect(self): | |||
|
16 | raise NotImplementedError() | |||
|
17 | ||||
|
18 | def close(self): | |||
|
19 | if self._sock and hasattr(self._sock, 'close'): | |||
|
20 | self._sock.close() | |||
|
21 | self._sock = None | |||
|
22 | ||||
|
23 | def reconnect(self): | |||
|
24 | self.close() | |||
|
25 | self.connect() | |||
|
26 | ||||
|
27 | def pipeline(self): | |||
|
28 | return StreamPipeline(self) | |||
|
29 | ||||
|
30 | def _send(self, data): | |||
|
31 | """Send data to statsd.""" | |||
|
32 | if not self._sock: | |||
|
33 | self.connect() | |||
|
34 | self._do_send(data) | |||
|
35 | ||||
|
36 | def _do_send(self, data): | |||
|
37 | self._sock.sendall(data.encode('ascii') + b'\n') | |||
|
38 | ||||
|
39 | ||||
|
40 | class TCPStatsClient(StreamClientBase): | |||
|
41 | """TCP version of StatsClient.""" | |||
|
42 | ||||
|
43 | def __init__(self, host='localhost', port=8125, prefix=None, | |||
|
44 | timeout=None, ipv6=False): | |||
|
45 | """Create a new client.""" | |||
|
46 | self._host = host | |||
|
47 | self._port = port | |||
|
48 | self._ipv6 = ipv6 | |||
|
49 | self._timeout = timeout | |||
|
50 | self._prefix = prefix | |||
|
51 | self._sock = None | |||
|
52 | ||||
|
53 | def connect(self): | |||
|
54 | fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET | |||
|
55 | family, _, _, _, addr = socket.getaddrinfo( | |||
|
56 | self._host, self._port, fam, socket.SOCK_STREAM)[0] | |||
|
57 | self._sock = socket.socket(family, socket.SOCK_STREAM) | |||
|
58 | self._sock.settimeout(self._timeout) | |||
|
59 | self._sock.connect(addr) | |||
|
60 | ||||
|
61 | ||||
|
62 | class UnixSocketStatsClient(StreamClientBase): | |||
|
63 | """Unix domain socket version of StatsClient.""" | |||
|
64 | ||||
|
65 | def __init__(self, socket_path, prefix=None, timeout=None): | |||
|
66 | """Create a new client.""" | |||
|
67 | self._socket_path = socket_path | |||
|
68 | self._timeout = timeout | |||
|
69 | self._prefix = prefix | |||
|
70 | self._sock = None | |||
|
71 | ||||
|
72 | def connect(self): | |||
|
73 | self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |||
|
74 | self._sock.settimeout(self._timeout) | |||
|
75 | self._sock.connect(self._socket_path) |
@@ -0,0 +1,71 b'' | |||||
|
1 | from __future__ import absolute_import, division, unicode_literals | |||
|
2 | ||||
|
3 | import functools | |||
|
4 | ||||
|
5 | # Use timer that's not susceptible to time of day adjustments. | |||
|
6 | try: | |||
|
7 | # perf_counter is only present on Py3.3+ | |||
|
8 | from time import perf_counter as time_now | |||
|
9 | except ImportError: | |||
|
10 | # fall back to using time | |||
|
11 | from time import time as time_now | |||
|
12 | ||||
|
13 | ||||
|
14 | def safe_wraps(wrapper, *args, **kwargs): | |||
|
15 | """Safely wraps partial functions.""" | |||
|
16 | while isinstance(wrapper, functools.partial): | |||
|
17 | wrapper = wrapper.func | |||
|
18 | return functools.wraps(wrapper, *args, **kwargs) | |||
|
19 | ||||
|
20 | ||||
|
21 | class Timer(object): | |||
|
22 | """A context manager/decorator for statsd.timing().""" | |||
|
23 | ||||
|
24 | def __init__(self, client, stat, rate=1): | |||
|
25 | self.client = client | |||
|
26 | self.stat = stat | |||
|
27 | self.rate = rate | |||
|
28 | self.ms = None | |||
|
29 | self._sent = False | |||
|
30 | self._start_time = None | |||
|
31 | ||||
|
32 | def __call__(self, f): | |||
|
33 | """Thread-safe timing function decorator.""" | |||
|
34 | @safe_wraps(f) | |||
|
35 | def _wrapped(*args, **kwargs): | |||
|
36 | start_time = time_now() | |||
|
37 | try: | |||
|
38 | return f(*args, **kwargs) | |||
|
39 | finally: | |||
|
40 | elapsed_time_ms = 1000.0 * (time_now() - start_time) | |||
|
41 | self.client.timing(self.stat, elapsed_time_ms, self.rate) | |||
|
42 | return _wrapped | |||
|
43 | ||||
|
44 | def __enter__(self): | |||
|
45 | return self.start() | |||
|
46 | ||||
|
47 | def __exit__(self, typ, value, tb): | |||
|
48 | self.stop() | |||
|
49 | ||||
|
50 | def start(self): | |||
|
51 | self.ms = None | |||
|
52 | self._sent = False | |||
|
53 | self._start_time = time_now() | |||
|
54 | return self | |||
|
55 | ||||
|
56 | def stop(self, send=True): | |||
|
57 | if self._start_time is None: | |||
|
58 | raise RuntimeError('Timer has not started.') | |||
|
59 | dt = time_now() - self._start_time | |||
|
60 | self.ms = 1000.0 * dt # Convert to milliseconds. | |||
|
61 | if send: | |||
|
62 | self.send() | |||
|
63 | return self | |||
|
64 | ||||
|
65 | def send(self): | |||
|
66 | if self.ms is None: | |||
|
67 | raise RuntimeError('No data recorded.') | |||
|
68 | if self._sent: | |||
|
69 | raise RuntimeError('Already sent data.') | |||
|
70 | self._sent = True | |||
|
71 | self.client.timing(self.stat, self.ms, self.rate) |
@@ -0,0 +1,55 b'' | |||||
|
1 | from __future__ import absolute_import, division, unicode_literals | |||
|
2 | ||||
|
3 | import socket | |||
|
4 | ||||
|
5 | from .base import StatsClientBase, PipelineBase | |||
|
6 | ||||
|
7 | ||||
|
8 | class Pipeline(PipelineBase): | |||
|
9 | ||||
|
10 | def __init__(self, client): | |||
|
11 | super(Pipeline, self).__init__(client) | |||
|
12 | self._maxudpsize = client._maxudpsize | |||
|
13 | ||||
|
14 | def _send(self): | |||
|
15 | data = self._stats.popleft() | |||
|
16 | while self._stats: | |||
|
17 | # Use popleft to preserve the order of the stats. | |||
|
18 | stat = self._stats.popleft() | |||
|
19 | if len(stat) + len(data) + 1 >= self._maxudpsize: | |||
|
20 | self._client._after(data) | |||
|
21 | data = stat | |||
|
22 | else: | |||
|
23 | data += '\n' + stat | |||
|
24 | self._client._after(data) | |||
|
25 | ||||
|
26 | ||||
|
27 | class StatsClient(StatsClientBase): | |||
|
28 | """A client for statsd.""" | |||
|
29 | ||||
|
30 | def __init__(self, host='localhost', port=8125, prefix=None, | |||
|
31 | maxudpsize=512, ipv6=False): | |||
|
32 | """Create a new client.""" | |||
|
33 | fam = socket.AF_INET6 if ipv6 else socket.AF_INET | |||
|
34 | family, _, _, _, addr = socket.getaddrinfo( | |||
|
35 | host, port, fam, socket.SOCK_DGRAM)[0] | |||
|
36 | self._addr = addr | |||
|
37 | self._sock = socket.socket(family, socket.SOCK_DGRAM) | |||
|
38 | self._prefix = prefix | |||
|
39 | self._maxudpsize = maxudpsize | |||
|
40 | ||||
|
41 | def _send(self, data): | |||
|
42 | """Send data to statsd.""" | |||
|
43 | try: | |||
|
44 | self._sock.sendto(data.encode('ascii'), self._addr) | |||
|
45 | except (socket.error, RuntimeError): | |||
|
46 | # No time for love, Dr. Jones! | |||
|
47 | pass | |||
|
48 | ||||
|
49 | def close(self): | |||
|
50 | if self._sock and hasattr(self._sock, 'close'): | |||
|
51 | self._sock.close() | |||
|
52 | self._sock = None | |||
|
53 | ||||
|
54 | def pipeline(self): | |||
|
55 | return Pipeline(self) |
@@ -340,6 +340,10 b' def includeme(config, auth_resources=Non' | |||||
340 | 'rhodecode.lib.request_counter.get_request_counter', |
|
340 | 'rhodecode.lib.request_counter.get_request_counter', | |
341 | 'request_count') |
|
341 | 'request_count') | |
342 |
|
342 | |||
|
343 | config.add_request_method( | |||
|
344 | 'rhodecode.lib._vendor.statsd.get_statsd_client', | |||
|
345 | 'statsd', reify=True) | |||
|
346 | ||||
343 | # Set the authorization policy. |
|
347 | # Set the authorization policy. | |
344 | authz_policy = ACLAuthorizationPolicy() |
|
348 | authz_policy = ACLAuthorizationPolicy() | |
345 | config.set_authorization_policy(authz_policy) |
|
349 | config.set_authorization_policy(authz_policy) |
@@ -37,21 +37,29 b' class RequestWrapperTween(object):' | |||||
37 |
|
37 | |||
38 | # one-time configuration code goes here |
|
38 | # one-time configuration code goes here | |
39 |
|
39 | |||
|
40 | def _get_user_info(self, request): | |||
|
41 | user = get_current_rhodecode_user(request) | |||
|
42 | if not user: | |||
|
43 | user = AuthUser.repr_user(ip=get_ip_addr(request.environ)) | |||
|
44 | return user | |||
|
45 | ||||
40 | def __call__(self, request): |
|
46 | def __call__(self, request): | |
41 | start = time.time() |
|
47 | start = time.time() | |
42 | log.debug('Starting request time measurement') |
|
48 | log.debug('Starting request time measurement') | |
43 | try: |
|
49 | try: | |
44 | response = self.handler(request) |
|
50 | response = self.handler(request) | |
45 | finally: |
|
51 | finally: | |
46 | end = time.time() |
|
|||
47 | total = end - start |
|
|||
48 | count = request.request_count() |
|
52 | count = request.request_count() | |
49 | _ver_ = rhodecode.__version__ |
|
53 | _ver_ = rhodecode.__version__ | |
50 | default_user_info = AuthUser.repr_user(ip=get_ip_addr(request.environ)) |
|
54 | statsd = request.statsd | |
51 | user_info = get_current_rhodecode_user(request) or default_user_info |
|
55 | total = time.time() - start | |
|
56 | if statsd: | |||
|
57 | statsd.timing('rhodecode.req.timing', total) | |||
|
58 | statsd.incr('rhodecode.req.count') | |||
|
59 | ||||
52 | log.info( |
|
60 | log.info( | |
53 | 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s', |
|
61 | 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s', | |
54 | count, user_info, request.environ.get('REQUEST_METHOD'), |
|
62 | count, self._get_user_info(request), request.environ.get('REQUEST_METHOD'), | |
55 | safe_str(get_access_path(request.environ)), total, |
|
63 | safe_str(get_access_path(request.environ)), total, | |
56 | get_user_agent(request. environ), _ver_ |
|
64 | get_user_agent(request. environ), _ver_ | |
57 | ) |
|
65 | ) |
General Comments 0
You need to be logged in to leave comments.
Login now