##// END OF EJS Templates
metrics: updated statsd client and fixed some metrics
super-admin -
r1013:dcc06da1 default
parent child Browse files
Show More
@@ -1,131 +1,147 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import re
3 import re
4 import random
4 import random
5 from collections import deque
5 from collections import deque
6 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
7 from repoze.lru import lru_cache
8
8
9 from .timer import Timer
9 from .timer import Timer
10
10
11 TAG_INVALID_CHARS_RE = re.compile(
11 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
12 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
13 #re.UNICODE
14 )
14 )
15 TAG_INVALID_CHARS_SUBS = "_"
15 TAG_INVALID_CHARS_SUBS = "_"
16
16
17 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
19
20 }
21
17
22
18 @lru_cache(maxsize=500)
23 @lru_cache(maxsize=500)
19 def _normalize_tags_with_cache(tag_list):
24 def _normalize_tags_with_cache(tag_list):
20 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
21
26
22
27
23 def normalize_tags(tag_list):
28 def normalize_tags(tag_list):
24 # We have to turn our input tag list into a non-mutable tuple for it to
29 # We have to turn our input tag list into a non-mutable tuple for it to
25 # be hashable (and thus usable) by the @lru_cache decorator.
30 # be hashable (and thus usable) by the @lru_cache decorator.
26 return _normalize_tags_with_cache(tuple(tag_list))
31 return _normalize_tags_with_cache(tuple(tag_list))
27
32
28
33
29 class StatsClientBase(object):
34 class StatsClientBase(object):
30 """A Base class for various statsd clients."""
35 """A Base class for various statsd clients."""
31
36
32 def close(self):
37 def close(self):
33 """Used to close and clean up any underlying resources."""
38 """Used to close and clean up any underlying resources."""
34 raise NotImplementedError()
39 raise NotImplementedError()
35
40
36 def _send(self):
41 def _send(self):
37 raise NotImplementedError()
42 raise NotImplementedError()
38
43
39 def pipeline(self):
44 def pipeline(self):
40 raise NotImplementedError()
45 raise NotImplementedError()
41
46
42 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None):
43 return Timer(self, stat, rate, tags)
48 return Timer(self, stat, rate, tags)
44
49
45 def timing(self, stat, delta, rate=1, tags=None):
50 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
46 """
51 """
47 Send new timing information.
52 Send new timing information.
48
53
49 `delta` can be either a number of milliseconds or a timedelta.
54 `delta` can be either a number of milliseconds or a timedelta.
50 """
55 """
51 if isinstance(delta, timedelta):
56 if isinstance(delta, timedelta):
52 # Convert timedelta to number of milliseconds.
57 # Convert timedelta to number of milliseconds.
53 delta = delta.total_seconds() * 1000.
58 delta = delta.total_seconds() * 1000.
54 self._send_stat(stat, '%0.6f|ms' % delta, rate, tags)
59 if use_decimals:
60 fmt = '%0.6f|ms'
61 else:
62 fmt = '%s|ms'
63 self._send_stat(stat, fmt % delta, rate, tags)
55
64
56 def incr(self, stat, count=1, rate=1, tags=None):
65 def incr(self, stat, count=1, rate=1, tags=None):
57 """Increment a stat by `count`."""
66 """Increment a stat by `count`."""
58 self._send_stat(stat, '%s|c' % count, rate, tags)
67 self._send_stat(stat, '%s|c' % count, rate, tags)
59
68
60 def decr(self, stat, count=1, rate=1, tags=None):
69 def decr(self, stat, count=1, rate=1, tags=None):
61 """Decrement a stat by `count`."""
70 """Decrement a stat by `count`."""
62 self.incr(stat, -count, rate, tags)
71 self.incr(stat, -count, rate, tags)
63
72
64 def gauge(self, stat, value, rate=1, delta=False, tags=None):
73 def gauge(self, stat, value, rate=1, delta=False, tags=None):
65 """Set a gauge value."""
74 """Set a gauge value."""
66 if value < 0 and not delta:
75 if value < 0 and not delta:
67 if rate < 1:
76 if rate < 1:
68 if random.random() > rate:
77 if random.random() > rate:
69 return
78 return
70 with self.pipeline() as pipe:
79 with self.pipeline() as pipe:
71 pipe._send_stat(stat, '0|g', 1)
80 pipe._send_stat(stat, '0|g', 1)
72 pipe._send_stat(stat, '%s|g' % value, 1)
81 pipe._send_stat(stat, '%s|g' % value, 1)
73 else:
82 else:
74 prefix = '+' if delta and value >= 0 else ''
83 prefix = '+' if delta and value >= 0 else ''
75 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
84 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
76
85
77 def set(self, stat, value, rate=1):
86 def set(self, stat, value, rate=1):
78 """Set a set value."""
87 """Set a set value."""
79 self._send_stat(stat, '%s|s' % value, rate)
88 self._send_stat(stat, '%s|s' % value, rate)
80
89
90 def histogram(self, stat, value, rate=1, tags=None):
91 """Set a histogram"""
92 self._send_stat(stat, '%s|h' % value, rate, tags)
93
81 def _send_stat(self, stat, value, rate, tags=None):
94 def _send_stat(self, stat, value, rate, tags=None):
82 self._after(self._prepare(stat, value, rate, tags))
95 self._after(self._prepare(stat, value, rate, tags))
83
96
84 def _prepare(self, stat, value, rate, tags=None):
97 def _prepare(self, stat, value, rate, tags=None):
98 global buckets_dict
99 buckets_dict[stat] = 1
100
85 if rate < 1:
101 if rate < 1:
86 if random.random() > rate:
102 if random.random() > rate:
87 return
103 return
88 value = '%s|@%s' % (value, rate)
104 value = '%s|@%s' % (value, rate)
89
105
90 if self._prefix:
106 if self._prefix:
91 stat = '%s.%s' % (self._prefix, stat)
107 stat = '%s.%s' % (self._prefix, stat)
92
108
93 res = '%s:%s%s' % (
109 res = '%s:%s%s' % (
94 stat,
110 stat,
95 value,
111 value,
96 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
112 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
97 )
113 )
98 return res
114 return res
99
115
100 def _after(self, data):
116 def _after(self, data):
101 if data:
117 if data:
102 self._send(data)
118 self._send(data)
103
119
104
120
105 class PipelineBase(StatsClientBase):
121 class PipelineBase(StatsClientBase):
106
122
107 def __init__(self, client):
123 def __init__(self, client):
108 self._client = client
124 self._client = client
109 self._prefix = client._prefix
125 self._prefix = client._prefix
110 self._stats = deque()
126 self._stats = deque()
111
127
112 def _send(self):
128 def _send(self):
113 raise NotImplementedError()
129 raise NotImplementedError()
114
130
115 def _after(self, data):
131 def _after(self, data):
116 if data is not None:
132 if data is not None:
117 self._stats.append(data)
133 self._stats.append(data)
118
134
119 def __enter__(self):
135 def __enter__(self):
120 return self
136 return self
121
137
122 def __exit__(self, typ, value, tb):
138 def __exit__(self, typ, value, tb):
123 self.send()
139 self.send()
124
140
125 def send(self):
141 def send(self):
126 if not self._stats:
142 if not self._stats:
127 return
143 return
128 self._send()
144 self._send()
129
145
130 def pipeline(self):
146 def pipeline(self):
131 return self.__class__(self)
147 return self.__class__(self)
@@ -1,76 +1,85 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import time
18 import time
19 import logging
19 import logging
20
20
21 import vcsserver
21 import vcsserver
22 from vcsserver.utils import safe_str
22 from vcsserver.utils import safe_str
23
23
24
24
25 log = logging.getLogger(__name__)
25 log = logging.getLogger(__name__)
26
26
27
27
28 def get_access_path(environ):
28 def get_access_path(environ):
29 path = environ.get('PATH_INFO')
29 path = environ.get('PATH_INFO')
30 return path
30 return path
31
31
32
32
33 def get_user_agent(environ):
33 def get_user_agent(environ):
34 return environ.get('HTTP_USER_AGENT')
34 return environ.get('HTTP_USER_AGENT')
35
35
36
36
37 class RequestWrapperTween(object):
37 class RequestWrapperTween(object):
38 def __init__(self, handler, registry):
38 def __init__(self, handler, registry):
39 self.handler = handler
39 self.handler = handler
40 self.registry = registry
40 self.registry = registry
41
41
42 # one-time configuration code goes here
42 # one-time configuration code goes here
43
43
44 def __call__(self, request):
44 def __call__(self, request):
45 start = time.time()
45 start = time.time()
46 log.debug('Starting request time measurement')
46 log.debug('Starting request time measurement')
47 try:
47 try:
48 response = self.handler(request)
48 response = self.handler(request)
49 finally:
49 finally:
50 count = request.request_count()
50 count = request.request_count()
51 _ver_ = vcsserver.__version__
51 _ver_ = vcsserver.__version__
52 _path = safe_str(get_access_path(request.environ))
52 _path = safe_str(get_access_path(request.environ))
53
53
54 total = time.time() - start
54 total = time.time() - start
55 log.info(
55 log.info(
56 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
56 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
57 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
57 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
58 _path, total, get_user_agent(request.environ), _ver_
58 _path, total, get_user_agent(request.environ), _ver_
59 )
59 )
60
60
61 statsd = request.registry.statsd
61 statsd = request.registry.statsd
62 if statsd:
62 if statsd:
63 elapsed_time_ms = 1000.0 * total
63 match_route = request.matched_route.name if request.matched_route else _path
64 resp_code = response.status_code
65 elapsed_time_ms = round(1000.0 * total) # use ms only
64 statsd.timing(
66 statsd.timing(
65 'vcsserver_req_timing', elapsed_time_ms,
67 "vcsserver_req_timing.histogram", elapsed_time_ms,
66 tags=["path:{}".format(_path)]
68 tags=[
69 "view_name:{}".format(match_route),
70 "code:{}".format(resp_code)
71 ],
72 use_decimals=False
67 )
73 )
68 statsd.incr(
74 statsd.incr(
69 'vcsserver_req_total', tags=["path:{}".format(_path)])
75 "vcsserver_req_total", tags=[
76 "view_name:{}".format(match_route),
77 "code:{}".format(resp_code)
78 ])
70 return response
79 return response
71
80
72
81
73 def includeme(config):
82 def includeme(config):
74 config.add_tween(
83 config.add_tween(
75 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
84 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
76 )
85 )
General Comments 0
You need to be logged in to leave comments. Login now