##// END OF EJS Templates
fix: lfs chunked uploads....
fix: lfs chunked uploads. When testing large file uploads it was found that gunicorn raises NoMoreData instead of returning a value. This fixes the problem and avoids showing excessive exceptions for no reason. Previously the file upload still worked but spawned errors in the logs.

File last commit:

r1153:bc27ecc0 default
r1280:b2259b07 default
Show More
base.py
154 lines | 4.2 KiB | text/x-python | PythonLexer
metrics: use new statsd client logic, and start gathering new metrics
r1005 import re
application: added statsd client for sending usage statistics.
r920 import random
from collections import deque
from datetime import timedelta
metrics: use new statsd client logic, and start gathering new metrics
r1005 from repoze.lru import lru_cache
application: added statsd client for sending usage statistics.
r920
from .timer import Timer
# Any character outside of word chars (letters/digits/underscore), "-", ":",
# "/" and "." is not allowed in a statsd tag and gets substituted.
TAG_INVALID_CHARS_RE = re.compile(
    r"[^\w\d_\-:/\.]",
    #re.UNICODE
)
# Replacement used for every disallowed tag character.
TAG_INVALID_CHARS_SUBS = "_"

# we save and expose methods called by statsd for discovery
buckets_dict = {}


@lru_cache(maxsize=500)
def _normalize_tags_with_cache(tag_list):
    """Sanitize every tag in *tag_list*; memoized on the (hashable) tuple."""
    substitute = TAG_INVALID_CHARS_RE.sub
    sanitized = []
    for tag in tag_list:
        sanitized.append(substitute(TAG_INVALID_CHARS_SUBS, tag))
    return sanitized


def normalize_tags(tag_list):
    """Return statsd-safe versions of the tags in *tag_list*."""
    # lru_cache requires hashable arguments, so freeze the incoming list
    # into a tuple before delegating to the cached helper.
    return _normalize_tags_with_cache(tuple(tag_list))
class StatsClientBase:
    """A Base class for various statsd clients."""

    def close(self):
        """Used to close and clean up any underlying resources."""
        raise NotImplementedError()

    def _send(self):
        # Abstract transport hook. NOTE(review): _after() invokes this as
        # self._send(data), so concrete clients implement _send(self, data).
        raise NotImplementedError()

    def pipeline(self):
        # Abstract: return a batching pipeline client (see PipelineBase).
        raise NotImplementedError()

    def timer(self, stat, rate=1, tags=None, auto_send=True):
        """
        Return a Timer for `stat`, usable as a context manager.

        Usage::

            statsd = StatsdClient.statsd
            with statsd.timer('bucket_name', auto_send=True) as tmr:
                # This block will be timed.
                for i in range(0, 100000):
                    i ** 2
            # you can access time here...
            elapsed_ms = tmr.ms
        """
        return Timer(self, stat, rate, tags, auto_send=auto_send)

    def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
        """
        Send new timing information.

        `delta` can be either a number of milliseconds or a timedelta.
        `use_decimals` controls whether the value is formatted with a fixed
        six-decimal precision or sent as-is.
        """
        if isinstance(delta, timedelta):
            # Convert timedelta to number of milliseconds.
            delta = delta.total_seconds() * 1000.
        if use_decimals:
            fmt = '%0.6f|ms'
        else:
            fmt = '%s|ms'
        self._send_stat(stat, fmt % delta, rate, tags)

    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment a stat by `count`."""
        self._send_stat(stat, f'{count}|c', rate, tags)

    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement a stat by `count`."""
        self.incr(stat, -count, rate, tags)

    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """Set a gauge value."""
        if value < 0 and not delta:
            # A non-delta negative gauge cannot be sent directly (a signed
            # value is interpreted as a delta by statsd), so reset the gauge
            # to 0 first and then apply the negative value in one pipeline.
            if rate < 1:
                if random.random() > rate:
                    return
            with self.pipeline() as pipe:
                pipe._send_stat(stat, '0|g', 1)
                pipe._send_stat(stat, f'{value}|g', 1)
        else:
            prefix = '+' if delta and value >= 0 else ''
            self._send_stat(stat, f'{prefix}{value}|g', rate, tags)

    def set(self, stat, value, rate=1, tags=None):
        """Set a set value."""
        # `tags` added for consistency with incr/decr/gauge/histogram;
        # defaults to None so existing callers are unaffected.
        self._send_stat(stat, f'{value}|s', rate, tags)

    def histogram(self, stat, value, rate=1, tags=None):
        """Set a histogram"""
        self._send_stat(stat, f'{value}|h', rate, tags)

    def _send_stat(self, stat, value, rate, tags=None):
        # Prepare the wire packet and hand it to _after (send or queue).
        self._after(self._prepare(stat, value, rate, tags))

    def _prepare(self, stat, value, rate, tags=None):
        """Build the statsd packet string, or return None if sampled out."""
        # Record the bucket name so consumers can discover which metrics
        # this process emits.
        global buckets_dict
        buckets_dict[stat] = 1

        if rate < 1:
            # Client-side sampling: drop the packet with probability 1-rate
            # and annotate surviving packets with the sample rate.
            if random.random() > rate:
                return
            value = f'{value}|@{rate}'

        if self._prefix:
            stat = f'{self._prefix}.{stat}'

        res = '%s:%s%s' % (
            stat,
            value,
            ("|#" + ",".join(normalize_tags(tags))) if tags else "",
        )
        return res

    def _after(self, data):
        # data is None when _prepare sampled the packet out.
        if data:
            self._send(data)
class PipelineBase(StatsClientBase):
    """Buffers prepared stat packets so they can be flushed as one batch."""

    def __init__(self, client):
        self._client = client
        self._prefix = client._prefix
        self._stats = deque()

    def _send(self):
        # Abstract: concrete pipelines implement the batched flush.
        raise NotImplementedError()

    def _after(self, data):
        # Queue instead of sending immediately; only None (a packet that was
        # sampled out) is dropped.
        if data is None:
            return
        self._stats.append(data)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, tb):
        self.send()

    def send(self):
        """Flush the queued stats, if there are any."""
        if self._stats:
            self._send()

    def pipeline(self):
        return self.__class__(self)