##// END OF EJS Templates
statsd: added option to use timer without auto-sending data
super-admin -
r4814:bdd096de default
parent child Browse files
Show More
@@ -1,147 +1,156 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
11 11 TAG_INVALID_CHARS_RE = re.compile(
12 12 r"[^\w\d_\-:/\.]",
13 13 #re.UNICODE
14 14 )
15 15 TAG_INVALID_CHARS_SUBS = "_"
16 16
17 17 # we save and expose methods called by statsd for discovery
18 18 buckets_dict = {
19 19
20 20 }
21 21
22 22
23 23 @lru_cache(maxsize=500)
24 24 def _normalize_tags_with_cache(tag_list):
25 25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26 26
27 27
28 28 def normalize_tags(tag_list):
29 29 # We have to turn our input tag list into a non-mutable tuple for it to
30 30 # be hashable (and thus usable) by the @lru_cache decorator.
31 31 return _normalize_tags_with_cache(tuple(tag_list))
32 32
33 33
34 34 class StatsClientBase(object):
35 35 """A Base class for various statsd clients."""
36 36
37 37 def close(self):
38 38 """Used to close and clean up any underlying resources."""
39 39 raise NotImplementedError()
40 40
41 41 def _send(self):
42 42 raise NotImplementedError()
43 43
44 44 def pipeline(self):
45 45 raise NotImplementedError()
46 46
47 47 def timer(self, stat, rate=1, tags=None):
48 """
49 statsd = StatsdClient()
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
52 for i in xrange(0, 100000):
53 i ** 2
54 # you can access time here...
55 elapsed_ms = tmr.ms
56 """
48 57 return Timer(self, stat, rate, tags)
49 58
50 59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
51 60 """
52 61 Send new timing information.
53 62
54 63 `delta` can be either a number of milliseconds or a timedelta.
55 64 """
56 65 if isinstance(delta, timedelta):
57 66 # Convert timedelta to number of milliseconds.
58 67 delta = delta.total_seconds() * 1000.
59 68 if use_decimals:
60 69 fmt = '%0.6f|ms'
61 70 else:
62 71 fmt = '%s|ms'
63 72 self._send_stat(stat, fmt % delta, rate, tags)
64 73
65 74 def incr(self, stat, count=1, rate=1, tags=None):
66 75 """Increment a stat by `count`."""
67 76 self._send_stat(stat, '%s|c' % count, rate, tags)
68 77
69 78 def decr(self, stat, count=1, rate=1, tags=None):
70 79 """Decrement a stat by `count`."""
71 80 self.incr(stat, -count, rate, tags)
72 81
73 82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
74 83 """Set a gauge value."""
75 84 if value < 0 and not delta:
76 85 if rate < 1:
77 86 if random.random() > rate:
78 87 return
79 88 with self.pipeline() as pipe:
80 89 pipe._send_stat(stat, '0|g', 1)
81 90 pipe._send_stat(stat, '%s|g' % value, 1)
82 91 else:
83 92 prefix = '+' if delta and value >= 0 else ''
84 93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
85 94
86 95 def set(self, stat, value, rate=1):
87 96 """Set a set value."""
88 97 self._send_stat(stat, '%s|s' % value, rate)
89 98
90 99 def histogram(self, stat, value, rate=1, tags=None):
91 100 """Set a histogram"""
92 101 self._send_stat(stat, '%s|h' % value, rate, tags)
93 102
94 103 def _send_stat(self, stat, value, rate, tags=None):
95 104 self._after(self._prepare(stat, value, rate, tags))
96 105
97 106 def _prepare(self, stat, value, rate, tags=None):
98 107 global buckets_dict
99 108 buckets_dict[stat] = 1
100 109
101 110 if rate < 1:
102 111 if random.random() > rate:
103 112 return
104 113 value = '%s|@%s' % (value, rate)
105 114
106 115 if self._prefix:
107 116 stat = '%s.%s' % (self._prefix, stat)
108 117
109 118 res = '%s:%s%s' % (
110 119 stat,
111 120 value,
112 121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
113 122 )
114 123 return res
115 124
116 125 def _after(self, data):
117 126 if data:
118 127 self._send(data)
119 128
120 129
121 130 class PipelineBase(StatsClientBase):
122 131
123 132 def __init__(self, client):
124 133 self._client = client
125 134 self._prefix = client._prefix
126 135 self._stats = deque()
127 136
128 137 def _send(self):
129 138 raise NotImplementedError()
130 139
131 140 def _after(self, data):
132 141 if data is not None:
133 142 self._stats.append(data)
134 143
135 144 def __enter__(self):
136 145 return self
137 146
138 147 def __exit__(self, typ, value, tb):
139 148 self.send()
140 149
141 150 def send(self):
142 151 if not self._stats:
143 152 return
144 153 self._send()
145 154
146 155 def pipeline(self):
147 156 return self.__class__(self)
@@ -1,73 +1,75 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import functools
4 4
5 5 # Use timer that's not susceptible to time of day adjustments.
6 6 try:
7 7 # perf_counter is only present on Py3.3+
8 8 from time import perf_counter as time_now
9 9 except ImportError:
10 10 # fall back to using time
11 11 from time import time as time_now
12 12
13 13
14 14 def safe_wraps(wrapper, *args, **kwargs):
15 15 """Safely wraps partial functions."""
16 16 while isinstance(wrapper, functools.partial):
17 17 wrapper = wrapper.func
18 18 return functools.wraps(wrapper, *args, **kwargs)
19 19
20 20
21 21 class Timer(object):
22 22 """A context manager/decorator for statsd.timing()."""
23 23
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True):
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
25 25 self.client = client
26 26 self.stat = stat
27 27 self.rate = rate
28 28 self.tags = tags
29 29 self.ms = None
30 30 self._sent = False
31 31 self._start_time = None
32 32 self.use_decimals = use_decimals
33 self.auto_send = auto_send
33 34
34 35 def __call__(self, f):
35 36 """Thread-safe timing function decorator."""
36 37 @safe_wraps(f)
37 38 def _wrapped(*args, **kwargs):
38 39 start_time = time_now()
39 40 try:
40 41 return f(*args, **kwargs)
41 42 finally:
42 43 elapsed_time_ms = 1000.0 * (time_now() - start_time)
43 44 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
45 self._sent = True
44 46 return _wrapped
45 47
46 48 def __enter__(self):
47 49 return self.start()
48 50
49 51 def __exit__(self, typ, value, tb):
50 self.stop()
52 self.stop(send=self.auto_send)
51 53
52 54 def start(self):
53 55 self.ms = None
54 56 self._sent = False
55 57 self._start_time = time_now()
56 58 return self
57 59
58 60 def stop(self, send=True):
59 61 if self._start_time is None:
60 62 raise RuntimeError('Timer has not started.')
61 63 dt = time_now() - self._start_time
62 64 self.ms = 1000.0 * dt # Convert to milliseconds.
63 65 if send:
64 66 self.send()
65 67 return self
66 68
67 69 def send(self):
68 70 if self.ms is None:
69 71 raise RuntimeError('No data recorded.')
70 72 if self._sent:
71 73 raise RuntimeError('Already sent data.')
72 74 self._sent = True
73 75 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
General Comments 0
You need to be logged in to leave comments. Login now