##// END OF EJS Templates
statsd: fixed timed logic
super-admin -
r4831:56c4f044 default
parent child Browse files
Show More
@@ -1,156 +1,156 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
11 11 TAG_INVALID_CHARS_RE = re.compile(
12 12 r"[^\w\d_\-:/\.]",
13 13 #re.UNICODE
14 14 )
15 15 TAG_INVALID_CHARS_SUBS = "_"
16 16
17 17 # we save and expose methods called by statsd for discovery
18 18 buckets_dict = {
19 19
20 20 }
21 21
22 22
23 23 @lru_cache(maxsize=500)
24 24 def _normalize_tags_with_cache(tag_list):
25 25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26 26
27 27
28 28 def normalize_tags(tag_list):
29 29 # We have to turn our input tag list into a non-mutable tuple for it to
30 30 # be hashable (and thus usable) by the @lru_cache decorator.
31 31 return _normalize_tags_with_cache(tuple(tag_list))
32 32
33 33
34 34 class StatsClientBase(object):
35 35 """A Base class for various statsd clients."""
36 36
37 37 def close(self):
38 38 """Used to close and clean up any underlying resources."""
39 39 raise NotImplementedError()
40 40
41 41 def _send(self):
42 42 raise NotImplementedError()
43 43
44 44 def pipeline(self):
45 45 raise NotImplementedError()
46 46
47 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None, auto_send=True):
48 48 """
49 statsd = StatsdClient()
49 statsd = StatsdClient.statsd
50 50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 51 # This block will be timed.
52 52 for i in xrange(0, 100000):
53 53 i ** 2
54 54 # you can access time here...
55 55 elapsed_ms = tmr.ms
56 56 """
57 return Timer(self, stat, rate, tags)
57 return Timer(self, stat, rate, tags, auto_send=auto_send)
58 58
59 59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
60 60 """
61 61 Send new timing information.
62 62
63 63 `delta` can be either a number of milliseconds or a timedelta.
64 64 """
65 65 if isinstance(delta, timedelta):
66 66 # Convert timedelta to number of milliseconds.
67 67 delta = delta.total_seconds() * 1000.
68 68 if use_decimals:
69 69 fmt = '%0.6f|ms'
70 70 else:
71 71 fmt = '%s|ms'
72 72 self._send_stat(stat, fmt % delta, rate, tags)
73 73
74 74 def incr(self, stat, count=1, rate=1, tags=None):
75 75 """Increment a stat by `count`."""
76 76 self._send_stat(stat, '%s|c' % count, rate, tags)
77 77
78 78 def decr(self, stat, count=1, rate=1, tags=None):
79 79 """Decrement a stat by `count`."""
80 80 self.incr(stat, -count, rate, tags)
81 81
82 82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
83 83 """Set a gauge value."""
84 84 if value < 0 and not delta:
85 85 if rate < 1:
86 86 if random.random() > rate:
87 87 return
88 88 with self.pipeline() as pipe:
89 89 pipe._send_stat(stat, '0|g', 1)
90 90 pipe._send_stat(stat, '%s|g' % value, 1)
91 91 else:
92 92 prefix = '+' if delta and value >= 0 else ''
93 93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
94 94
95 95 def set(self, stat, value, rate=1):
96 96 """Set a set value."""
97 97 self._send_stat(stat, '%s|s' % value, rate)
98 98
99 99 def histogram(self, stat, value, rate=1, tags=None):
100 100 """Set a histogram"""
101 101 self._send_stat(stat, '%s|h' % value, rate, tags)
102 102
103 103 def _send_stat(self, stat, value, rate, tags=None):
104 104 self._after(self._prepare(stat, value, rate, tags))
105 105
106 106 def _prepare(self, stat, value, rate, tags=None):
107 107 global buckets_dict
108 108 buckets_dict[stat] = 1
109 109
110 110 if rate < 1:
111 111 if random.random() > rate:
112 112 return
113 113 value = '%s|@%s' % (value, rate)
114 114
115 115 if self._prefix:
116 116 stat = '%s.%s' % (self._prefix, stat)
117 117
118 118 res = '%s:%s%s' % (
119 119 stat,
120 120 value,
121 121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
122 122 )
123 123 return res
124 124
125 125 def _after(self, data):
126 126 if data:
127 127 self._send(data)
128 128
129 129
130 130 class PipelineBase(StatsClientBase):
131 131
132 132 def __init__(self, client):
133 133 self._client = client
134 134 self._prefix = client._prefix
135 135 self._stats = deque()
136 136
137 137 def _send(self):
138 138 raise NotImplementedError()
139 139
140 140 def _after(self, data):
141 141 if data is not None:
142 142 self._stats.append(data)
143 143
144 144 def __enter__(self):
145 145 return self
146 146
147 147 def __exit__(self, typ, value, tb):
148 148 self.send()
149 149
150 150 def send(self):
151 151 if not self._stats:
152 152 return
153 153 self._send()
154 154
155 155 def pipeline(self):
156 156 return self.__class__(self)
General Comments 0
You need to be logged in to leave comments. Login now