##// END OF EJS Templates
fix: lfs chunked uploads....
fix: lfs chunked uploads. When testing large file uploads, it was found that gunicorn raises NoMoreData instead of returning a value. This fixes the problem and no longer logs excessive exceptions for no reason. Previously, file uploads still worked but spawned errors in the logs.

File last commit:

r1275:ed405288 default
r1280:b2259b07 default
Show More
backends.py
310 lines | 9.4 KiB | text/x-python | PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#import errno
import fcntl
import functools
import logging
import os
import pickle
#import time
#import gevent
import msgpack
import redis
flock_org = fcntl.flock
from typing import Union
from dogpile.cache.api import Deserializer, Serializer
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import FileLock
from dogpile.cache.util import memoized_property
from ...lib.memory_lru_dict import LRUDict, LRUDictDebug
from ...lib.str_utils import safe_bytes, safe_str
from ...lib.type_utils import str2bool
# Fallback `max_size` used by LRUMemoryBackend when the backend arguments
# do not provide one.
_default_max_size = 1024
# Module-level logger used by the cache backends defined in this file.
log = logging.getLogger(__name__)
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """Memory backend whose cache dict is an ``LRUDict`` sized by ``max_size``.

    Recognised arguments (both are popped from ``arguments``):

    * ``max_size`` -- size passed to the LRU dict, defaults to
      ``_default_max_size``.
    * ``log_key_count`` -- when truthy, ``LRUDictDebug`` is used instead of
      ``LRUDict``.
    """

    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        self.max_size = arguments.pop('max_size', _default_max_size)
        use_debug_dict = arguments.pop('log_key_count', None)

        dict_factory = LRUDictDebug if use_debug_dict else LRUDict
        arguments['cache_dict'] = dict_factory(self.max_size)
        super().__init__(arguments)

    def __repr__(self):
        return f'{self.__class__}(maxsize=`{self.max_size}`)'

    def __str__(self):
        return self.__repr__()

    def delete(self, key):
        """Remove ``key`` from the cache; a missing key is silently ignored."""
        try:
            del self._cache[key]
        except KeyError:
            # deleting a key that is already gone is not an error
            pass

    def list_keys(self, prefix):
        """Return all keys currently stored.

        NOTE(review): ``prefix`` is accepted for interface parity with the
        other backends but is not used for filtering here -- every key is
        returned.
        """
        return [*self._cache.keys()]

    def delete_multi(self, keys):
        """Delete each key in ``keys`` via :meth:`delete`."""
        for single_key in keys:
            self.delete(single_key)

    def delete_multi_by_prefix(self, prefix):
        """Delete the keys reported by :meth:`list_keys` and return their count."""
        matched_keys = self.list_keys(prefix=prefix)
        if matched_keys:
            self.delete_multi(matched_keys)
        return len(matched_keys)
class PickleSerializer:
    """Mixin supplying pickle-based ``serializer`` / ``deserializer`` callables."""

    # Values are dumped with the highest pickle protocol available.
    _dumps = functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)

    serializer: None | Serializer = staticmethod(_dumps)  # type: ignore
    deserializer: None | Deserializer = staticmethod(pickle.loads)  # type: ignore

    # helper name is only needed while the class body executes
    del _dumps
class MsgPackSerializer:
    """Mixin supplying msgpack-based ``serializer`` / ``deserializer`` callables."""

    # ``use_list=False`` is forwarded to ``msgpack.unpackb`` on every load.
    _loads = functools.partial(msgpack.unpackb, use_list=False)

    serializer: None | Serializer = staticmethod(msgpack.packb)  # type: ignore
    deserializer: None | Deserializer = staticmethod(_loads)  # type: ignore

    # helper name is only needed while the class body executes
    del _loads
class CustomLockFactory(FileLock):
    """Lock factory handed to the DBM file backend.

    Currently adds nothing on top of dogpile's ``FileLock``; it exists as a
    dedicated type that ``FileNamespaceBackend`` installs as ``lock_factory``.
    """
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """DBM file backend with pickle serialization and prefix-based key helpers.

    The ``filename`` argument names the DBM store; its parent directory is
    created on demand before the underlying backend is initialized.
    """

    key_prefix = 'file_backend'

    def __init__(self, arguments):
        """Prepare the store directory and initialize the DBM backend.

        :param arguments: backend arguments; ``lock_factory`` is overwritten
            with ``CustomLockFactory`` and ``filename`` is read from it.
        :raises Exception: whatever the parent initializer raises, after
            logging the failing db path.
        """
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initialing cache-backend=%s db in %s', self.__class__.__name__, db_file)
        db_file_dir = os.path.dirname(db_file)

        # A bare filename has an empty dirname and os.makedirs('') raises,
        # so only create a directory when there is one to create.
        # exist_ok=True avoids the check-then-create race between workers
        # that start at the same time.
        if db_file_dir:
            os.makedirs(db_file_dir, exist_ok=True)

        try:
            super().__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return f'{self.__class__}(file=`{self.filename}`)'

    def __str__(self):
        return self.__repr__()

    def _get_keys_pattern(self, prefix: bytes = b''):
        """Return ``b'<key_prefix>:<prefix>'`` used to match stored keys."""
        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        """Return the DBM keys that start with ``<key_prefix>:<prefix>``.

        :raises Exception: re-raised after logging when reading keys fails.
        """
        # The pattern always contains at least b':' so it is never empty;
        # no "match everything" special case is needed.
        pattern = self._get_keys_pattern(prefix)

        with self._dbm_file(True) as dbm:
            try:
                return [dbm_key for dbm_key in dbm.keys() if dbm_key.startswith(pattern)]
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def delete_multi_by_prefix(self, prefix):
        """Delete all keys matching ``prefix`` and return how many were found."""
        cache_keys = self.list_keys(prefix=prefix)
        num_affected_keys = len(cache_keys)
        if num_affected_keys:
            self.delete_multi(cache_keys)
        return num_affected_keys

    def get_store(self):
        """Return the filename of the DBM store."""
        return self.filename

    def cleanup_store(self):
        """Remove the on-disk DBM files (``.db``, ``.dat``, ``.pag``, ``.dir``)."""
        for ext in ("db", "dat", "pag", "dir"):
            final_filename = self.filename + os.extsep + ext
            if os.path.exists(final_filename):
                os.remove(final_filename)
                log.warning('Removed dbm file %s', final_filename)
