Show More
@@ -0,0 +1,49 b'' | |||
|
1 | from rhodecode.lib._vendor.statsd import client_from_config | |
|
2 | ||
|
3 | ||
|
4 | class StatsdClientNotInitialised(Exception): | |
|
5 | pass | |
|
6 | ||
|
7 | ||
|
8 | class _Singleton(type): | |
|
9 | """A metaclass that creates a Singleton base class when called.""" | |
|
10 | ||
|
11 | _instances = {} | |
|
12 | ||
|
13 | def __call__(cls, *args, **kwargs): | |
|
14 | if cls not in cls._instances: | |
|
15 | cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs) | |
|
16 | return cls._instances[cls] | |
|
17 | ||
|
18 | ||
|
19 | class Singleton(_Singleton("SingletonMeta", (object,), {})): | |
|
20 | pass | |
|
21 | ||
|
22 | ||
|
23 | class StatsdClientClass(Singleton): | |
|
24 | setup_run = False | |
|
25 | statsd_client = None | |
|
26 | statsd = None | |
|
27 | ||
|
28 | def __getattribute__(self, name): | |
|
29 | ||
|
30 | if name.startswith("statsd"): | |
|
31 | if self.setup_run: | |
|
32 | return super(StatsdClientClass, self).__getattribute__(name) | |
|
33 | else: | |
|
34 | return None | |
|
35 | #raise StatsdClientNotInitialised("requested key was %s" % name) | |
|
36 | ||
|
37 | return super(StatsdClientClass, self).__getattribute__(name) | |
|
38 | ||
|
39 | def setup(self, settings): | |
|
40 | """ | |
|
41 | Initialize the client | |
|
42 | """ | |
|
43 | statsd = client_from_config(settings) | |
|
44 | self.statsd = statsd | |
|
45 | self.statsd_client = statsd | |
|
46 | self.setup_run = True | |
|
47 | ||
|
48 | ||
|
49 | StatsdClient = StatsdClientClass() |
@@ -53,7 +53,7 b' from rhodecode.lib.exc_tracking import s' | |||
|
53 | 53 | from rhodecode.subscribers import ( |
|
54 | 54 | scan_repositories_if_enabled, write_js_routes_if_enabled, |
|
55 | 55 | write_metadata_if_needed, write_usage_data) |
|
56 | ||
|
56 | from rhodecode.lib.statsd_client import StatsdClient | |
|
57 | 57 | |
|
58 | 58 | log = logging.getLogger(__name__) |
|
59 | 59 | |
@@ -93,6 +93,9 b' def make_pyramid_app(global_config, **se' | |||
|
93 | 93 | start_time = time.time() |
|
94 | 94 | log.info('Pyramid app config starting') |
|
95 | 95 | |
|
96 | # init and bootstrap StatsdClient | |
|
97 | StatsdClient.setup(settings) | |
|
98 | ||
|
96 | 99 | debug = asbool(global_config.get('debug')) |
|
97 | 100 | if debug: |
|
98 | 101 | enable_debug() |
@@ -105,6 +108,8 b' def make_pyramid_app(global_config, **se' | |||
|
105 | 108 | sanitize_settings_and_apply_defaults(global_config, settings) |
|
106 | 109 | |
|
107 | 110 | config = Configurator(settings=settings) |
|
111 | # Init our statsd at very start | |
|
112 | config.registry.statsd = StatsdClient.statsd | |
|
108 | 113 | |
|
109 | 114 | # Apply compatibility patches |
|
110 | 115 | patches.inspect_getargspec() |
@@ -124,10 +129,16 b' def make_pyramid_app(global_config, **se' | |||
|
124 | 129 | |
|
125 | 130 | # creating the app uses a connection - return it after we are done |
|
126 | 131 | meta.Session.remove() |
|
132 | statsd = StatsdClient.statsd | |
|
133 | ||
|
127 | 134 | total_time = time.time() - start_time |
|
128 | 135 | log.info('Pyramid app `%s` created and configured in %.2fs', |
|
129 | 136 | pyramid_app.func_name, total_time) |
|
130 | ||
|
137 | if statsd: | |
|
138 | elapsed_time_ms = 1000.0 * total_time | |
|
139 | statsd.timing('rhodecode_app_bootstrap_timing', elapsed_time_ms, tags=[ | |
|
140 | "pyramid_app:{}".format(pyramid_app.func_name) | |
|
141 | ]) | |
|
131 | 142 | return pyramid_app |
|
132 | 143 | |
|
133 | 144 | |
@@ -169,6 +180,10 b' def error_handler(exception, request):' | |||
|
169 | 180 | log.exception( |
|
170 | 181 | 'error occurred handling this request for path: %s', request.path) |
|
171 | 182 | |
|
183 | statsd = request.registry.statsd | |
|
184 | if statsd and base_response.status_code > 499: | |
|
185 | statsd.incr('rhodecode_exception') | |
|
186 | ||
|
172 | 187 | error_explanation = base_response.explanation or str(base_response) |
|
173 | 188 | if base_response.status_code == 404: |
|
174 | 189 | error_explanation += " Optionally you don't have permission to access this page." |
@@ -343,10 +358,6 b' def includeme(config, auth_resources=Non' | |||
|
343 | 358 | 'rhodecode.lib.request_counter.get_request_counter', |
|
344 | 359 | 'request_count') |
|
345 | 360 | |
|
346 | config.add_request_method( | |
|
347 | 'rhodecode.lib._vendor.statsd.get_statsd_client', | |
|
348 | 'statsd', reify=True) | |
|
349 | ||
|
350 | 361 | # Set the authorization policy. |
|
351 | 362 | authz_policy = ACLAuthorizationPolicy() |
|
352 | 363 | config.set_authorization_policy(authz_policy) |
@@ -1,11 +1,27 b'' | |||
|
1 | 1 | from __future__ import absolute_import, division, unicode_literals |
|
2 | 2 | |
|
3 | import re | |
|
3 | 4 | import random |
|
4 | 5 | from collections import deque |
|
5 | 6 | from datetime import timedelta |
|
7 | from repoze.lru import lru_cache | |
|
6 | 8 | |
|
7 | 9 | from .timer import Timer |
|
8 | 10 | |
|
11 | TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE) | |
|
12 | TAG_INVALID_CHARS_SUBS = "_" | |
|
13 | ||
|
14 | ||
|
15 | @lru_cache(maxsize=500) | |
|
16 | def _normalize_tags_with_cache(tag_list): | |
|
17 | return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list] | |
|
18 | ||
|
19 | ||
|
20 | def normalize_tags(tag_list): | |
|
21 | # We have to turn our input tag list into a non-mutable tuple for it to | |
|
22 | # be hashable (and thus usable) by the @lru_cache decorator. | |
|
23 | return _normalize_tags_with_cache(tuple(tag_list)) | |
|
24 | ||
|
9 | 25 | |
|
10 | 26 | class StatsClientBase(object): |
|
11 | 27 | """A Base class for various statsd clients.""" |
@@ -20,10 +36,10 b' class StatsClientBase(object):' | |||
|
20 | 36 | def pipeline(self): |
|
21 | 37 | raise NotImplementedError() |
|
22 | 38 | |
|
23 | def timer(self, stat, rate=1): | |
|
24 | return Timer(self, stat, rate) | |
|
39 | def timer(self, stat, rate=1, tags=None): | |
|
40 | return Timer(self, stat, rate, tags) | |
|
25 | 41 | |
|
26 | def timing(self, stat, delta, rate=1): | |
|
42 | def timing(self, stat, delta, rate=1, tags=None): | |
|
27 | 43 | """ |
|
28 | 44 | Send new timing information. |
|
29 | 45 | |
@@ -32,17 +48,17 b' class StatsClientBase(object):' | |||
|
32 | 48 | if isinstance(delta, timedelta): |
|
33 | 49 | # Convert timedelta to number of milliseconds. |
|
34 | 50 | delta = delta.total_seconds() * 1000. |
|
35 | self._send_stat(stat, '%0.6f|ms' % delta, rate) | |
|
51 | self._send_stat(stat, '%0.6f|ms' % delta, rate, tags) | |
|
36 | 52 | |
|
37 | def incr(self, stat, count=1, rate=1): | |
|
53 | def incr(self, stat, count=1, rate=1, tags=None): | |
|
38 | 54 | """Increment a stat by `count`.""" |
|
39 | self._send_stat(stat, '%s|c' % count, rate) | |
|
55 | self._send_stat(stat, '%s|c' % count, rate, tags) | |
|
40 | 56 | |
|
41 | def decr(self, stat, count=1, rate=1): | |
|
57 | def decr(self, stat, count=1, rate=1, tags=None): | |
|
42 | 58 | """Decrement a stat by `count`.""" |
|
43 | self.incr(stat, -count, rate) | |
|
59 | self.incr(stat, -count, rate, tags) | |
|
44 | 60 | |
|
45 | def gauge(self, stat, value, rate=1, delta=False): | |
|
61 | def gauge(self, stat, value, rate=1, delta=False, tags=None): | |
|
46 | 62 | """Set a gauge value.""" |
|
47 | 63 | if value < 0 and not delta: |
|
48 | 64 | if rate < 1: |
@@ -53,16 +69,16 b' class StatsClientBase(object):' | |||
|
53 | 69 | pipe._send_stat(stat, '%s|g' % value, 1) |
|
54 | 70 | else: |
|
55 | 71 | prefix = '+' if delta and value >= 0 else '' |
|
56 | self._send_stat(stat, '%s%s|g' % (prefix, value), rate) | |
|
72 | self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags) | |
|
57 | 73 | |
|
58 | 74 | def set(self, stat, value, rate=1): |
|
59 | 75 | """Set a set value.""" |
|
60 | 76 | self._send_stat(stat, '%s|s' % value, rate) |
|
61 | 77 | |
|
62 | def _send_stat(self, stat, value, rate): | |
|
63 | self._after(self._prepare(stat, value, rate)) | |
|
78 | def _send_stat(self, stat, value, rate, tags=None): | |
|
79 | self._after(self._prepare(stat, value, rate, tags)) | |
|
64 | 80 | |
|
65 | def _prepare(self, stat, value, rate): | |
|
81 | def _prepare(self, stat, value, rate, tags=None): | |
|
66 | 82 | if rate < 1: |
|
67 | 83 | if random.random() > rate: |
|
68 | 84 | return |
@@ -71,7 +87,12 b' class StatsClientBase(object):' | |||
|
71 | 87 | if self._prefix: |
|
72 | 88 | stat = '%s.%s' % (self._prefix, stat) |
|
73 | 89 | |
|
74 |
re |
|
|
90 | res = '%s:%s%s' % ( | |
|
91 | stat, | |
|
92 | value, | |
|
93 | ("|#" + ",".join(normalize_tags(tags))) if tags else "", | |
|
94 | ) | |
|
95 | return res | |
|
75 | 96 | |
|
76 | 97 | def _after(self, data): |
|
77 | 98 | if data: |
@@ -21,10 +21,11 b' def safe_wraps(wrapper, *args, **kwargs)' | |||
|
21 | 21 | class Timer(object): |
|
22 | 22 | """A context manager/decorator for statsd.timing().""" |
|
23 | 23 | |
|
24 | def __init__(self, client, stat, rate=1): | |
|
24 | def __init__(self, client, stat, rate=1, tags=None): | |
|
25 | 25 | self.client = client |
|
26 | 26 | self.stat = stat |
|
27 | 27 | self.rate = rate |
|
28 | self.tags = tags | |
|
28 | 29 | self.ms = None |
|
29 | 30 | self._sent = False |
|
30 | 31 | self._start_time = None |
@@ -38,7 +39,7 b' class Timer(object):' | |||
|
38 | 39 | return f(*args, **kwargs) |
|
39 | 40 | finally: |
|
40 | 41 | elapsed_time_ms = 1000.0 * (time_now() - start_time) |
|
41 | self.client.timing(self.stat, elapsed_time_ms, self.rate) | |
|
42 | self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags) | |
|
42 | 43 | return _wrapped |
|
43 | 44 | |
|
44 | 45 | def __enter__(self): |
@@ -25,6 +25,7 b' import rhodecode' | |||
|
25 | 25 | from zope.cachedescriptors.property import Lazy as LazyProperty |
|
26 | 26 | from rhodecode.lib.celerylib.loader import ( |
|
27 | 27 | celery_app, RequestContextTask, get_logger) |
|
28 | from rhodecode.lib.statsd_client import StatsdClient | |
|
28 | 29 | |
|
29 | 30 | async_task = celery_app.task |
|
30 | 31 | |
@@ -42,16 +43,19 b' class ResultWrapper(object):' | |||
|
42 | 43 | |
|
43 | 44 | |
|
44 | 45 | def run_task(task, *args, **kwargs): |
|
45 | log.debug('Got task `%s` for execution', task) | |
|
46 | log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED) | |
|
46 | 47 | if task is None: |
|
47 | 48 | raise ValueError('Got non-existing task for execution') |
|
48 | 49 | |
|
50 | statsd = StatsdClient.statsd | |
|
51 | exec_mode = 'sync' | |
|
52 | ||
|
49 | 53 | if rhodecode.CELERY_ENABLED: |
|
50 | celery_is_up = False | |
|
54 | ||
|
51 | 55 | try: |
|
52 | 56 | t = task.apply_async(args=args, kwargs=kwargs) |
|
53 | celery_is_up = True | |
|
54 | 57 | log.debug('executing task %s:%s in async mode', t.task_id, task) |
|
58 | exec_mode = 'async' | |
|
55 | 59 | return t |
|
56 | 60 | |
|
57 | 61 | except socket.error as e: |
@@ -69,4 +73,9 b' def run_task(task, *args, **kwargs):' | |||
|
69 | 73 | else: |
|
70 | 74 | log.debug('executing task %s:%s in sync mode', 'TASK', task) |
|
71 | 75 | |
|
76 | if statsd: | |
|
77 | statsd.incr('rhodecode_celery_task', tags=[ | |
|
78 | 'task:{}'.format(task), | |
|
79 | 'mode:{}'.format(exec_mode) | |
|
80 | ]) | |
|
72 | 81 | return ResultWrapper(task(*args, **kwargs)) |
@@ -51,19 +51,32 b' class RequestWrapperTween(object):' | |||
|
51 | 51 | finally: |
|
52 | 52 | count = request.request_count() |
|
53 | 53 | _ver_ = rhodecode.__version__ |
|
54 | statsd = request.statsd | |
|
54 | _path = safe_str(get_access_path(request.environ)) | |
|
55 | _auth_user = self._get_user_info(request) | |
|
56 | ||
|
55 | 57 | total = time.time() - start |
|
56 | if statsd: | |
|
57 | statsd.timing('rhodecode.req.timing', total) | |
|
58 | statsd.incr('rhodecode.req.count') | |
|
59 | ||
|
60 | 58 | log.info( |
|
61 | 59 | 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s', |
|
62 |
count, |
|
|
63 | safe_str(get_access_path(request.environ)), total, | |
|
64 | get_user_agent(request. environ), _ver_ | |
|
60 | count, _auth_user, request.environ.get('REQUEST_METHOD'), | |
|
61 | _path, total, get_user_agent(request. environ), _ver_ | |
|
65 | 62 | ) |
|
66 | 63 | |
|
64 | statsd = request.registry.statsd | |
|
65 | if statsd: | |
|
66 | elapsed_time_ms = 1000.0 * total | |
|
67 | statsd.timing( | |
|
68 | 'rhodecode_req_timing', elapsed_time_ms, | |
|
69 | tags=[ | |
|
70 | "path:{}".format(_path), | |
|
71 | "user:{}".format(_auth_user.user_id) | |
|
72 | ] | |
|
73 | ) | |
|
74 | statsd.incr( | |
|
75 | 'rhodecode_req_count', tags=[ | |
|
76 | "path:{}".format(_path), | |
|
77 | "user:{}".format(_auth_user.user_id) | |
|
78 | ]) | |
|
79 | ||
|
67 | 80 | return response |
|
68 | 81 | |
|
69 | 82 |
General Comments 0
You need to be logged in to leave comments.
Login now