Show More
@@ -0,0 +1,26 b'' | |||
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |
|
2 | # Copyright (C) 2014-2020 RhodeCode GmbH | |
|
3 | # | |
|
4 | # This program is free software; you can redistribute it and/or modify | |
|
5 | # it under the terms of the GNU General Public License as published by | |
|
6 | # the Free Software Foundation; either version 3 of the License, or | |
|
7 | # (at your option) any later version. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU General Public License | |
|
15 | # along with this program; if not, write to the Free Software Foundation, | |
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
|
17 | ||
|
18 | # This package contains non rhodecode licensed packages that are | |
|
19 | # vendored for various reasons | |
|
20 | ||
|
21 | import os | |
|
22 | import sys | |
|
23 | ||
|
24 | vendor_dir = os.path.abspath(os.path.dirname(__file__)) | |
|
25 | ||
|
26 | sys.path.append(vendor_dir) |
@@ -0,0 +1,46 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import logging | |
|
4 | ||
|
5 | from .stream import TCPStatsClient, UnixSocketStatsClient # noqa | |
|
6 | from .udp import StatsClient # noqa | |
|
7 | ||
|
8 | HOST = 'localhost' | |
|
9 | PORT = 8125 | |
|
10 | IPV6 = False | |
|
11 | PREFIX = None | |
|
12 | MAXUDPSIZE = 512 | |
|
13 | ||
|
14 | log = logging.getLogger('rhodecode.statsd') | |
|
15 | ||
|
16 | ||
|
17 | def statsd_config(config, prefix='statsd.'): | |
|
18 | _config = {} | |
|
19 | for key in config.keys(): | |
|
20 | if key.startswith(prefix): | |
|
21 | _config[key[len(prefix):]] = config[key] | |
|
22 | return _config | |
|
23 | ||
|
24 | ||
|
25 | def client_from_config(configuration, prefix='statsd.', **kwargs): | |
|
26 | from pyramid.settings import asbool | |
|
27 | ||
|
28 | _config = statsd_config(configuration, prefix) | |
|
29 | statsd_enabled = asbool(_config.pop('enabled', False)) | |
|
30 | if not statsd_enabled: | |
|
31 | log.debug('statsd client not enabled by statsd.enabled = flag, skipping...') | |
|
32 | return | |
|
33 | ||
|
34 | host = _config.pop('statsd_host', HOST) | |
|
35 | port = _config.pop('statsd_port', PORT) | |
|
36 | prefix = _config.pop('statsd_prefix', PREFIX) | |
|
37 | maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE) | |
|
38 | ipv6 = asbool(_config.pop('statsd_ipv6', IPV6)) | |
|
39 | log.debug('configured statsd client %s:%s', host, port) | |
|
40 | ||
|
41 | return StatsClient( | |
|
42 | host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6) | |
|
43 | ||
|
44 | ||
|
45 | def get_statsd_client(request): | |
|
46 | return client_from_config(request.registry.settings) |
@@ -0,0 +1,107 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import random | |
|
4 | from collections import deque | |
|
5 | from datetime import timedelta | |
|
6 | ||
|
7 | from .timer import Timer | |
|
8 | ||
|
9 | ||
|
10 | class StatsClientBase(object): | |
|
11 | """A Base class for various statsd clients.""" | |
|
12 | ||
|
13 | def close(self): | |
|
14 | """Used to close and clean up any underlying resources.""" | |
|
15 | raise NotImplementedError() | |
|
16 | ||
|
17 | def _send(self): | |
|
18 | raise NotImplementedError() | |
|
19 | ||
|
20 | def pipeline(self): | |
|
21 | raise NotImplementedError() | |
|
22 | ||
|
23 | def timer(self, stat, rate=1): | |
|
24 | return Timer(self, stat, rate) | |
|
25 | ||
|
26 | def timing(self, stat, delta, rate=1): | |
|
27 | """ | |
|
28 | Send new timing information. | |
|
29 | ||
|
30 | `delta` can be either a number of milliseconds or a timedelta. | |
|
31 | """ | |
|
32 | if isinstance(delta, timedelta): | |
|
33 | # Convert timedelta to number of milliseconds. | |
|
34 | delta = delta.total_seconds() * 1000. | |
|
35 | self._send_stat(stat, '%0.6f|ms' % delta, rate) | |
|
36 | ||
|
37 | def incr(self, stat, count=1, rate=1): | |
|
38 | """Increment a stat by `count`.""" | |
|
39 | self._send_stat(stat, '%s|c' % count, rate) | |
|
40 | ||
|
41 | def decr(self, stat, count=1, rate=1): | |
|
42 | """Decrement a stat by `count`.""" | |
|
43 | self.incr(stat, -count, rate) | |
|
44 | ||
|
45 | def gauge(self, stat, value, rate=1, delta=False): | |
|
46 | """Set a gauge value.""" | |
|
47 | if value < 0 and not delta: | |
|
48 | if rate < 1: | |
|
49 | if random.random() > rate: | |
|
50 | return | |
|
51 | with self.pipeline() as pipe: | |
|
52 | pipe._send_stat(stat, '0|g', 1) | |
|
53 | pipe._send_stat(stat, '%s|g' % value, 1) | |
|
54 | else: | |
|
55 | prefix = '+' if delta and value >= 0 else '' | |
|
56 | self._send_stat(stat, '%s%s|g' % (prefix, value), rate) | |
|
57 | ||
|
58 | def set(self, stat, value, rate=1): | |
|
59 | """Set a set value.""" | |
|
60 | self._send_stat(stat, '%s|s' % value, rate) | |
|
61 | ||
|
62 | def _send_stat(self, stat, value, rate): | |
|
63 | self._after(self._prepare(stat, value, rate)) | |
|
64 | ||
|
65 | def _prepare(self, stat, value, rate): | |
|
66 | if rate < 1: | |
|
67 | if random.random() > rate: | |
|
68 | return | |
|
69 | value = '%s|@%s' % (value, rate) | |
|
70 | ||
|
71 | if self._prefix: | |
|
72 | stat = '%s.%s' % (self._prefix, stat) | |
|
73 | ||
|
74 | return '%s:%s' % (stat, value) | |
|
75 | ||
|
76 | def _after(self, data): | |
|
77 | if data: | |
|
78 | self._send(data) | |
|
79 | ||
|
80 | ||
|
81 | class PipelineBase(StatsClientBase): | |
|
82 | ||
|
83 | def __init__(self, client): | |
|
84 | self._client = client | |
|
85 | self._prefix = client._prefix | |
|
86 | self._stats = deque() | |
|
87 | ||
|
88 | def _send(self): | |
|
89 | raise NotImplementedError() | |
|
90 | ||
|
91 | def _after(self, data): | |
|
92 | if data is not None: | |
|
93 | self._stats.append(data) | |
|
94 | ||
|
95 | def __enter__(self): | |
|
96 | return self | |
|
97 | ||
|
98 | def __exit__(self, typ, value, tb): | |
|
99 | self.send() | |
|
100 | ||
|
101 | def send(self): | |
|
102 | if not self._stats: | |
|
103 | return | |
|
104 | self._send() | |
|
105 | ||
|
106 | def pipeline(self): | |
|
107 | return self.__class__(self) |
@@ -0,0 +1,75 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import socket | |
|
4 | ||
|
5 | from .base import StatsClientBase, PipelineBase | |
|
6 | ||
|
7 | ||
|
8 | class StreamPipeline(PipelineBase): | |
|
9 | def _send(self): | |
|
10 | self._client._after('\n'.join(self._stats)) | |
|
11 | self._stats.clear() | |
|
12 | ||
|
13 | ||
|
14 | class StreamClientBase(StatsClientBase): | |
|
15 | def connect(self): | |
|
16 | raise NotImplementedError() | |
|
17 | ||
|
18 | def close(self): | |
|
19 | if self._sock and hasattr(self._sock, 'close'): | |
|
20 | self._sock.close() | |
|
21 | self._sock = None | |
|
22 | ||
|
23 | def reconnect(self): | |
|
24 | self.close() | |
|
25 | self.connect() | |
|
26 | ||
|
27 | def pipeline(self): | |
|
28 | return StreamPipeline(self) | |
|
29 | ||
|
30 | def _send(self, data): | |
|
31 | """Send data to statsd.""" | |
|
32 | if not self._sock: | |
|
33 | self.connect() | |
|
34 | self._do_send(data) | |
|
35 | ||
|
36 | def _do_send(self, data): | |
|
37 | self._sock.sendall(data.encode('ascii') + b'\n') | |
|
38 | ||
|
39 | ||
|
40 | class TCPStatsClient(StreamClientBase): | |
|
41 | """TCP version of StatsClient.""" | |
|
42 | ||
|
43 | def __init__(self, host='localhost', port=8125, prefix=None, | |
|
44 | timeout=None, ipv6=False): | |
|
45 | """Create a new client.""" | |
|
46 | self._host = host | |
|
47 | self._port = port | |
|
48 | self._ipv6 = ipv6 | |
|
49 | self._timeout = timeout | |
|
50 | self._prefix = prefix | |
|
51 | self._sock = None | |
|
52 | ||
|
53 | def connect(self): | |
|
54 | fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET | |
|
55 | family, _, _, _, addr = socket.getaddrinfo( | |
|
56 | self._host, self._port, fam, socket.SOCK_STREAM)[0] | |
|
57 | self._sock = socket.socket(family, socket.SOCK_STREAM) | |
|
58 | self._sock.settimeout(self._timeout) | |
|
59 | self._sock.connect(addr) | |
|
60 | ||
|
61 | ||
|
62 | class UnixSocketStatsClient(StreamClientBase): | |
|
63 | """Unix domain socket version of StatsClient.""" | |
|
64 | ||
|
65 | def __init__(self, socket_path, prefix=None, timeout=None): | |
|
66 | """Create a new client.""" | |
|
67 | self._socket_path = socket_path | |
|
68 | self._timeout = timeout | |
|
69 | self._prefix = prefix | |
|
70 | self._sock = None | |
|
71 | ||
|
72 | def connect(self): | |
|
73 | self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
|
74 | self._sock.settimeout(self._timeout) | |
|
75 | self._sock.connect(self._socket_path) |
@@ -0,0 +1,71 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import functools | |
|
4 | ||
|
5 | # Use timer that's not susceptible to time of day adjustments. | |
|
6 | try: | |
|
7 | # perf_counter is only present on Py3.3+ | |
|
8 | from time import perf_counter as time_now | |
|
9 | except ImportError: | |
|
10 | # fall back to using time | |
|
11 | from time import time as time_now | |
|
12 | ||
|
13 | ||
|
14 | def safe_wraps(wrapper, *args, **kwargs): | |
|
15 | """Safely wraps partial functions.""" | |
|
16 | while isinstance(wrapper, functools.partial): | |
|
17 | wrapper = wrapper.func | |
|
18 | return functools.wraps(wrapper, *args, **kwargs) | |
|
19 | ||
|
20 | ||
|
21 | class Timer(object): | |
|
22 | """A context manager/decorator for statsd.timing().""" | |
|
23 | ||
|
24 | def __init__(self, client, stat, rate=1): | |
|
25 | self.client = client | |
|
26 | self.stat = stat | |
|
27 | self.rate = rate | |
|
28 | self.ms = None | |
|
29 | self._sent = False | |
|
30 | self._start_time = None | |
|
31 | ||
|
32 | def __call__(self, f): | |
|
33 | """Thread-safe timing function decorator.""" | |
|
34 | @safe_wraps(f) | |
|
35 | def _wrapped(*args, **kwargs): | |
|
36 | start_time = time_now() | |
|
37 | try: | |
|
38 | return f(*args, **kwargs) | |
|
39 | finally: | |
|
40 | elapsed_time_ms = 1000.0 * (time_now() - start_time) | |
|
41 | self.client.timing(self.stat, elapsed_time_ms, self.rate) | |
|
42 | return _wrapped | |
|
43 | ||
|
44 | def __enter__(self): | |
|
45 | return self.start() | |
|
46 | ||
|
47 | def __exit__(self, typ, value, tb): | |
|
48 | self.stop() | |
|
49 | ||
|
50 | def start(self): | |
|
51 | self.ms = None | |
|
52 | self._sent = False | |
|
53 | self._start_time = time_now() | |
|
54 | return self | |
|
55 | ||
|
56 | def stop(self, send=True): | |
|
57 | if self._start_time is None: | |
|
58 | raise RuntimeError('Timer has not started.') | |
|
59 | dt = time_now() - self._start_time | |
|
60 | self.ms = 1000.0 * dt # Convert to milliseconds. | |
|
61 | if send: | |
|
62 | self.send() | |
|
63 | return self | |
|
64 | ||
|
65 | def send(self): | |
|
66 | if self.ms is None: | |
|
67 | raise RuntimeError('No data recorded.') | |
|
68 | if self._sent: | |
|
69 | raise RuntimeError('Already sent data.') | |
|
70 | self._sent = True | |
|
71 | self.client.timing(self.stat, self.ms, self.rate) |
@@ -0,0 +1,55 b'' | |||
|
1 | from __future__ import absolute_import, division, unicode_literals | |
|
2 | ||
|
3 | import socket | |
|
4 | ||
|
5 | from .base import StatsClientBase, PipelineBase | |
|
6 | ||
|
7 | ||
|
8 | class Pipeline(PipelineBase): | |
|
9 | ||
|
10 | def __init__(self, client): | |
|
11 | super(Pipeline, self).__init__(client) | |
|
12 | self._maxudpsize = client._maxudpsize | |
|
13 | ||
|
14 | def _send(self): | |
|
15 | data = self._stats.popleft() | |
|
16 | while self._stats: | |
|
17 | # Use popleft to preserve the order of the stats. | |
|
18 | stat = self._stats.popleft() | |
|
19 | if len(stat) + len(data) + 1 >= self._maxudpsize: | |
|
20 | self._client._after(data) | |
|
21 | data = stat | |
|
22 | else: | |
|
23 | data += '\n' + stat | |
|
24 | self._client._after(data) | |
|
25 | ||
|
26 | ||
|
27 | class StatsClient(StatsClientBase): | |
|
28 | """A client for statsd.""" | |
|
29 | ||
|
30 | def __init__(self, host='localhost', port=8125, prefix=None, | |
|
31 | maxudpsize=512, ipv6=False): | |
|
32 | """Create a new client.""" | |
|
33 | fam = socket.AF_INET6 if ipv6 else socket.AF_INET | |
|
34 | family, _, _, _, addr = socket.getaddrinfo( | |
|
35 | host, port, fam, socket.SOCK_DGRAM)[0] | |
|
36 | self._addr = addr | |
|
37 | self._sock = socket.socket(family, socket.SOCK_DGRAM) | |
|
38 | self._prefix = prefix | |
|
39 | self._maxudpsize = maxudpsize | |
|
40 | ||
|
41 | def _send(self, data): | |
|
42 | """Send data to statsd.""" | |
|
43 | try: | |
|
44 | self._sock.sendto(data.encode('ascii'), self._addr) | |
|
45 | except (socket.error, RuntimeError): | |
|
46 | # No time for love, Dr. Jones! | |
|
47 | pass | |
|
48 | ||
|
49 | def close(self): | |
|
50 | if self._sock and hasattr(self._sock, 'close'): | |
|
51 | self._sock.close() | |
|
52 | self._sock = None | |
|
53 | ||
|
54 | def pipeline(self): | |
|
55 | return Pipeline(self) |
@@ -191,6 +191,12 b' rc_cache.repo_object.expiration_time = 2' | |||
|
191 | 191 | ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends |
|
192 | 192 | #rc_cache.repo_object.arguments.distributed_lock = true |
|
193 | 193 | |
|
194 | ; Statsd client config | |
|
195 | #statsd.enabled = false | |
|
196 | #statsd.statsd_host = 0.0.0.0 | |
|
197 | #statsd.statsd_port = 8125 | |
|
198 | #statsd.statsd_prefix = | |
|
199 | #statsd.statsd_ipv6 = false | |
|
194 | 200 | |
|
195 | 201 | ; ##################### |
|
196 | 202 | ; LOGGING CONFIGURATION |
@@ -154,6 +154,12 b' rc_cache.repo_object.expiration_time = 2' | |||
|
154 | 154 | ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends |
|
155 | 155 | #rc_cache.repo_object.arguments.distributed_lock = true |
|
156 | 156 | |
|
157 | ; Statsd client config | |
|
158 | #statsd.enabled = false | |
|
159 | #statsd.statsd_host = 0.0.0.0 | |
|
160 | #statsd.statsd_port = 8125 | |
|
161 | #statsd.statsd_prefix = | |
|
162 | #statsd.statsd_ipv6 = false | |
|
157 | 163 | |
|
158 | 164 | ; ##################### |
|
159 | 165 | ; LOGGING CONFIGURATION |
@@ -24,7 +24,6 b' import uuid' | |||
|
24 | 24 | import wsgiref.util |
|
25 | 25 | import traceback |
|
26 | 26 | import tempfile |
|
27 | import resource | |
|
28 | 27 | import psutil |
|
29 | 28 | from itertools import chain |
|
30 | 29 | from cStringIO import StringIO |
@@ -261,6 +260,7 b' class HTTPApplication(object):' | |||
|
261 | 260 | self.remote_wsgi = remote_wsgi_stub |
|
262 | 261 | |
|
263 | 262 | self._configure_settings(global_config, settings) |
|
263 | ||
|
264 | 264 | self._configure() |
|
265 | 265 | |
|
266 | 266 | def _configure_settings(self, global_config, app_settings): |
@@ -359,6 +359,10 b' class HTTPApplication(object):' | |||
|
359 | 359 | 'vcsserver.lib.request_counter.get_request_counter', |
|
360 | 360 | 'request_count') |
|
361 | 361 | |
|
362 | self.config.add_request_method( | |
|
363 | 'vcsserver.lib._vendor.statsd.get_statsd_client', | |
|
364 | 'statsd', reify=True) | |
|
365 | ||
|
362 | 366 | def wsgi_app(self): |
|
363 | 367 | return self.config.make_wsgi_app() |
|
364 | 368 |
@@ -43,17 +43,23 b' class RequestWrapperTween(object):' | |||
|
43 | 43 | |
|
44 | 44 | def __call__(self, request): |
|
45 | 45 | start = time.time() |
|
46 | log.debug('Starting request time measurement') | |
|
46 | 47 | try: |
|
47 | 48 | response = self.handler(request) |
|
48 | 49 | finally: |
|
49 | end = time.time() | |
|
50 | total = end - start | |
|
51 | 50 | count = request.request_count() |
|
52 | 51 | _ver_ = vcsserver.__version__ |
|
52 | statsd = request.statsd | |
|
53 | total = time.time() - start | |
|
54 | if statsd: | |
|
55 | statsd.timing('vcsserver.req.timing', total) | |
|
56 | statsd.incr('vcsserver.req.count') | |
|
53 | 57 | log.info( |
|
54 | 58 | 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s', |
|
55 | 59 | count, '127.0.0.1', request.environ.get('REQUEST_METHOD'), |
|
56 |
safe_str(get_access_path(request)), total, |
|
|
60 | safe_str(get_access_path(request)), total, | |
|
61 | get_user_agent(request.environ), _ver_ | |
|
62 | ) | |
|
57 | 63 | |
|
58 | 64 | return response |
|
59 | 65 |
General Comments 0
You need to be logged in to leave comments.
Login now