##// END OF EJS Templates
metrics: updated statsd client and fixed some metrics
super-admin -
r1013:dcc06da1 default
parent child Browse files
Show More
@@ -1,131 +1,147 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
11 11 TAG_INVALID_CHARS_RE = re.compile(
12 12 r"[^\w\d_\-:/\.]",
13 13 #re.UNICODE
14 14 )
15 15 TAG_INVALID_CHARS_SUBS = "_"
16 16
17 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
19
20 }
21
17 22
18 23 @lru_cache(maxsize=500)
19 24 def _normalize_tags_with_cache(tag_list):
20 25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
21 26
22 27
23 28 def normalize_tags(tag_list):
24 29 # We have to turn our input tag list into a non-mutable tuple for it to
25 30 # be hashable (and thus usable) by the @lru_cache decorator.
26 31 return _normalize_tags_with_cache(tuple(tag_list))
27 32
28 33
29 34 class StatsClientBase(object):
30 35 """A Base class for various statsd clients."""
31 36
32 37 def close(self):
33 38 """Used to close and clean up any underlying resources."""
34 39 raise NotImplementedError()
35 40
36 41 def _send(self):
37 42 raise NotImplementedError()
38 43
39 44 def pipeline(self):
40 45 raise NotImplementedError()
41 46
42 47 def timer(self, stat, rate=1, tags=None):
43 48 return Timer(self, stat, rate, tags)
44 49
45 def timing(self, stat, delta, rate=1, tags=None):
50 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
46 51 """
47 52 Send new timing information.
48 53
49 54 `delta` can be either a number of milliseconds or a timedelta.
50 55 """
51 56 if isinstance(delta, timedelta):
52 57 # Convert timedelta to number of milliseconds.
53 58 delta = delta.total_seconds() * 1000.
54 self._send_stat(stat, '%0.6f|ms' % delta, rate, tags)
59 if use_decimals:
60 fmt = '%0.6f|ms'
61 else:
62 fmt = '%s|ms'
63 self._send_stat(stat, fmt % delta, rate, tags)
55 64
56 65 def incr(self, stat, count=1, rate=1, tags=None):
57 66 """Increment a stat by `count`."""
58 67 self._send_stat(stat, '%s|c' % count, rate, tags)
59 68
60 69 def decr(self, stat, count=1, rate=1, tags=None):
61 70 """Decrement a stat by `count`."""
62 71 self.incr(stat, -count, rate, tags)
63 72
64 73 def gauge(self, stat, value, rate=1, delta=False, tags=None):
65 74 """Set a gauge value."""
66 75 if value < 0 and not delta:
67 76 if rate < 1:
68 77 if random.random() > rate:
69 78 return
70 79 with self.pipeline() as pipe:
71 80 pipe._send_stat(stat, '0|g', 1)
72 81 pipe._send_stat(stat, '%s|g' % value, 1)
73 82 else:
74 83 prefix = '+' if delta and value >= 0 else ''
75 84 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
76 85
77 86 def set(self, stat, value, rate=1):
78 87 """Set a set value."""
79 88 self._send_stat(stat, '%s|s' % value, rate)
80 89
90 def histogram(self, stat, value, rate=1, tags=None):
91 """Set a histogram"""
92 self._send_stat(stat, '%s|h' % value, rate, tags)
93
81 94 def _send_stat(self, stat, value, rate, tags=None):
82 95 self._after(self._prepare(stat, value, rate, tags))
83 96
84 97 def _prepare(self, stat, value, rate, tags=None):
98 global buckets_dict
99 buckets_dict[stat] = 1
100
85 101 if rate < 1:
86 102 if random.random() > rate:
87 103 return
88 104 value = '%s|@%s' % (value, rate)
89 105
90 106 if self._prefix:
91 107 stat = '%s.%s' % (self._prefix, stat)
92 108
93 109 res = '%s:%s%s' % (
94 110 stat,
95 111 value,
96 112 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
97 113 )
98 114 return res
99 115
100 116 def _after(self, data):
101 117 if data:
102 118 self._send(data)
103 119
104 120
105 121 class PipelineBase(StatsClientBase):
106 122
107 123 def __init__(self, client):
108 124 self._client = client
109 125 self._prefix = client._prefix
110 126 self._stats = deque()
111 127
112 128 def _send(self):
113 129 raise NotImplementedError()
114 130
115 131 def _after(self, data):
116 132 if data is not None:
117 133 self._stats.append(data)
118 134
119 135 def __enter__(self):
120 136 return self
121 137
122 138 def __exit__(self, typ, value, tb):
123 139 self.send()
124 140
125 141 def send(self):
126 142 if not self._stats:
127 143 return
128 144 self._send()
129 145
130 146 def pipeline(self):
131 147 return self.__class__(self)
@@ -1,76 +1,85 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import time
19 19 import logging
20 20
21 21 import vcsserver
22 22 from vcsserver.utils import safe_str
23 23
24 24
25 25 log = logging.getLogger(__name__)
26 26
27 27
28 28 def get_access_path(environ):
29 29 path = environ.get('PATH_INFO')
30 30 return path
31 31
32 32
33 33 def get_user_agent(environ):
34 34 return environ.get('HTTP_USER_AGENT')
35 35
36 36
37 37 class RequestWrapperTween(object):
38 38 def __init__(self, handler, registry):
39 39 self.handler = handler
40 40 self.registry = registry
41 41
42 42 # one-time configuration code goes here
43 43
44 44 def __call__(self, request):
45 45 start = time.time()
46 46 log.debug('Starting request time measurement')
47 47 try:
48 48 response = self.handler(request)
49 49 finally:
50 50 count = request.request_count()
51 51 _ver_ = vcsserver.__version__
52 52 _path = safe_str(get_access_path(request.environ))
53 53
54 54 total = time.time() - start
55 55 log.info(
56 56 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
57 57 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
58 58 _path, total, get_user_agent(request.environ), _ver_
59 59 )
60 60
61 61 statsd = request.registry.statsd
62 62 if statsd:
63 elapsed_time_ms = 1000.0 * total
63 match_route = request.matched_route.name if request.matched_route else _path
64 resp_code = response.status_code
65 elapsed_time_ms = round(1000.0 * total) # use ms only
64 66 statsd.timing(
65 'vcsserver_req_timing', elapsed_time_ms,
66 tags=["path:{}".format(_path)]
67 "vcsserver_req_timing.histogram", elapsed_time_ms,
68 tags=[
69 "view_name:{}".format(match_route),
70 "code:{}".format(resp_code)
71 ],
72 use_decimals=False
67 73 )
68 74 statsd.incr(
69 'vcsserver_req_total', tags=["path:{}".format(_path)])
75 "vcsserver_req_total", tags=[
76 "view_name:{}".format(match_route),
77 "code:{}".format(resp_code)
78 ])
70 79 return response
71 80
72 81
73 82 def includeme(config):
74 83 config.add_tween(
75 84 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
76 85 )
General Comments 0
You need to be logged in to leave comments. Login now