##// END OF EJS Templates
feat(configs): deprecated old hooks protocol and ssh wrapper....
feat(configs): deprecated old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installations are automatically set to new keys. Fallback mode is still available.

File last commit:

r5304:db210dc1 default
r5496:cab50adf default
Show More
backends.py
335 lines | 10.5 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2015-2023 RhodeCode GmbH
caches: rewrite of auth/permission caches to dogpile.
r2845 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
caches: updated cache backend to new vcsserver caches implementation.
r3848
caches: use a gevent compatible file-lock mechanism....
r2878 import errno
caches: new updated rc_cache with archive cache module and python3 changes
r5067 import fcntl
import functools
caches: use a gevent compatible file-lock mechanism....
r2878 import logging
caches: new updated rc_cache with archive cache module and python3 changes
r5067 import os
import pickle
import time
caches: use a gevent compatible file-lock mechanism....
r2878
caches: new updated rc_cache with archive cache module and python3 changes
r5067 import gevent
caches: synced cache logic with vcsserver.
r3851 import msgpack
python3: fixed some compat problems
r4916 import redis
caches: new updated rc_cache with archive cache module and python3 changes
r5067
rccache: refactor and update code to support latest dogpile code changes (mostly on custom serializers)
r4985 flock_org = fcntl.flock
from typing import Union
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: new updated rc_cache with archive cache module and python3 changes
r5067 from dogpile.cache.api import Deserializer, Serializer
from dogpile.cache.backends import file as file_backend
caches: rewrite of auth/permission caches to dogpile.
r2845 from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import redis as redis_backend
rccache: refactor and update code to support latest dogpile code changes (mostly on custom serializers)
r4985 from dogpile.cache.backends.file import FileLock
caches: use a gevent compatible file-lock mechanism....
r2878 from dogpile.cache.util import memoized_property
caches: optimized defaults for safer more reliable behaviour
r4720
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking...
r2945 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
caches: new updated rc_cache with archive cache module and python3 changes
r5067 from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.type_utils import str2bool
caches: rewrite of auth/permission caches to dogpile.
r2845
_default_max_size = 1024
caches: use a gevent compatible file-lock mechanism....
r2878 log = logging.getLogger(__name__)
caches: rewrite of auth/permission caches to dogpile.
r2845
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """
    In-process memory cache backend backed by a size-bounded LRU dict.

    Values are stored unpickled (``pickle_values = False``) since they never
    leave the process.
    """
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        self.max_size = arguments.pop('max_size', _default_max_size)

        # the debug variant additionally logs key counts when requested
        lru_class = LRUDictDebug if arguments.pop('log_key_count', None) else LRUDict
        arguments['cache_dict'] = lru_class(self.max_size)
        super().__init__(arguments)

    def __repr__(self):
        return f'{self.__class__}(maxsize=`{self.max_size}`)'

    def __str__(self):
        return self.__repr__()

    def delete(self, key):
        # deleting a key that is already gone is a no-op
        try:
            del self._cache[key]
        except KeyError:
            pass

    def list_keys(self, prefix):
        # NOTE(review): prefix is ignored here — every cached key is returned;
        # looks intentional for full in-memory flushes, confirm against callers
        return list(self._cache.keys())

    def delete_multi(self, keys):
        for cache_key in keys:
            self.delete(cache_key)

    def delete_multi_by_prefix(self, prefix):
        matched_keys = self.list_keys(prefix=prefix)
        affected = len(matched_keys)
        if affected:
            self.delete_multi(matched_keys)
        return affected
class PickleSerializer:
    """Mixin adding pickle-based (de)serialization for dogpile cache backends."""

    serializer: None | Serializer = staticmethod(  # type: ignore
        functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL))
    deserializer: None | Deserializer = staticmethod(pickle.loads)  # type: ignore
class MsgPackSerializer:
    """Mixin adding msgpack-based (de)serialization for dogpile cache backends."""

    serializer: None | Serializer = staticmethod(msgpack.packb)  # type: ignore
    deserializer: None | Deserializer = staticmethod(  # type: ignore
        functools.partial(msgpack.unpackb, use_list=False))
