##// END OF EJS Templates
statsd: added option to use timer without auto-sending data
super-admin -
r4814:bdd096de default
parent child Browse files
Show More
@@ -1,147 +1,156 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import re
3 import re
4 import random
4 import random
5 from collections import deque
5 from collections import deque
6 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
7 from repoze.lru import lru_cache
8
8
9 from .timer import Timer
9 from .timer import Timer
10
10
11 TAG_INVALID_CHARS_RE = re.compile(
11 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
12 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
13 #re.UNICODE
14 )
14 )
15 TAG_INVALID_CHARS_SUBS = "_"
15 TAG_INVALID_CHARS_SUBS = "_"
16
16
17 # we save and expose methods called by statsd for discovery
17 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
18 buckets_dict = {
19
19
20 }
20 }
21
21
22
22
23 @lru_cache(maxsize=500)
23 @lru_cache(maxsize=500)
24 def _normalize_tags_with_cache(tag_list):
24 def _normalize_tags_with_cache(tag_list):
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26
26
27
27
28 def normalize_tags(tag_list):
28 def normalize_tags(tag_list):
29 # We have to turn our input tag list into a non-mutable tuple for it to
29 # We have to turn our input tag list into a non-mutable tuple for it to
30 # be hashable (and thus usable) by the @lru_cache decorator.
30 # be hashable (and thus usable) by the @lru_cache decorator.
31 return _normalize_tags_with_cache(tuple(tag_list))
31 return _normalize_tags_with_cache(tuple(tag_list))
32
32
33
33
34 class StatsClientBase(object):
34 class StatsClientBase(object):
35 """A Base class for various statsd clients."""
35 """A Base class for various statsd clients."""
36
36
37 def close(self):
37 def close(self):
38 """Used to close and clean up any underlying resources."""
38 """Used to close and clean up any underlying resources."""
39 raise NotImplementedError()
39 raise NotImplementedError()
40
40
41 def _send(self):
41 def _send(self):
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 def pipeline(self):
44 def pipeline(self):
45 raise NotImplementedError()
45 raise NotImplementedError()
46
46
47 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None):
48 """
49 statsd = StatsdClient()
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
52 for i in xrange(0, 100000):
53 i ** 2
54 # you can access time here...
55 elapsed_ms = tmr.ms
56 """
48 return Timer(self, stat, rate, tags)
57 return Timer(self, stat, rate, tags)
49
58
50 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
51 """
60 """
52 Send new timing information.
61 Send new timing information.
53
62
54 `delta` can be either a number of milliseconds or a timedelta.
63 `delta` can be either a number of milliseconds or a timedelta.
55 """
64 """
56 if isinstance(delta, timedelta):
65 if isinstance(delta, timedelta):
57 # Convert timedelta to number of milliseconds.
66 # Convert timedelta to number of milliseconds.
58 delta = delta.total_seconds() * 1000.
67 delta = delta.total_seconds() * 1000.
59 if use_decimals:
68 if use_decimals:
60 fmt = '%0.6f|ms'
69 fmt = '%0.6f|ms'
61 else:
70 else:
62 fmt = '%s|ms'
71 fmt = '%s|ms'
63 self._send_stat(stat, fmt % delta, rate, tags)
72 self._send_stat(stat, fmt % delta, rate, tags)
64
73
65 def incr(self, stat, count=1, rate=1, tags=None):
74 def incr(self, stat, count=1, rate=1, tags=None):
66 """Increment a stat by `count`."""
75 """Increment a stat by `count`."""
67 self._send_stat(stat, '%s|c' % count, rate, tags)
76 self._send_stat(stat, '%s|c' % count, rate, tags)
68
77
69 def decr(self, stat, count=1, rate=1, tags=None):
78 def decr(self, stat, count=1, rate=1, tags=None):
70 """Decrement a stat by `count`."""
79 """Decrement a stat by `count`."""
71 self.incr(stat, -count, rate, tags)
80 self.incr(stat, -count, rate, tags)
72
81
73 def gauge(self, stat, value, rate=1, delta=False, tags=None):
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
74 """Set a gauge value."""
83 """Set a gauge value."""
75 if value < 0 and not delta:
84 if value < 0 and not delta:
76 if rate < 1:
85 if rate < 1:
77 if random.random() > rate:
86 if random.random() > rate:
78 return
87 return
79 with self.pipeline() as pipe:
88 with self.pipeline() as pipe:
80 pipe._send_stat(stat, '0|g', 1)
89 pipe._send_stat(stat, '0|g', 1)
81 pipe._send_stat(stat, '%s|g' % value, 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
82 else:
91 else:
83 prefix = '+' if delta and value >= 0 else ''
92 prefix = '+' if delta and value >= 0 else ''
84 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
85
94
86 def set(self, stat, value, rate=1):
95 def set(self, stat, value, rate=1):
87 """Set a set value."""
96 """Set a set value."""
88 self._send_stat(stat, '%s|s' % value, rate)
97 self._send_stat(stat, '%s|s' % value, rate)
89
98
90 def histogram(self, stat, value, rate=1, tags=None):
99 def histogram(self, stat, value, rate=1, tags=None):
91 """Set a histogram"""
100 """Set a histogram"""
92 self._send_stat(stat, '%s|h' % value, rate, tags)
101 self._send_stat(stat, '%s|h' % value, rate, tags)
93
102
94 def _send_stat(self, stat, value, rate, tags=None):
103 def _send_stat(self, stat, value, rate, tags=None):
95 self._after(self._prepare(stat, value, rate, tags))
104 self._after(self._prepare(stat, value, rate, tags))
96
105
97 def _prepare(self, stat, value, rate, tags=None):
106 def _prepare(self, stat, value, rate, tags=None):
98 global buckets_dict
107 global buckets_dict
99 buckets_dict[stat] = 1
108 buckets_dict[stat] = 1
100
109
101 if rate < 1:
110 if rate < 1:
102 if random.random() > rate:
111 if random.random() > rate:
103 return
112 return
104 value = '%s|@%s' % (value, rate)
113 value = '%s|@%s' % (value, rate)
105
114
106 if self._prefix:
115 if self._prefix:
107 stat = '%s.%s' % (self._prefix, stat)
116 stat = '%s.%s' % (self._prefix, stat)
108
117
109 res = '%s:%s%s' % (
118 res = '%s:%s%s' % (
110 stat,
119 stat,
111 value,
120 value,
112 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
113 )
122 )
114 return res
123 return res
115
124
116 def _after(self, data):
125 def _after(self, data):
117 if data:
126 if data:
118 self._send(data)
127 self._send(data)
119
128
120
129
121 class PipelineBase(StatsClientBase):
130 class PipelineBase(StatsClientBase):
122
131
123 def __init__(self, client):
132 def __init__(self, client):
124 self._client = client
133 self._client = client
125 self._prefix = client._prefix
134 self._prefix = client._prefix
126 self._stats = deque()
135 self._stats = deque()
127
136
128 def _send(self):
137 def _send(self):
129 raise NotImplementedError()
138 raise NotImplementedError()
130
139
131 def _after(self, data):
140 def _after(self, data):
132 if data is not None:
141 if data is not None:
133 self._stats.append(data)
142 self._stats.append(data)
134
143
135 def __enter__(self):
144 def __enter__(self):
136 return self
145 return self
137
146
138 def __exit__(self, typ, value, tb):
147 def __exit__(self, typ, value, tb):
139 self.send()
148 self.send()
140
149
141 def send(self):
150 def send(self):
142 if not self._stats:
151 if not self._stats:
143 return
152 return
144 self._send()
153 self._send()
145
154
146 def pipeline(self):
155 def pipeline(self):
147 return self.__class__(self)
156 return self.__class__(self)
@@ -1,73 +1,75 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import functools
3 import functools
4
4
5 # Use timer that's not susceptible to time of day adjustments.
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
6 try:
7 # perf_counter is only present on Py3.3+
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
8 from time import perf_counter as time_now
9 except ImportError:
9 except ImportError:
10 # fall back to using time
10 # fall back to using time
11 from time import time as time_now
11 from time import time as time_now
12
12
13
13
14 def safe_wraps(wrapper, *args, **kwargs):
14 def safe_wraps(wrapper, *args, **kwargs):
15 """Safely wraps partial functions."""
15 """Safely wraps partial functions."""
16 while isinstance(wrapper, functools.partial):
16 while isinstance(wrapper, functools.partial):
17 wrapper = wrapper.func
17 wrapper = wrapper.func
18 return functools.wraps(wrapper, *args, **kwargs)
18 return functools.wraps(wrapper, *args, **kwargs)
19
19
20
20
21 class Timer(object):
21 class Timer(object):
22 """A context manager/decorator for statsd.timing()."""
22 """A context manager/decorator for statsd.timing()."""
23
23
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True):
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
25 self.client = client
25 self.client = client
26 self.stat = stat
26 self.stat = stat
27 self.rate = rate
27 self.rate = rate
28 self.tags = tags
28 self.tags = tags
29 self.ms = None
29 self.ms = None
30 self._sent = False
30 self._sent = False
31 self._start_time = None
31 self._start_time = None
32 self.use_decimals = use_decimals
32 self.use_decimals = use_decimals
33 self.auto_send = auto_send
33
34
34 def __call__(self, f):
35 def __call__(self, f):
35 """Thread-safe timing function decorator."""
36 """Thread-safe timing function decorator."""
36 @safe_wraps(f)
37 @safe_wraps(f)
37 def _wrapped(*args, **kwargs):
38 def _wrapped(*args, **kwargs):
38 start_time = time_now()
39 start_time = time_now()
39 try:
40 try:
40 return f(*args, **kwargs)
41 return f(*args, **kwargs)
41 finally:
42 finally:
42 elapsed_time_ms = 1000.0 * (time_now() - start_time)
43 elapsed_time_ms = 1000.0 * (time_now() - start_time)
43 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
44 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
45 self._sent = True
44 return _wrapped
46 return _wrapped
45
47
46 def __enter__(self):
48 def __enter__(self):
47 return self.start()
49 return self.start()
48
50
49 def __exit__(self, typ, value, tb):
51 def __exit__(self, typ, value, tb):
50 self.stop()
52 self.stop(send=self.auto_send)
51
53
52 def start(self):
54 def start(self):
53 self.ms = None
55 self.ms = None
54 self._sent = False
56 self._sent = False
55 self._start_time = time_now()
57 self._start_time = time_now()
56 return self
58 return self
57
59
58 def stop(self, send=True):
60 def stop(self, send=True):
59 if self._start_time is None:
61 if self._start_time is None:
60 raise RuntimeError('Timer has not started.')
62 raise RuntimeError('Timer has not started.')
61 dt = time_now() - self._start_time
63 dt = time_now() - self._start_time
62 self.ms = 1000.0 * dt # Convert to milliseconds.
64 self.ms = 1000.0 * dt # Convert to milliseconds.
63 if send:
65 if send:
64 self.send()
66 self.send()
65 return self
67 return self
66
68
67 def send(self):
69 def send(self):
68 if self.ms is None:
70 if self.ms is None:
69 raise RuntimeError('No data recorded.')
71 raise RuntimeError('No data recorded.')
70 if self._sent:
72 if self._sent:
71 raise RuntimeError('Already sent data.')
73 raise RuntimeError('Already sent data.')
72 self._sent = True
74 self._sent = True
73 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
75 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
General Comments 0
You need to be logged in to leave comments. Login now