##// END OF EJS Templates
statsd: synced client with rhodecode
super-admin -
r1027:6541a7d5 default
parent child Browse files
Show More
@@ -1,156 +1,156 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
11 11 TAG_INVALID_CHARS_RE = re.compile(
12 12 r"[^\w\d_\-:/\.]",
13 13 #re.UNICODE
14 14 )
15 15 TAG_INVALID_CHARS_SUBS = "_"
16 16
17 17 # we save and expose methods called by statsd for discovery
18 18 buckets_dict = {
19 19
20 20 }
21 21
22 22
23 23 @lru_cache(maxsize=500)
24 24 def _normalize_tags_with_cache(tag_list):
25 25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26 26
27 27
28 28 def normalize_tags(tag_list):
29 29 # We have to turn our input tag list into a non-mutable tuple for it to
30 30 # be hashable (and thus usable) by the @lru_cache decorator.
31 31 return _normalize_tags_with_cache(tuple(tag_list))
32 32
33 33
34 34 class StatsClientBase(object):
35 35 """A Base class for various statsd clients."""
36 36
37 37 def close(self):
38 38 """Used to close and clean up any underlying resources."""
39 39 raise NotImplementedError()
40 40
41 41 def _send(self):
42 42 raise NotImplementedError()
43 43
44 44 def pipeline(self):
45 45 raise NotImplementedError()
46 46
47 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None, auto_send=True):
48 48 """
49 statsd = StatsdClient()
49 statsd = StatsdClient.statsd
50 50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 51 # This block will be timed.
52 52 for i in xrange(0, 100000):
53 53 i ** 2
54 54 # you can access time here...
55 55 elapsed_ms = tmr.ms
56 56 """
57 return Timer(self, stat, rate, tags)
57 return Timer(self, stat, rate, tags, auto_send=auto_send)
58 58
59 59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
60 60 """
61 61 Send new timing information.
62 62
63 63 `delta` can be either a number of milliseconds or a timedelta.
64 64 """
65 65 if isinstance(delta, timedelta):
66 66 # Convert timedelta to number of milliseconds.
67 67 delta = delta.total_seconds() * 1000.
68 68 if use_decimals:
69 69 fmt = '%0.6f|ms'
70 70 else:
71 71 fmt = '%s|ms'
72 72 self._send_stat(stat, fmt % delta, rate, tags)
73 73
74 74 def incr(self, stat, count=1, rate=1, tags=None):
75 75 """Increment a stat by `count`."""
76 76 self._send_stat(stat, '%s|c' % count, rate, tags)
77 77
78 78 def decr(self, stat, count=1, rate=1, tags=None):
79 79 """Decrement a stat by `count`."""
80 80 self.incr(stat, -count, rate, tags)
81 81
82 82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
83 83 """Set a gauge value."""
84 84 if value < 0 and not delta:
85 85 if rate < 1:
86 86 if random.random() > rate:
87 87 return
88 88 with self.pipeline() as pipe:
89 89 pipe._send_stat(stat, '0|g', 1)
90 90 pipe._send_stat(stat, '%s|g' % value, 1)
91 91 else:
92 92 prefix = '+' if delta and value >= 0 else ''
93 93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
94 94
95 95 def set(self, stat, value, rate=1):
96 96 """Set a set value."""
97 97 self._send_stat(stat, '%s|s' % value, rate)
98 98
99 99 def histogram(self, stat, value, rate=1, tags=None):
100 100 """Set a histogram"""
101 101 self._send_stat(stat, '%s|h' % value, rate, tags)
102 102
103 103 def _send_stat(self, stat, value, rate, tags=None):
104 104 self._after(self._prepare(stat, value, rate, tags))
105 105
106 106 def _prepare(self, stat, value, rate, tags=None):
107 107 global buckets_dict
108 108 buckets_dict[stat] = 1
109 109
110 110 if rate < 1:
111 111 if random.random() > rate:
112 112 return
113 113 value = '%s|@%s' % (value, rate)
114 114
115 115 if self._prefix:
116 116 stat = '%s.%s' % (self._prefix, stat)
117 117
118 118 res = '%s:%s%s' % (
119 119 stat,
120 120 value,
121 121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
122 122 )
123 123 return res
124 124
125 125 def _after(self, data):
126 126 if data:
127 127 self._send(data)
128 128
129 129
130 130 class PipelineBase(StatsClientBase):
131 131
132 132 def __init__(self, client):
133 133 self._client = client
134 134 self._prefix = client._prefix
135 135 self._stats = deque()
136 136
137 137 def _send(self):
138 138 raise NotImplementedError()
139 139
140 140 def _after(self, data):
141 141 if data is not None:
142 142 self._stats.append(data)
143 143
144 144 def __enter__(self):
145 145 return self
146 146
147 147 def __exit__(self, typ, value, tb):
148 148 self.send()
149 149
150 150 def send(self):
151 151 if not self._stats:
152 152 return
153 153 self._send()
154 154
155 155 def pipeline(self):
156 156 return self.__class__(self)
@@ -1,72 +1,75 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import functools
4 4
5 5 # Use timer that's not susceptible to time of day adjustments.
6 6 try:
7 7 # perf_counter is only present on Py3.3+
8 8 from time import perf_counter as time_now
9 9 except ImportError:
10 10 # fall back to using time
11 11 from time import time as time_now
12 12
13 13
14 14 def safe_wraps(wrapper, *args, **kwargs):
15 15 """Safely wraps partial functions."""
16 16 while isinstance(wrapper, functools.partial):
17 17 wrapper = wrapper.func
18 18 return functools.wraps(wrapper, *args, **kwargs)
19 19
20 20
21 21 class Timer(object):
22 22 """A context manager/decorator for statsd.timing()."""
23 23
24 def __init__(self, client, stat, rate=1, tags=None):
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
25 25 self.client = client
26 26 self.stat = stat
27 27 self.rate = rate
28 28 self.tags = tags
29 29 self.ms = None
30 30 self._sent = False
31 31 self._start_time = None
32 self.use_decimals = use_decimals
33 self.auto_send = auto_send
32 34
33 35 def __call__(self, f):
34 36 """Thread-safe timing function decorator."""
35 37 @safe_wraps(f)
36 38 def _wrapped(*args, **kwargs):
37 39 start_time = time_now()
38 40 try:
39 41 return f(*args, **kwargs)
40 42 finally:
41 43 elapsed_time_ms = 1000.0 * (time_now() - start_time)
42 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags)
44 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
45 self._sent = True
43 46 return _wrapped
44 47
45 48 def __enter__(self):
46 49 return self.start()
47 50
48 51 def __exit__(self, typ, value, tb):
49 self.stop()
52 self.stop(send=self.auto_send)
50 53
51 54 def start(self):
52 55 self.ms = None
53 56 self._sent = False
54 57 self._start_time = time_now()
55 58 return self
56 59
57 60 def stop(self, send=True):
58 61 if self._start_time is None:
59 62 raise RuntimeError('Timer has not started.')
60 63 dt = time_now() - self._start_time
61 64 self.ms = 1000.0 * dt # Convert to milliseconds.
62 65 if send:
63 66 self.send()
64 67 return self
65 68
66 69 def send(self):
67 70 if self.ms is None:
68 71 raise RuntimeError('No data recorded.')
69 72 if self._sent:
70 73 raise RuntimeError('Already sent data.')
71 74 self._sent = True
72 self.client.timing(self.stat, self.ms, self.rate)
75 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
General Comments 0
You need to be logged in to leave comments. Login now