class BaseRedisBackend(redis_backend.RedisBackend):
    """Redis backend base with prefix-based key listing/deletion and a
    redis-lock based mutex.

    Subclasses set ``key_prefix`` and mix in a serializer.
    """

    key_prefix = ''

    def __init__(self, arguments):
        """Initialize the backend and resolve lock settings.

        :param arguments: backend arguments; ``host``/``url`` are read for
            display purposes and ``lock_auto_renewal`` is popped from it.
        """
        # only used by __repr__ to identify the connection
        self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
        super().__init__(arguments)

        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))

        if self._lock_auto_renewal and not self._lock_timeout:
            # set default timeout for auto_renewal
            self._lock_timeout = 30

    def __repr__(self):
        return f'{self.__class__}(conn=`{self.db_conn}`)'

    def __str__(self):
        return self.__repr__()

    def _create_client(self):
        """Create one pooled client used for both reads and writes."""
        if self.url is not None:
            # A URL has to be parsed by from_url(); passing url= to the
            # ConnectionPool constructor would forward it verbatim to the
            # connection class, which does not accept such an argument.
            connection_pool = redis.ConnectionPool.from_url(self.url)
        else:
            connection_pool = redis.ConnectionPool(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        self.writer_client = redis.StrictRedis(
            connection_pool=connection_pool
        )
        self.reader_client = self.writer_client

    def _get_keys_pattern(self, prefix: bytes = b''):
        """Return the glob pattern ``b'<key_prefix>:<prefix>*'``."""
        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        """Return the keys matching ``<key_prefix>:<prefix>*``."""
        pattern = self._get_keys_pattern(prefix)
        return self.reader_client.keys(pattern)

    def delete_multi_by_prefix(self, prefix, use_lua=False):
        """Delete all keys matching ``prefix`` and return how many matched.

        :param prefix: key prefix (without the backend ``key_prefix``).
        :param use_lua: when true, look up and delete the keys server-side
            in a single Lua script instead of listing them client-side.
        """
        if use_lua:
            # high efficient LUA script to delete ALL keys by prefix...
            lua = """local keys = redis.call('keys', ARGV[1])
            for i=1,#keys,5000 do
                redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
            end
            return #keys"""
            # Use the same pattern as the non-Lua branch so both paths hit
            # the same keys (includes key_prefix, handles bytes prefixes).
            num_affected_keys = self.writer_client.eval(
                lua,
                0,
                self._get_keys_pattern(prefix))
        else:
            cache_keys = self.list_keys(prefix=prefix)
            num_affected_keys = len(cache_keys)
            if num_affected_keys:
                self.delete_multi(cache_keys)
        return num_affected_keys

    def get_store(self):
        """Return the connection pool of the reader client."""
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        """Return a redis-backed lock for ``key`` when ``distributed_lock``
        is enabled, otherwise ``None``.
        """
        if self.distributed_lock:
            lock_key = f'_lock_{safe_str(key)}'
            return get_mutex_lock(
                self.writer_client, lock_key,
                self._lock_timeout,
                auto_renewal=self._lock_auto_renewal
            )
        else:
            return None
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend combined with the ``PickleSerializer`` mixin."""

    key_prefix = 'redis_pickle_backend'
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend combined with the ``MsgPackSerializer`` mixin."""

    key_prefix = 'redis_msgpack_backend'
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """Build a lock object around ``redis_lock.Lock`` for ``lock_key``.

    :param client: redis client handed to ``redis_lock.Lock``.
    :param lock_key: name of the lock.
    :param lock_timeout: passed as the lock's ``expire`` value.
    :param auto_renewal: passed through to ``redis_lock.Lock``.
    :return: a wrapper instance exposing ``acquire(wait=True)`` and
        ``release()``.
    """
    # imported lazily, at call time
    from ...lib._vendor import redis_lock

    class _RedisLockWrapper:
        """LockWrapper for redis_lock"""

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        @classmethod
        def get_lock(cls):
            lock_options = dict(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )
            return redis_lock.Lock(**lock_options)

        def __repr__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def __str__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                got_it = self.lock.acquire(wait)
            except redis_lock.AlreadyAcquired:
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True

            log.debug('Got lock for key %s, %s', self.lock_key, got_it)
            return got_it

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                # releasing a lock that is not held is treated as a no-op
                pass

    return _RedisLockWrapper()