base.py
154 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
r4792 | import re | |||
r4632 | import random | |||
from collections import deque | ||||
from datetime import timedelta | ||||
r4792 | from repoze.lru import lru_cache | |||
r4632 | ||||
from .timer import Timer | ||||
# Matches every character that is NOT allowed inside a tag: anything other
# than a word character, '-', ':', '/' or '.'.
# NOTE: the re.UNICODE flag was left disabled by the original author; for
# str patterns on Python 3 Unicode matching is already the default.
TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]")

# Replacement substituted for each invalid tag character.
TAG_INVALID_CHARS_SUBS = "_"

# Registry of stat names that went through the client (name -> 1), kept at
# module level so the set of used buckets can be discovered later.
buckets_dict = {}
@lru_cache(maxsize=500)
def _normalize_tags_with_cache(tag_list):
    """Return a list with every invalid character in each tag replaced.

    Results are memoised by the ``lru_cache`` decorator (``maxsize=500``),
    so ``tag_list`` must be hashable -- callers pass a tuple.
    """
    substitute = TAG_INVALID_CHARS_RE.sub
    cleaned = []
    for raw_tag in tag_list:
        cleaned.append(substitute(TAG_INVALID_CHARS_SUBS, raw_tag))
    return cleaned
def normalize_tags(tag_list):
    """Return a new list of tags with invalid characters replaced.

    :param tag_list: iterable of tag strings; ``None`` or an empty
        sequence yields an empty list.
    :returns: a fresh ``list`` the caller is free to mutate.
    """
    if not tag_list:
        return []
    # The input is frozen into a tuple so it is hashable (and thus usable)
    # by the @lru_cache decorator on the helper. The cached list is copied
    # before being returned: handing out the cached object itself would let
    # a caller's in-place mutation leak into every later lookup.
    return list(_normalize_tags_with_cache(tuple(tag_list)))
r4632 | ||||
class StatsClientBase:
    """A base class for various statsd clients.

    Formats metrics as ``stat:value|type[|@rate][|#tag1,tag2]`` strings and
    hands them to ``_send``. Subclasses supply the transport by implementing
    ``close``, ``_send`` and ``pipeline``, and must set ``self._prefix``
    (read by ``_prepare``; a falsy value means "no prefix").

    Common parameters of the metric methods:

    * ``stat``  -- metric name (the prefix, if any, is prepended with a dot).
    * ``rate``  -- sample rate; when ``< 1`` the metric is sent only with
      probability ``rate`` and ``|@rate`` is appended to the value.
    * ``tags``  -- optional iterable of tag strings, normalized with
      ``normalize_tags`` and appended as ``|#tag1,tag2``.
    """

    def close(self):
        """Used to close and clean up any underlying resources."""
        raise NotImplementedError()

    def _send(self, data=None):
        """Transmit one formatted metric string; implemented by subclasses.

        ``data`` is accepted here because ``_after`` calls ``_send(data)``;
        an unimplemented subclass therefore fails with NotImplementedError
        rather than a TypeError about the argument count.
        """
        raise NotImplementedError()

    def pipeline(self):
        """Return a pipeline (batching context manager) for this client."""
        raise NotImplementedError()

    def timer(self, stat, rate=1, tags=None, auto_send=True):
        """Return a ``Timer`` bound to this client for ``stat``.

        Example::

            statsd = StatsdClient.statsd
            with statsd.timer('bucket_name', auto_send=True) as tmr:
                # This block will be timed.
                for i in range(0, 100000):
                    i ** 2
            # you can access time here...
            elapsed_ms = tmr.ms
        """
        return Timer(self, stat, rate, tags, auto_send=auto_send)

    def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
        """Send new timing information.

        `delta` can be either a number of milliseconds or a timedelta.
        With ``use_decimals`` the value is rendered with six decimal places,
        otherwise with plain ``str()`` formatting.
        """
        if isinstance(delta, timedelta):
            # Convert timedelta to number of milliseconds.
            delta = delta.total_seconds() * 1000.
        fmt = '%0.6f|ms' if use_decimals else '%s|ms'
        self._send_stat(stat, fmt % (delta,), rate, tags)

    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment a stat by `count`."""
        self._send_stat(stat, f'{count}|c', rate, tags)

    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement a stat by `count`."""
        self.incr(stat, -count, rate, tags)

    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """Set a gauge value, or adjust it by ``value`` when ``delta`` is true."""
        if value < 0 and not delta:
            # statsd reads a signed gauge value as a delta, so a negative
            # absolute value is sent as "reset to 0" followed by the negative
            # value. Sampling is decided once here so that either both
            # packets are sent or neither; they then go out at rate 1.
            if rate < 1 and random.random() > rate:
                return
            with self.pipeline() as pipe:
                # Tags must accompany both packets, exactly as on the
                # non-negative path below.
                pipe._send_stat(stat, '0|g', 1, tags)
                pipe._send_stat(stat, f'{value}|g', 1, tags)
        else:
            prefix = '+' if delta and value >= 0 else ''
            self._send_stat(stat, f'{prefix}{value}|g', rate, tags)

    def set(self, stat, value, rate=1, tags=None):
        """Set a set value."""
        self._send_stat(stat, f'{value}|s', rate, tags)

    def histogram(self, stat, value, rate=1, tags=None):
        """Send a histogram sample."""
        self._send_stat(stat, f'{value}|h', rate, tags)

    def _send_stat(self, stat, value, rate, tags=None):
        """Format one metric and pass the result to ``_after``."""
        self._after(self._prepare(stat, value, rate, tags))

    def _prepare(self, stat, value, rate, tags=None):
        """Build the wire string for one metric.

        Returns ``None`` when the metric is dropped by sampling.
        """
        # Record the (un-prefixed) stat name in the module-level registry.
        # Only an item is assigned, so no ``global`` declaration is needed.
        buckets_dict[stat] = 1

        if rate < 1:
            if random.random() > rate:
                return None
            value = f'{value}|@{rate}'

        if self._prefix:
            stat = f'{self._prefix}.{stat}'

        tag_suffix = ('|#' + ','.join(normalize_tags(tags))) if tags else ''
        return f'{stat}:{value}{tag_suffix}'

    def _after(self, data):
        """Send ``data`` immediately unless it is empty (sampled out)."""
        if data:
            self._send(data)
class PipelineBase(StatsClientBase):
    """Client that queues formatted stats instead of sending them one by one.

    Used as a context manager: stats prepared inside the ``with`` block are
    appended to an internal deque and ``send()`` is invoked on exit.
    Subclasses implement ``_send`` to transmit the queued stats.
    """

    def __init__(self, client):
        # Keep the parent client and reuse its prefix so stats are named
        # the same way as when sent directly.
        self._client = client
        self._prefix = client._prefix
        self._stats = deque()

    def __enter__(self):
        return self

    def __exit__(self, typ, value, tb):
        # Runs on every exit from the block, including when it raised;
        # nothing is returned, so an exception is not suppressed.
        self.send()

    def pipeline(self):
        """Return a new pipeline of the same class wrapping this one."""
        return self.__class__(self)

    def send(self):
        """Hand the queue to ``_send`` if anything has been queued."""
        if self._stats:
            self._send()

    def _after(self, data):
        """Queue ``data`` unless it is ``None`` (stat dropped by sampling)."""
        if data is None:
            return
        self._stats.append(data)

    def _send(self):
        """Transmit the queued stats; implemented by subclasses."""
        raise NotImplementedError()