##// END OF EJS Templates
fix: lfs chunked uploads....
fix: lfs chunked uploads. When testing large file uploads it's found that gunicorn raises NoMoreData instead of returning value. This fixes the problem and doesn't show excesive exceptions for no reason. Previously file upload still worked but spawned errors in logs

File last commit:

r1153:bc27ecc0 default
r1280:b2259b07 default
Show More
base.py
154 lines | 4.2 KiB | text/x-python | PythonLexer
import re
import random
from collections import deque
from datetime import timedelta
from repoze.lru import lru_cache
from .timer import Timer
TAG_INVALID_CHARS_RE = re.compile(
r"[^\w\d_\-:/\.]",
#re.UNICODE
)
TAG_INVALID_CHARS_SUBS = "_"
# we save and expose methods called by statsd for discovery
buckets_dict = {
}
@lru_cache(maxsize=500)
def _normalize_tags_with_cache(tag_list):
return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
def normalize_tags(tag_list):
# We have to turn our input tag list into a non-mutable tuple for it to
# be hashable (and thus usable) by the @lru_cache decorator.
return _normalize_tags_with_cache(tuple(tag_list))
class StatsClientBase:
"""A Base class for various statsd clients."""
def close(self):
"""Used to close and clean up any underlying resources."""
raise NotImplementedError()
def _send(self):
raise NotImplementedError()
def pipeline(self):
raise NotImplementedError()
def timer(self, stat, rate=1, tags=None, auto_send=True):
"""
statsd = StatsdClient.statsd
with statsd.timer('bucket_name', auto_send=True) as tmr:
# This block will be timed.
for i in range(0, 100000):
i ** 2
# you can access time here...
elapsed_ms = tmr.ms
"""
return Timer(self, stat, rate, tags, auto_send=auto_send)
def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
"""
Send new timing information.
`delta` can be either a number of milliseconds or a timedelta.
"""
if isinstance(delta, timedelta):
# Convert timedelta to number of milliseconds.
delta = delta.total_seconds() * 1000.
if use_decimals:
fmt = '%0.6f|ms'
else:
fmt = '%s|ms'
self._send_stat(stat, fmt % delta, rate, tags)
def incr(self, stat, count=1, rate=1, tags=None):
"""Increment a stat by `count`."""
self._send_stat(stat, f'{count}|c', rate, tags)
def decr(self, stat, count=1, rate=1, tags=None):
"""Decrement a stat by `count`."""
self.incr(stat, -count, rate, tags)
def gauge(self, stat, value, rate=1, delta=False, tags=None):
"""Set a gauge value."""
if value < 0 and not delta:
if rate < 1:
if random.random() > rate:
return
with self.pipeline() as pipe:
pipe._send_stat(stat, '0|g', 1)
pipe._send_stat(stat, f'{value}|g', 1)
else:
prefix = '+' if delta and value >= 0 else ''
self._send_stat(stat, f'{prefix}{value}|g', rate, tags)
def set(self, stat, value, rate=1):
"""Set a set value."""
self._send_stat(stat, f'{value}|s', rate)
def histogram(self, stat, value, rate=1, tags=None):
"""Set a histogram"""
self._send_stat(stat, f'{value}|h', rate, tags)
def _send_stat(self, stat, value, rate, tags=None):
self._after(self._prepare(stat, value, rate, tags))
def _prepare(self, stat, value, rate, tags=None):
global buckets_dict
buckets_dict[stat] = 1
if rate < 1:
if random.random() > rate:
return
value = f'{value}|@{rate}'
if self._prefix:
stat = f'{self._prefix}.{stat}'
res = '%s:%s%s' % (
stat,
value,
("|#" + ",".join(normalize_tags(tags))) if tags else "",
)
return res
def _after(self, data):
if data:
self._send(data)
class PipelineBase(StatsClientBase):
def __init__(self, client):
self._client = client
self._prefix = client._prefix
self._stats = deque()
def _send(self):
raise NotImplementedError()
def _after(self, data):
if data is not None:
self._stats.append(data)
def __enter__(self):
return self
def __exit__(self, typ, value, tb):
self.send()
def send(self):
if not self._stats:
return
self._send()
def pipeline(self):
return self.__class__(self)