base.py
128 lines
| 3.6 KiB
| text/x-python
|
PythonLexer
r4632 | from __future__ import absolute_import, division, unicode_literals | |||
r4792 | import re | |||
r4632 | import random | |||
from collections import deque | ||||
from datetime import timedelta | ||||
r4792 | from repoze.lru import lru_cache | |||
r4632 | ||||
from .timer import Timer | ||||
# Characters matched by this pattern are not allowed in a tag. Everything
# except word characters, digits, "_", "-", ":", "/" and "." is replaced by
# TAG_INVALID_CHARS_SUBS.
TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
TAG_INVALID_CHARS_SUBS = "_"


@lru_cache(maxsize=500)
def _normalize_tags_with_cache(tag_list):
    """Return the sanitised tags for `tag_list`, memoised by argument.

    `tag_list` must be hashable (callers pass a tuple) because it is used as
    the cache key. The returned list is the very object held by the cache, so
    it must be treated as read-only; use `normalize_tags` to get a list that
    is safe to modify.
    """
    return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]


def normalize_tags(tag_list):
    """Return a new list with every invalid character in each tag replaced.

    :param tag_list: iterable of tag strings.
    :returns: a fresh list of sanitised tags, in the same order.
    """
    # We have to turn our input tag list into a non-mutable tuple for it to
    # be hashable (and thus usable) by the @lru_cache decorator.
    cached = _normalize_tags_with_cache(tuple(tag_list))
    # Hand out a copy: returning the cached list itself would let one caller's
    # in-place modification leak into every later call with the same tags.
    return list(cached)
r4632 | ||||
class StatsClientBase(object):
    """A Base class for various statsd clients.

    Concrete clients must implement `close`, `_send` and `pipeline`. The
    formatting code in `_prepare` reads `self._prefix`, which this base class
    never assigns, so subclasses are expected to set it (a falsy value means
    "no prefix").
    """

    def close(self):
        """Used to close and clean up any underlying resources."""
        raise NotImplementedError()

    def _send(self, data=None):
        """Transmit one formatted stat line; must be overridden.

        `data` is accepted (and ignored) here so that the call made by
        `_after` reaches the `NotImplementedError` below instead of failing
        with an unrelated `TypeError` about the argument count.
        """
        raise NotImplementedError()

    def pipeline(self):
        """Return a pipeline object for batching stats; must be overridden.

        `gauge` uses the result as a context manager and calls `_send_stat`
        on the object bound by the `with` statement.
        """
        raise NotImplementedError()

    def timer(self, stat, rate=1, tags=None):
        """Return a `Timer` bound to this client for `stat`."""
        return Timer(self, stat, rate, tags)

    def timing(self, stat, delta, rate=1, tags=None):
        """
        Send new timing information.

        `delta` can be either a number of milliseconds or a timedelta.
        """
        if isinstance(delta, timedelta):
            # Convert timedelta to number of milliseconds.
            delta = delta.total_seconds() * 1000.
        self._send_stat(stat, '%0.6f|ms' % delta, rate, tags)

    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment a stat by `count`."""
        self._send_stat(stat, '%s|c' % count, rate, tags)

    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement a stat by `count`."""
        self.incr(stat, -count, rate, tags)

    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """Set a gauge value.

        With `delta=True` the value is sent as a signed change ("+N" / "-N").
        Without it, a negative value is sent as two pipelined packets, a
        reset to 0 followed by the negative value, because a signed gauge
        value is interpreted by statsd as a relative change.
        """
        if value < 0 and not delta:
            # Sample once up front so that either both packets go out or
            # neither does; the packets themselves are then sent at rate 1.
            if rate < 1 and random.random() > rate:
                return
            with self.pipeline() as pipe:
                # Tags must accompany both packets, otherwise the reset and
                # the new value would be reported without the caller's tags.
                pipe._send_stat(stat, '0|g', 1, tags)
                pipe._send_stat(stat, '%s|g' % value, 1, tags)
        else:
            prefix = '+' if delta and value >= 0 else ''
            self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)

    def set(self, stat, value, rate=1, tags=None):
        """Set a set value."""
        self._send_stat(stat, '%s|s' % value, rate, tags)

    def _send_stat(self, stat, value, rate, tags=None):
        """Format one stat and pass the result to `_after`."""
        self._after(self._prepare(stat, value, rate, tags))

    def _prepare(self, stat, value, rate, tags=None):
        """Build the line for one stat, or return None if sampled out.

        The result has the form ``[prefix.]stat:value[|@rate][|#tag1,tag2]``.
        The ``|@rate`` part is present only when `rate` < 1 and the ``|#``
        part only when `tags` is non-empty.
        """
        if rate < 1:
            if random.random() > rate:
                return None
            value = '%s|@%s' % (value, rate)

        if self._prefix:
            stat = '%s.%s' % (self._prefix, stat)

        tag_suffix = ("|#" + ",".join(normalize_tags(tags))) if tags else ""
        return '%s:%s%s' % (stat, value, tag_suffix)

    def _after(self, data):
        """Send `data` unless it is empty (e.g. the stat was sampled out)."""
        if data:
            self._send(data)
class PipelineBase(StatsClientBase):
    """Client variant that queues formatted stats instead of sending each one.

    Stats accumulate in `self._stats` until `send` is called, either
    directly or on leaving a `with` block.
    """

    def __init__(self, client):
        # The queue of formatted stat lines awaiting `send`.
        self._stats = deque()
        # Inherit the parent's prefix so stats are formatted the same way.
        self._prefix = client._prefix
        self._client = client

    def _send(self):
        """Must be overridden; `send` calls it when the queue is non-empty."""
        # NOTE(review): presumably implementations transmit and drain
        # `self._stats` -- confirm against the concrete subclasses.
        raise NotImplementedError()

    def _after(self, data):
        """Queue `data` unless the stat was dropped (`data` is None)."""
        if data is None:
            return
        self._stats.append(data)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, tb):
        # Runs unconditionally, including when the block raised.
        self.send()

    def send(self):
        """Call `_send` if at least one stat is queued; otherwise do nothing."""
        if self._stats:
            self._send()

    def pipeline(self):
        """Return a new pipeline of the same class wrapping this one."""
        return self.__class__(self)