##// END OF EJS Templates
metrics: fixed celery task names, fixed hiistogram type metrics, client small fixes
super-admin -
r4807:1bd9683a default
parent child Browse files
Show More
@@ -1,148 +1,147 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
11 11 TAG_INVALID_CHARS_RE = re.compile(
12 12 r"[^\w\d_\-:/\.]",
13 13 #re.UNICODE
14 14 )
15 15 TAG_INVALID_CHARS_SUBS = "_"
16 16
17 17 # we save and expose methods called by statsd for discovery
18 stat_dict = {
18 buckets_dict = {
19 19
20 20 }
21 21
22 22
23
24 23 @lru_cache(maxsize=500)
25 24 def _normalize_tags_with_cache(tag_list):
26 25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
27 26
28 27
29 28 def normalize_tags(tag_list):
30 29 # We have to turn our input tag list into a non-mutable tuple for it to
31 30 # be hashable (and thus usable) by the @lru_cache decorator.
32 31 return _normalize_tags_with_cache(tuple(tag_list))
33 32
34 33
35 34 class StatsClientBase(object):
36 35 """A Base class for various statsd clients."""
37 36
38 37 def close(self):
39 38 """Used to close and clean up any underlying resources."""
40 39 raise NotImplementedError()
41 40
42 41 def _send(self):
43 42 raise NotImplementedError()
44 43
45 44 def pipeline(self):
46 45 raise NotImplementedError()
47 46
48 47 def timer(self, stat, rate=1, tags=None):
49 48 return Timer(self, stat, rate, tags)
50 49
51 50 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
52 51 """
53 52 Send new timing information.
54 53
55 54 `delta` can be either a number of milliseconds or a timedelta.
56 55 """
57 56 if isinstance(delta, timedelta):
58 57 # Convert timedelta to number of milliseconds.
59 58 delta = delta.total_seconds() * 1000.
60 59 if use_decimals:
61 60 fmt = '%0.6f|ms'
62 61 else:
63 62 fmt = '%s|ms'
64 63 self._send_stat(stat, fmt % delta, rate, tags)
65 64
66 65 def incr(self, stat, count=1, rate=1, tags=None):
67 66 """Increment a stat by `count`."""
68 67 self._send_stat(stat, '%s|c' % count, rate, tags)
69 68
70 69 def decr(self, stat, count=1, rate=1, tags=None):
71 70 """Decrement a stat by `count`."""
72 71 self.incr(stat, -count, rate, tags)
73 72
74 73 def gauge(self, stat, value, rate=1, delta=False, tags=None):
75 74 """Set a gauge value."""
76 75 if value < 0 and not delta:
77 76 if rate < 1:
78 77 if random.random() > rate:
79 78 return
80 79 with self.pipeline() as pipe:
81 80 pipe._send_stat(stat, '0|g', 1)
82 81 pipe._send_stat(stat, '%s|g' % value, 1)
83 82 else:
84 83 prefix = '+' if delta and value >= 0 else ''
85 84 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
86 85
87 86 def set(self, stat, value, rate=1):
88 87 """Set a set value."""
89 88 self._send_stat(stat, '%s|s' % value, rate)
90 89
91 90 def histogram(self, stat, value, rate=1, tags=None):
92 91 """Set a histogram"""
93 92 self._send_stat(stat, '%s|h' % value, rate, tags)
94 93
95 94 def _send_stat(self, stat, value, rate, tags=None):
96 95 self._after(self._prepare(stat, value, rate, tags))
97 96
98 97 def _prepare(self, stat, value, rate, tags=None):
99 global stat_dict
100 stat_dict[stat] = 1
98 global buckets_dict
99 buckets_dict[stat] = 1
101 100
102 101 if rate < 1:
103 102 if random.random() > rate:
104 103 return
105 104 value = '%s|@%s' % (value, rate)
106 105
107 106 if self._prefix:
108 107 stat = '%s.%s' % (self._prefix, stat)
109 108
110 109 res = '%s:%s%s' % (
111 110 stat,
112 111 value,
113 112 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
114 113 )
115 114 return res
116 115
117 116 def _after(self, data):
118 117 if data:
119 118 self._send(data)
120 119
121 120
122 121 class PipelineBase(StatsClientBase):
123 122
124 123 def __init__(self, client):
125 124 self._client = client
126 125 self._prefix = client._prefix
127 126 self._stats = deque()
128 127
129 128 def _send(self):
130 129 raise NotImplementedError()
131 130
132 131 def _after(self, data):
133 132 if data is not None:
134 133 self._stats.append(data)
135 134
136 135 def __enter__(self):
137 136 return self
138 137
139 138 def __exit__(self, typ, value, tb):
140 139 self.send()
141 140
142 141 def send(self):
143 142 if not self._stats:
144 143 return
145 144 self._send()
146 145
147 146 def pipeline(self):
148 147 return self.__class__(self)
@@ -1,81 +1,84 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import socket
22 22 import logging
23 23
24 24 import rhodecode
25 25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 26 from rhodecode.lib.celerylib.loader import (
27 27 celery_app, RequestContextTask, get_logger)
28 28 from rhodecode.lib.statsd_client import StatsdClient
29 29
30 30 async_task = celery_app.task
31 31
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ResultWrapper(object):
37 37 def __init__(self, task):
38 38 self.task = task
39 39
40 40 @LazyProperty
41 41 def result(self):
42 42 return self.task
43 43
44 44
45 45 def run_task(task, *args, **kwargs):
46 46 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
47 47 if task is None:
48 48 raise ValueError('Got non-existing task for execution')
49 49
50 statsd = StatsdClient.statsd
51 50 exec_mode = 'sync'
52 51
53 52 if rhodecode.CELERY_ENABLED:
54
53 t = None
55 54 try:
56 55 t = task.apply_async(args=args, kwargs=kwargs)
57 56 log.debug('executing task %s:%s in async mode', t.task_id, task)
58 57 exec_mode = 'async'
59 return t
60
61 58 except socket.error as e:
62 59 if isinstance(e, IOError) and e.errno == 111:
63 60 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
64 61 else:
65 62 log.exception("Exception while connecting to celeryd.")
66 63 except KeyError as e:
67 64 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
68 65 except Exception as e:
69 66 log.exception(
70 67 "Exception while trying to run task asynchronous. "
71 68 "Fallback to sync execution.")
72 69
73 70 else:
74 71 log.debug('executing task %s:%s in sync mode', 'TASK', task)
75 72
73 statsd = StatsdClient.statsd
76 74 if statsd:
75 task_repr = getattr(task, 'name', task)
77 76 statsd.incr('rhodecode_celery_task_total', tags=[
78 'task:{}'.format(task),
77 'task:{}'.format(task_repr),
79 78 'mode:{}'.format(exec_mode)
80 79 ])
80
81 # we got async task, return it after statsd call
82 if t:
83 return t
81 84 return ResultWrapper(task(*args, **kwargs))
@@ -1,91 +1,89 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import logging
23 23
24 24 import rhodecode
25 25 from rhodecode.lib.auth import AuthUser
26 26 from rhodecode.lib.base import get_ip_addr, get_access_path, get_user_agent
27 27 from rhodecode.lib.utils2 import safe_str, get_current_rhodecode_user
28 28
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class RequestWrapperTween(object):
34 34 def __init__(self, handler, registry):
35 35 self.handler = handler
36 36 self.registry = registry
37 37
38 38 # one-time configuration code goes here
39 39
40 40 def _get_user_info(self, request):
41 41 user = get_current_rhodecode_user(request)
42 42 if not user:
43 43 user = AuthUser.repr_user(ip=get_ip_addr(request.environ))
44 44 return user
45 45
46 46 def __call__(self, request):
47 47 start = time.time()
48 48 log.debug('Starting request time measurement')
49 49 try:
50 50 response = self.handler(request)
51 51 finally:
52 52 count = request.request_count()
53 53 _ver_ = rhodecode.__version__
54 54 _path = safe_str(get_access_path(request.environ))
55 55 _auth_user = self._get_user_info(request)
56 user_id = getattr(_auth_user, 'user_id', _auth_user)
56
57 57 total = time.time() - start
58 58 log.info(
59 59 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s',
60 60 count, _auth_user, request.environ.get('REQUEST_METHOD'),
61 61 _path, total, get_user_agent(request. environ), _ver_
62 62 )
63 63
64 64 statsd = request.registry.statsd
65 65 if statsd:
66 66 match_route = request.matched_route.name if request.matched_route else _path
67 67 resp_code = response.status_code
68 68 elapsed_time_ms = round(1000.0 * total) # use ms only
69 69 statsd.timing(
70 'rhodecode_req_timing', elapsed_time_ms,
70 "rhodecode_req_timing.histogram", elapsed_time_ms,
71 71 tags=[
72 72 "view_name:{}".format(match_route),
73 #"user:{}".format(user_id),
74 73 "code:{}".format(resp_code)
75 74 ],
76 75 use_decimals=False
77 76 )
78 77 statsd.incr(
79 78 'rhodecode_req_total', tags=[
80 79 "view_name:{}".format(match_route),
81 #"user:{}".format(user_id),
82 80 "code:{}".format(resp_code)
83 81 ])
84 82
85 83 return response
86 84
87 85
88 86 def includeme(config):
89 87 config.add_tween(
90 88 'rhodecode.lib.middleware.request_wrapper.RequestWrapperTween',
91 89 )
General Comments 0
You need to be logged in to leave comments. Login now