class CustomLockFactory(FileLock):
    """
    FileLock factory whose flock cooperates with gevent.

    Instead of blocking the whole hub on ``fcntl.flock``, the lock is taken
    non-blocking and retried with ``gevent.sleep`` so other greenlets keep
    running while we wait.
    """

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # non-blocking mode: flock raises EAGAIN instead of blocking
            operation |= fcntl.LOCK_NB
            started_at = time.time()
            timeout = 60 * 15  # 15min
            retry_sleep = 0.03

            while True:
                try:
                    flock_org(fd, operation)
                    # lock acquired
                    return
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    if (time.time() - started_at) > timeout:
                        # waited too long on the lock; fail rather than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, retry_sleep)
                    gevent.sleep(retry_sleep)

        # hand FileLock a patched fcntl module using the cooperative flock
        fcntl.flock = gevent_flock
        return fcntl
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """
    DBM file based cache backend with pickle serialization.

    Uses CustomLockFactory so file locking cooperates with gevent workers.
    Keys are stored as bytes, namespaced as ``<key_prefix>:<prefix>``.
    """
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        """
        :param arguments: dict of backend options; must contain 'filename'
            pointing at the DBM database file.
        :raises Exception: re-raised (after logging) if the DBM db fails to open.
        """
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
        db_file_dir = os.path.dirname(db_file)
        if db_file_dir:
            # exist_ok avoids a TOCTOU race when multiple workers boot at once
            # (the old isdir()-then-makedirs() check could raise FileExistsError)
            os.makedirs(db_file_dir, exist_ok=True)

        try:
            super().__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return f'{self.__class__}(file=`{self.filename}`)'

    def __str__(self):
        return self.__repr__()

    def _get_keys_pattern(self, prefix: bytes = b''):
        # namespace keys as `<key_prefix>:<prefix>` (bytes, as stored in the DBM db)
        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        """Return all stored (bytes) keys matching the namespaced prefix."""
        prefix = self._get_keys_pattern(prefix)

        def cond(dbm_key: bytes):
            # empty prefix matches everything
            return not prefix or dbm_key.startswith(prefix)

        with self._dbm_file(True) as dbm:
            try:
                return list(filter(cond, dbm.keys()))
            except Exception:
                # surface damaged DB files on key iteration, not only on GET
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def delete_multi_by_prefix(self, prefix):
        """Delete all keys matching prefix; return the number of deleted keys."""
        cache_keys = self.list_keys(prefix=prefix)
        num_affected_keys = len(cache_keys)
        if num_affected_keys:
            self.delete_multi(cache_keys)
        return num_affected_keys

    def get_store(self):
        # the backing store is simply the DBM file path
        return self.filename
class BaseRedisBackend(redis_backend.RedisBackend):
    """
    Shared Redis cache backend logic: connection-pooled client, distributed
    locking via redis_lock, and prefix-based key listing/deletion.
    """
    key_prefix = ''

    def __init__(self, arguments):
        # remember a human-readable connection target for __repr__
        self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
        super().__init__(arguments)

        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))

        if self._lock_auto_renewal and not self._lock_timeout:
            # set default timeout for auto_renewal
            self._lock_timeout = 30

    def __repr__(self):
        return f'{self.__class__}(conn=`{self.db_conn}`)'

    def __str__(self):
        return self.__repr__()

    def _create_client(self):
        # build a pooled client; reader and writer share the same connection pool
        if self.url is not None:
            conn_args = {'url': self.url}
        else:
            conn_args = {
                'host': self.host,
                'password': self.password,
                'port': self.port,
                'db': self.db,
            }
        pool = redis.ConnectionPool(**conn_args)
        self.writer_client = redis.StrictRedis(connection_pool=pool)
        self.reader_client = self.writer_client

    def _get_keys_pattern(self, prefix: bytes = b''):
        # glob pattern used with the redis KEYS command
        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        prefix = self._get_keys_pattern(prefix)
        return self.reader_client.keys(prefix)

    def delete_multi_by_prefix(self, prefix, use_lua=False):
        """Delete keys matching prefix; returns how many keys were affected."""
        if use_lua:
            # high efficient LUA script to delete ALL keys by prefix...
            lua = """local keys = redis.call('keys', ARGV[1])
            for i=1,#keys,5000 do
                redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
            end
            return #keys"""
            num_affected_keys = self.writer_client.eval(
                lua,
                0,
                f"{prefix}*")
        else:
            cache_keys = self.list_keys(prefix=prefix)
            num_affected_keys = len(cache_keys)
            if num_affected_keys:
                self.delete_multi(cache_keys)
        return num_affected_keys

    def get_store(self):
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        # distributed lock is optional; return None when disabled
        if not self.distributed_lock:
            return None
        lock_key = f'_lock_{safe_str(key)}'
        return get_mutex_lock(
            self.writer_client, lock_key,
            self._lock_timeout,
            auto_renewal=self._lock_auto_renewal
        )
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend storing values serialized with pickle."""
    key_prefix = 'redis_pickle_backend'
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend storing values serialized with msgpack."""
    key_prefix = 'redis_msgpack_backend'
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """
    Build a dogpile-compatible mutex backed by a distributed redis_lock.

    :param client: redis client used to hold the lock
    :param lock_key: name of the lock key
    :param lock_timeout: lock expiry in seconds
    :param auto_renewal: keep extending the lock while held
    :return: object exposing acquire()/release(), as dogpile expects
    """
    from rhodecode.lib._vendor import redis_lock

    class _RedisLockWrapper:
        """LockWrapper for redis_lock"""

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __init__(self):
            # keep one lock instance; a fresh one per call could not be released
            self.lock = self.get_lock()
            self.lock_key = lock_key

        def __repr__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def __str__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                got_lock = self.lock.acquire(wait)
            except redis_lock.AlreadyAcquired:
                # we already hold it in this process — treat as not newly acquired
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True
            log.debug('Got lock for key %s, %s', self.lock_key, got_lock)
            return got_lock

        def release(self):
            # releasing a lock we never acquired is a no-op
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                pass

    return _RedisLockWrapper()