##// END OF EJS Templates
statsd: synced client with rhodecode
super-admin -
r1027:6541a7d5 default
parent child Browse files
Show More
@@ -1,156 +1,156 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import re
3 import re
4 import random
4 import random
5 from collections import deque
5 from collections import deque
6 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
7 from repoze.lru import lru_cache
8
8
9 from .timer import Timer
9 from .timer import Timer
10
10
11 TAG_INVALID_CHARS_RE = re.compile(
11 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
12 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
13 #re.UNICODE
14 )
14 )
15 TAG_INVALID_CHARS_SUBS = "_"
15 TAG_INVALID_CHARS_SUBS = "_"
16
16
17 # we save and expose methods called by statsd for discovery
17 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
18 buckets_dict = {
19
19
20 }
20 }
21
21
22
22
23 @lru_cache(maxsize=500)
23 @lru_cache(maxsize=500)
24 def _normalize_tags_with_cache(tag_list):
24 def _normalize_tags_with_cache(tag_list):
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26
26
27
27
28 def normalize_tags(tag_list):
28 def normalize_tags(tag_list):
29 # We have to turn our input tag list into a non-mutable tuple for it to
29 # We have to turn our input tag list into a non-mutable tuple for it to
30 # be hashable (and thus usable) by the @lru_cache decorator.
30 # be hashable (and thus usable) by the @lru_cache decorator.
31 return _normalize_tags_with_cache(tuple(tag_list))
31 return _normalize_tags_with_cache(tuple(tag_list))
32
32
33
33
34 class StatsClientBase(object):
34 class StatsClientBase(object):
35 """A Base class for various statsd clients."""
35 """A Base class for various statsd clients."""
36
36
37 def close(self):
37 def close(self):
38 """Used to close and clean up any underlying resources."""
38 """Used to close and clean up any underlying resources."""
39 raise NotImplementedError()
39 raise NotImplementedError()
40
40
41 def _send(self):
41 def _send(self):
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 def pipeline(self):
44 def pipeline(self):
45 raise NotImplementedError()
45 raise NotImplementedError()
46
46
47 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None, auto_send=True):
48 """
48 """
49 statsd = StatsdClient()
49 statsd = StatsdClient.statsd
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
51 # This block will be timed.
52 for i in xrange(0, 100000):
52 for i in xrange(0, 100000):
53 i ** 2
53 i ** 2
54 # you can access time here...
54 # you can access time here...
55 elapsed_ms = tmr.ms
55 elapsed_ms = tmr.ms
56 """
56 """
57 return Timer(self, stat, rate, tags)
57 return Timer(self, stat, rate, tags, auto_send=auto_send)
58
58
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
60 """
60 """
61 Send new timing information.
61 Send new timing information.
62
62
63 `delta` can be either a number of milliseconds or a timedelta.
63 `delta` can be either a number of milliseconds or a timedelta.
64 """
64 """
65 if isinstance(delta, timedelta):
65 if isinstance(delta, timedelta):
66 # Convert timedelta to number of milliseconds.
66 # Convert timedelta to number of milliseconds.
67 delta = delta.total_seconds() * 1000.
67 delta = delta.total_seconds() * 1000.
68 if use_decimals:
68 if use_decimals:
69 fmt = '%0.6f|ms'
69 fmt = '%0.6f|ms'
70 else:
70 else:
71 fmt = '%s|ms'
71 fmt = '%s|ms'
72 self._send_stat(stat, fmt % delta, rate, tags)
72 self._send_stat(stat, fmt % delta, rate, tags)
73
73
74 def incr(self, stat, count=1, rate=1, tags=None):
74 def incr(self, stat, count=1, rate=1, tags=None):
75 """Increment a stat by `count`."""
75 """Increment a stat by `count`."""
76 self._send_stat(stat, '%s|c' % count, rate, tags)
76 self._send_stat(stat, '%s|c' % count, rate, tags)
77
77
78 def decr(self, stat, count=1, rate=1, tags=None):
78 def decr(self, stat, count=1, rate=1, tags=None):
79 """Decrement a stat by `count`."""
79 """Decrement a stat by `count`."""
80 self.incr(stat, -count, rate, tags)
80 self.incr(stat, -count, rate, tags)
81
81
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
83 """Set a gauge value."""
83 """Set a gauge value."""
84 if value < 0 and not delta:
84 if value < 0 and not delta:
85 if rate < 1:
85 if rate < 1:
86 if random.random() > rate:
86 if random.random() > rate:
87 return
87 return
88 with self.pipeline() as pipe:
88 with self.pipeline() as pipe:
89 pipe._send_stat(stat, '0|g', 1)
89 pipe._send_stat(stat, '0|g', 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
91 else:
91 else:
92 prefix = '+' if delta and value >= 0 else ''
92 prefix = '+' if delta and value >= 0 else ''
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
94
94
95 def set(self, stat, value, rate=1):
95 def set(self, stat, value, rate=1):
96 """Set a set value."""
96 """Set a set value."""
97 self._send_stat(stat, '%s|s' % value, rate)
97 self._send_stat(stat, '%s|s' % value, rate)
98
98
99 def histogram(self, stat, value, rate=1, tags=None):
99 def histogram(self, stat, value, rate=1, tags=None):
100 """Set a histogram"""
100 """Set a histogram"""
101 self._send_stat(stat, '%s|h' % value, rate, tags)
101 self._send_stat(stat, '%s|h' % value, rate, tags)
102
102
103 def _send_stat(self, stat, value, rate, tags=None):
103 def _send_stat(self, stat, value, rate, tags=None):
104 self._after(self._prepare(stat, value, rate, tags))
104 self._after(self._prepare(stat, value, rate, tags))
105
105
106 def _prepare(self, stat, value, rate, tags=None):
106 def _prepare(self, stat, value, rate, tags=None):
107 global buckets_dict
107 global buckets_dict
108 buckets_dict[stat] = 1
108 buckets_dict[stat] = 1
109
109
110 if rate < 1:
110 if rate < 1:
111 if random.random() > rate:
111 if random.random() > rate:
112 return
112 return
113 value = '%s|@%s' % (value, rate)
113 value = '%s|@%s' % (value, rate)
114
114
115 if self._prefix:
115 if self._prefix:
116 stat = '%s.%s' % (self._prefix, stat)
116 stat = '%s.%s' % (self._prefix, stat)
117
117
118 res = '%s:%s%s' % (
118 res = '%s:%s%s' % (
119 stat,
119 stat,
120 value,
120 value,
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
122 )
122 )
123 return res
123 return res
124
124
125 def _after(self, data):
125 def _after(self, data):
126 if data:
126 if data:
127 self._send(data)
127 self._send(data)
128
128
129
129
130 class PipelineBase(StatsClientBase):
130 class PipelineBase(StatsClientBase):
131
131
132 def __init__(self, client):
132 def __init__(self, client):
133 self._client = client
133 self._client = client
134 self._prefix = client._prefix
134 self._prefix = client._prefix
135 self._stats = deque()
135 self._stats = deque()
136
136
137 def _send(self):
137 def _send(self):
138 raise NotImplementedError()
138 raise NotImplementedError()
139
139
140 def _after(self, data):
140 def _after(self, data):
141 if data is not None:
141 if data is not None:
142 self._stats.append(data)
142 self._stats.append(data)
143
143
144 def __enter__(self):
144 def __enter__(self):
145 return self
145 return self
146
146
147 def __exit__(self, typ, value, tb):
147 def __exit__(self, typ, value, tb):
148 self.send()
148 self.send()
149
149
150 def send(self):
150 def send(self):
151 if not self._stats:
151 if not self._stats:
152 return
152 return
153 self._send()
153 self._send()
154
154
155 def pipeline(self):
155 def pipeline(self):
156 return self.__class__(self)
156 return self.__class__(self)
@@ -1,72 +1,75 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import functools
3 import functools
4
4
5 # Use timer that's not susceptible to time of day adjustments.
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
6 try:
7 # perf_counter is only present on Py3.3+
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
8 from time import perf_counter as time_now
9 except ImportError:
9 except ImportError:
10 # fall back to using time
10 # fall back to using time
11 from time import time as time_now
11 from time import time as time_now
12
12
13
13
14 def safe_wraps(wrapper, *args, **kwargs):
14 def safe_wraps(wrapper, *args, **kwargs):
15 """Safely wraps partial functions."""
15 """Safely wraps partial functions."""
16 while isinstance(wrapper, functools.partial):
16 while isinstance(wrapper, functools.partial):
17 wrapper = wrapper.func
17 wrapper = wrapper.func
18 return functools.wraps(wrapper, *args, **kwargs)
18 return functools.wraps(wrapper, *args, **kwargs)
19
19
20
20
21 class Timer(object):
21 class Timer(object):
22 """A context manager/decorator for statsd.timing()."""
22 """A context manager/decorator for statsd.timing()."""
23
23
24 def __init__(self, client, stat, rate=1, tags=None):
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
25 self.client = client
25 self.client = client
26 self.stat = stat
26 self.stat = stat
27 self.rate = rate
27 self.rate = rate
28 self.tags = tags
28 self.tags = tags
29 self.ms = None
29 self.ms = None
30 self._sent = False
30 self._sent = False
31 self._start_time = None
31 self._start_time = None
32 self.use_decimals = use_decimals
33 self.auto_send = auto_send
32
34
33 def __call__(self, f):
35 def __call__(self, f):
34 """Thread-safe timing function decorator."""
36 """Thread-safe timing function decorator."""
35 @safe_wraps(f)
37 @safe_wraps(f)
36 def _wrapped(*args, **kwargs):
38 def _wrapped(*args, **kwargs):
37 start_time = time_now()
39 start_time = time_now()
38 try:
40 try:
39 return f(*args, **kwargs)
41 return f(*args, **kwargs)
40 finally:
42 finally:
41 elapsed_time_ms = 1000.0 * (time_now() - start_time)
43 elapsed_time_ms = 1000.0 * (time_now() - start_time)
42 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags)
44 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
45 self._sent = True
43 return _wrapped
46 return _wrapped
44
47
45 def __enter__(self):
48 def __enter__(self):
46 return self.start()
49 return self.start()
47
50
48 def __exit__(self, typ, value, tb):
51 def __exit__(self, typ, value, tb):
49 self.stop()
52 self.stop(send=self.auto_send)
50
53
51 def start(self):
54 def start(self):
52 self.ms = None
55 self.ms = None
53 self._sent = False
56 self._sent = False
54 self._start_time = time_now()
57 self._start_time = time_now()
55 return self
58 return self
56
59
57 def stop(self, send=True):
60 def stop(self, send=True):
58 if self._start_time is None:
61 if self._start_time is None:
59 raise RuntimeError('Timer has not started.')
62 raise RuntimeError('Timer has not started.')
60 dt = time_now() - self._start_time
63 dt = time_now() - self._start_time
61 self.ms = 1000.0 * dt # Convert to milliseconds.
64 self.ms = 1000.0 * dt # Convert to milliseconds.
62 if send:
65 if send:
63 self.send()
66 self.send()
64 return self
67 return self
65
68
66 def send(self):
69 def send(self):
67 if self.ms is None:
70 if self.ms is None:
68 raise RuntimeError('No data recorded.')
71 raise RuntimeError('No data recorded.')
69 if self._sent:
72 if self._sent:
70 raise RuntimeError('Already sent data.')
73 raise RuntimeError('Already sent data.')
71 self._sent = True
74 self._sent = True
72 self.client.timing(self.stat, self.ms, self.rate)
75 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
General Comments 0
You need to be logged in to leave comments. Login now