##// END OF EJS Templates
metrics: fixed celery task names, fixed hiistogram type metrics, client small fixes
super-admin -
r4807:1bd9683a default
parent child Browse files
Show More
@@ -1,148 +1,147 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import re
3 import re
4 import random
4 import random
5 from collections import deque
5 from collections import deque
6 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
7 from repoze.lru import lru_cache
8
8
9 from .timer import Timer
9 from .timer import Timer
10
10
11 TAG_INVALID_CHARS_RE = re.compile(
11 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
12 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
13 #re.UNICODE
14 )
14 )
15 TAG_INVALID_CHARS_SUBS = "_"
15 TAG_INVALID_CHARS_SUBS = "_"
16
16
17 # we save and expose methods called by statsd for discovery
17 # we save and expose methods called by statsd for discovery
18 stat_dict = {
18 buckets_dict = {
19
19
20 }
20 }
21
21
22
22
23
24 @lru_cache(maxsize=500)
23 @lru_cache(maxsize=500)
25 def _normalize_tags_with_cache(tag_list):
24 def _normalize_tags_with_cache(tag_list):
26 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
27
26
28
27
29 def normalize_tags(tag_list):
28 def normalize_tags(tag_list):
30 # We have to turn our input tag list into a non-mutable tuple for it to
29 # We have to turn our input tag list into a non-mutable tuple for it to
31 # be hashable (and thus usable) by the @lru_cache decorator.
30 # be hashable (and thus usable) by the @lru_cache decorator.
32 return _normalize_tags_with_cache(tuple(tag_list))
31 return _normalize_tags_with_cache(tuple(tag_list))
33
32
34
33
35 class StatsClientBase(object):
34 class StatsClientBase(object):
36 """A Base class for various statsd clients."""
35 """A Base class for various statsd clients."""
37
36
38 def close(self):
37 def close(self):
39 """Used to close and clean up any underlying resources."""
38 """Used to close and clean up any underlying resources."""
40 raise NotImplementedError()
39 raise NotImplementedError()
41
40
42 def _send(self):
41 def _send(self):
43 raise NotImplementedError()
42 raise NotImplementedError()
44
43
45 def pipeline(self):
44 def pipeline(self):
46 raise NotImplementedError()
45 raise NotImplementedError()
47
46
48 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None):
49 return Timer(self, stat, rate, tags)
48 return Timer(self, stat, rate, tags)
50
49
51 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
50 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
52 """
51 """
53 Send new timing information.
52 Send new timing information.
54
53
55 `delta` can be either a number of milliseconds or a timedelta.
54 `delta` can be either a number of milliseconds or a timedelta.
56 """
55 """
57 if isinstance(delta, timedelta):
56 if isinstance(delta, timedelta):
58 # Convert timedelta to number of milliseconds.
57 # Convert timedelta to number of milliseconds.
59 delta = delta.total_seconds() * 1000.
58 delta = delta.total_seconds() * 1000.
60 if use_decimals:
59 if use_decimals:
61 fmt = '%0.6f|ms'
60 fmt = '%0.6f|ms'
62 else:
61 else:
63 fmt = '%s|ms'
62 fmt = '%s|ms'
64 self._send_stat(stat, fmt % delta, rate, tags)
63 self._send_stat(stat, fmt % delta, rate, tags)
65
64
66 def incr(self, stat, count=1, rate=1, tags=None):
65 def incr(self, stat, count=1, rate=1, tags=None):
67 """Increment a stat by `count`."""
66 """Increment a stat by `count`."""
68 self._send_stat(stat, '%s|c' % count, rate, tags)
67 self._send_stat(stat, '%s|c' % count, rate, tags)
69
68
70 def decr(self, stat, count=1, rate=1, tags=None):
69 def decr(self, stat, count=1, rate=1, tags=None):
71 """Decrement a stat by `count`."""
70 """Decrement a stat by `count`."""
72 self.incr(stat, -count, rate, tags)
71 self.incr(stat, -count, rate, tags)
73
72
74 def gauge(self, stat, value, rate=1, delta=False, tags=None):
73 def gauge(self, stat, value, rate=1, delta=False, tags=None):
75 """Set a gauge value."""
74 """Set a gauge value."""
76 if value < 0 and not delta:
75 if value < 0 and not delta:
77 if rate < 1:
76 if rate < 1:
78 if random.random() > rate:
77 if random.random() > rate:
79 return
78 return
80 with self.pipeline() as pipe:
79 with self.pipeline() as pipe:
81 pipe._send_stat(stat, '0|g', 1)
80 pipe._send_stat(stat, '0|g', 1)
82 pipe._send_stat(stat, '%s|g' % value, 1)
81 pipe._send_stat(stat, '%s|g' % value, 1)
83 else:
82 else:
84 prefix = '+' if delta and value >= 0 else ''
83 prefix = '+' if delta and value >= 0 else ''
85 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
84 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
86
85
87 def set(self, stat, value, rate=1):
86 def set(self, stat, value, rate=1):
88 """Set a set value."""
87 """Set a set value."""
89 self._send_stat(stat, '%s|s' % value, rate)
88 self._send_stat(stat, '%s|s' % value, rate)
90
89
91 def histogram(self, stat, value, rate=1, tags=None):
90 def histogram(self, stat, value, rate=1, tags=None):
92 """Set a histogram"""
91 """Set a histogram"""
93 self._send_stat(stat, '%s|h' % value, rate, tags)
92 self._send_stat(stat, '%s|h' % value, rate, tags)
94
93
95 def _send_stat(self, stat, value, rate, tags=None):
94 def _send_stat(self, stat, value, rate, tags=None):
96 self._after(self._prepare(stat, value, rate, tags))
95 self._after(self._prepare(stat, value, rate, tags))
97
96
98 def _prepare(self, stat, value, rate, tags=None):
97 def _prepare(self, stat, value, rate, tags=None):
99 global stat_dict
98 global buckets_dict
100 stat_dict[stat] = 1
99 buckets_dict[stat] = 1
101
100
102 if rate < 1:
101 if rate < 1:
103 if random.random() > rate:
102 if random.random() > rate:
104 return
103 return
105 value = '%s|@%s' % (value, rate)
104 value = '%s|@%s' % (value, rate)
106
105
107 if self._prefix:
106 if self._prefix:
108 stat = '%s.%s' % (self._prefix, stat)
107 stat = '%s.%s' % (self._prefix, stat)
109
108
110 res = '%s:%s%s' % (
109 res = '%s:%s%s' % (
111 stat,
110 stat,
112 value,
111 value,
113 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
112 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
114 )
113 )
115 return res
114 return res
116
115
117 def _after(self, data):
116 def _after(self, data):
118 if data:
117 if data:
119 self._send(data)
118 self._send(data)
120
119
121
120
122 class PipelineBase(StatsClientBase):
121 class PipelineBase(StatsClientBase):
123
122
124 def __init__(self, client):
123 def __init__(self, client):
125 self._client = client
124 self._client = client
126 self._prefix = client._prefix
125 self._prefix = client._prefix
127 self._stats = deque()
126 self._stats = deque()
128
127
129 def _send(self):
128 def _send(self):
130 raise NotImplementedError()
129 raise NotImplementedError()
131
130
132 def _after(self, data):
131 def _after(self, data):
133 if data is not None:
132 if data is not None:
134 self._stats.append(data)
133 self._stats.append(data)
135
134
136 def __enter__(self):
135 def __enter__(self):
137 return self
136 return self
138
137
139 def __exit__(self, typ, value, tb):
138 def __exit__(self, typ, value, tb):
140 self.send()
139 self.send()
141
140
142 def send(self):
141 def send(self):
143 if not self._stats:
142 if not self._stats:
144 return
143 return
145 self._send()
144 self._send()
146
145
147 def pipeline(self):
146 def pipeline(self):
148 return self.__class__(self)
147 return self.__class__(self)
@@ -1,81 +1,84 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import socket
21 import socket
22 import logging
22 import logging
23
23
24 import rhodecode
24 import rhodecode
25 from zope.cachedescriptors.property import Lazy as LazyProperty
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
27 celery_app, RequestContextTask, get_logger)
28 from rhodecode.lib.statsd_client import StatsdClient
28 from rhodecode.lib.statsd_client import StatsdClient
29
29
30 async_task = celery_app.task
30 async_task = celery_app.task
31
31
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
36 class ResultWrapper(object):
36 class ResultWrapper(object):
37 def __init__(self, task):
37 def __init__(self, task):
38 self.task = task
38 self.task = task
39
39
40 @LazyProperty
40 @LazyProperty
41 def result(self):
41 def result(self):
42 return self.task
42 return self.task
43
43
44
44
45 def run_task(task, *args, **kwargs):
45 def run_task(task, *args, **kwargs):
46 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
46 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
47 if task is None:
47 if task is None:
48 raise ValueError('Got non-existing task for execution')
48 raise ValueError('Got non-existing task for execution')
49
49
50 statsd = StatsdClient.statsd
51 exec_mode = 'sync'
50 exec_mode = 'sync'
52
51
53 if rhodecode.CELERY_ENABLED:
52 if rhodecode.CELERY_ENABLED:
54
53 t = None
55 try:
54 try:
56 t = task.apply_async(args=args, kwargs=kwargs)
55 t = task.apply_async(args=args, kwargs=kwargs)
57 log.debug('executing task %s:%s in async mode', t.task_id, task)
56 log.debug('executing task %s:%s in async mode', t.task_id, task)
58 exec_mode = 'async'
57 exec_mode = 'async'
59 return t
60
61 except socket.error as e:
58 except socket.error as e:
62 if isinstance(e, IOError) and e.errno == 111:
59 if isinstance(e, IOError) and e.errno == 111:
63 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
60 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
64 else:
61 else:
65 log.exception("Exception while connecting to celeryd.")
62 log.exception("Exception while connecting to celeryd.")
66 except KeyError as e:
63 except KeyError as e:
67 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
64 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
68 except Exception as e:
65 except Exception as e:
69 log.exception(
66 log.exception(
70 "Exception while trying to run task asynchronous. "
67 "Exception while trying to run task asynchronous. "
71 "Fallback to sync execution.")
68 "Fallback to sync execution.")
72
69
73 else:
70 else:
74 log.debug('executing task %s:%s in sync mode', 'TASK', task)
71 log.debug('executing task %s:%s in sync mode', 'TASK', task)
75
72
73 statsd = StatsdClient.statsd
76 if statsd:
74 if statsd:
75 task_repr = getattr(task, 'name', task)
77 statsd.incr('rhodecode_celery_task_total', tags=[
76 statsd.incr('rhodecode_celery_task_total', tags=[
78 'task:{}'.format(task),
77 'task:{}'.format(task_repr),
79 'mode:{}'.format(exec_mode)
78 'mode:{}'.format(exec_mode)
80 ])
79 ])
80
81 # we got async task, return it after statsd call
82 if t:
83 return t
81 return ResultWrapper(task(*args, **kwargs))
84 return ResultWrapper(task(*args, **kwargs))
@@ -1,91 +1,89 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import time
21 import time
22 import logging
22 import logging
23
23
24 import rhodecode
24 import rhodecode
25 from rhodecode.lib.auth import AuthUser
25 from rhodecode.lib.auth import AuthUser
26 from rhodecode.lib.base import get_ip_addr, get_access_path, get_user_agent
26 from rhodecode.lib.base import get_ip_addr, get_access_path, get_user_agent
27 from rhodecode.lib.utils2 import safe_str, get_current_rhodecode_user
27 from rhodecode.lib.utils2 import safe_str, get_current_rhodecode_user
28
28
29
29
30 log = logging.getLogger(__name__)
30 log = logging.getLogger(__name__)
31
31
32
32
33 class RequestWrapperTween(object):
33 class RequestWrapperTween(object):
34 def __init__(self, handler, registry):
34 def __init__(self, handler, registry):
35 self.handler = handler
35 self.handler = handler
36 self.registry = registry
36 self.registry = registry
37
37
38 # one-time configuration code goes here
38 # one-time configuration code goes here
39
39
40 def _get_user_info(self, request):
40 def _get_user_info(self, request):
41 user = get_current_rhodecode_user(request)
41 user = get_current_rhodecode_user(request)
42 if not user:
42 if not user:
43 user = AuthUser.repr_user(ip=get_ip_addr(request.environ))
43 user = AuthUser.repr_user(ip=get_ip_addr(request.environ))
44 return user
44 return user
45
45
46 def __call__(self, request):
46 def __call__(self, request):
47 start = time.time()
47 start = time.time()
48 log.debug('Starting request time measurement')
48 log.debug('Starting request time measurement')
49 try:
49 try:
50 response = self.handler(request)
50 response = self.handler(request)
51 finally:
51 finally:
52 count = request.request_count()
52 count = request.request_count()
53 _ver_ = rhodecode.__version__
53 _ver_ = rhodecode.__version__
54 _path = safe_str(get_access_path(request.environ))
54 _path = safe_str(get_access_path(request.environ))
55 _auth_user = self._get_user_info(request)
55 _auth_user = self._get_user_info(request)
56 user_id = getattr(_auth_user, 'user_id', _auth_user)
56
57 total = time.time() - start
57 total = time.time() - start
58 log.info(
58 log.info(
59 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s',
59 'Req[%4s] %s %s Request to %s time: %.4fs [%s], RhodeCode %s',
60 count, _auth_user, request.environ.get('REQUEST_METHOD'),
60 count, _auth_user, request.environ.get('REQUEST_METHOD'),
61 _path, total, get_user_agent(request. environ), _ver_
61 _path, total, get_user_agent(request. environ), _ver_
62 )
62 )
63
63
64 statsd = request.registry.statsd
64 statsd = request.registry.statsd
65 if statsd:
65 if statsd:
66 match_route = request.matched_route.name if request.matched_route else _path
66 match_route = request.matched_route.name if request.matched_route else _path
67 resp_code = response.status_code
67 resp_code = response.status_code
68 elapsed_time_ms = round(1000.0 * total) # use ms only
68 elapsed_time_ms = round(1000.0 * total) # use ms only
69 statsd.timing(
69 statsd.timing(
70 'rhodecode_req_timing', elapsed_time_ms,
70 "rhodecode_req_timing.histogram", elapsed_time_ms,
71 tags=[
71 tags=[
72 "view_name:{}".format(match_route),
72 "view_name:{}".format(match_route),
73 #"user:{}".format(user_id),
74 "code:{}".format(resp_code)
73 "code:{}".format(resp_code)
75 ],
74 ],
76 use_decimals=False
75 use_decimals=False
77 )
76 )
78 statsd.incr(
77 statsd.incr(
79 'rhodecode_req_total', tags=[
78 'rhodecode_req_total', tags=[
80 "view_name:{}".format(match_route),
79 "view_name:{}".format(match_route),
81 #"user:{}".format(user_id),
82 "code:{}".format(resp_code)
80 "code:{}".format(resp_code)
83 ])
81 ])
84
82
85 return response
83 return response
86
84
87
85
88 def includeme(config):
86 def includeme(config):
89 config.add_tween(
87 config.add_tween(
90 'rhodecode.lib.middleware.request_wrapper.RequestWrapperTween',
88 'rhodecode.lib.middleware.request_wrapper.RequestWrapperTween',
91 )
89 )
General Comments 0
You need to be logged in to leave comments. Login now