##// END OF EJS Templates
feat(configs): deprecared old hooks protocol and ssh wrapper....
feat(configs): deprecared old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installation are automatically set to new keys. Fallback mode is still available.

File last commit:

r5304:db210dc1 default
r5496:cab50adf default
Show More
backends.py
335 lines | 10.5 KiB | text/x-python | PythonLexer
# Copyright (C) 2015-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import errno
import fcntl
import functools
import logging
import os
import pickle
import time
import gevent
import msgpack
import redis
flock_org = fcntl.flock
from typing import Union
from dogpile.cache.api import Deserializer, Serializer
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import FileLock
from dogpile.cache.util import memoized_property
from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.type_utils import str2bool
_default_max_size = 1024
log = logging.getLogger(__name__)
class LRUMemoryBackend(memory_backend.MemoryBackend):
key_prefix = 'lru_mem_backend'
pickle_values = False
def __init__(self, arguments):
self.max_size = arguments.pop('max_size', _default_max_size)
LRUDictClass = LRUDict
if arguments.pop('log_key_count', None):
LRUDictClass = LRUDictDebug
arguments['cache_dict'] = LRUDictClass(self.max_size)
super().__init__(arguments)
def __repr__(self):
return f'{self.__class__}(maxsize=`{self.max_size}`)'
def __str__(self):
return self.__repr__()
def delete(self, key):
try:
del self._cache[key]
except KeyError:
# we don't care if key isn't there at deletion
pass
def list_keys(self, prefix):
return list(self._cache.keys())
def delete_multi(self, keys):
for key in keys:
self.delete(key)
def delete_multi_by_prefix(self, prefix):
cache_keys = self.list_keys(prefix=prefix)
num_affected_keys = len(cache_keys)
if num_affected_keys:
self.delete_multi(cache_keys)
return num_affected_keys
class PickleSerializer:
serializer: None | Serializer = staticmethod( # type: ignore
functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
)
deserializer: None | Deserializer = staticmethod( # type: ignore
functools.partial(pickle.loads)
)
class MsgPackSerializer:
serializer: None | Serializer = staticmethod( # type: ignore
msgpack.packb
)
deserializer: None | Deserializer = staticmethod( # type: ignore
functools.partial(msgpack.unpackb, use_list=False)
)
class CustomLockFactory(FileLock):
@memoized_property
def _module(self):
def gevent_flock(fd, operation):
"""
Gevent compatible flock
"""
# set non-blocking, this will cause an exception if we cannot acquire a lock
operation |= fcntl.LOCK_NB
start_lock_time = time.time()
timeout = 60 * 15 # 15min
while True:
try:
flock_org(fd, operation)
# lock has been acquired
break
except (OSError, IOError) as e:
# raise on other errors than Resource temporarily unavailable
if e.errno != errno.EAGAIN:
raise
elif (time.time() - start_lock_time) > timeout:
# waited to much time on a lock, better fail than loop for ever
log.error('Failed to acquire lock on `%s` after waiting %ss',
self.filename, timeout)
raise
wait_timeout = 0.03
log.debug('Failed to acquire lock on `%s`, retry in %ss',
self.filename, wait_timeout)
gevent.sleep(wait_timeout)
fcntl.flock = gevent_flock
return fcntl
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
key_prefix = 'file_backend'
def __init__(self, arguments):
arguments['lock_factory'] = CustomLockFactory
db_file = arguments.get('filename')
log.debug('initialing cache-backend=%s db in %s', self.__class__.__name__, db_file)
db_file_dir = os.path.dirname(db_file)
if not os.path.isdir(db_file_dir):
os.makedirs(db_file_dir)
try:
super().__init__(arguments)
except Exception:
log.exception('Failed to initialize db at: %s', db_file)
raise
def __repr__(self):
return f'{self.__class__}(file=`{self.filename}`)'
def __str__(self):
return self.__repr__()
def _get_keys_pattern(self, prefix: bytes = b''):
return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
def list_keys(self, prefix: bytes = b''):
prefix = self._get_keys_pattern(prefix)
def cond(dbm_key: bytes):
if not prefix:
return True
if dbm_key.startswith(prefix):
return True
return False
with self._dbm_file(True) as dbm:
try:
return list(filter(cond, dbm.keys()))
except Exception:
log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
raise
def delete_multi_by_prefix(self, prefix):
cache_keys = self.list_keys(prefix=prefix)
num_affected_keys = len(cache_keys)
if num_affected_keys:
self.delete_multi(cache_keys)
return num_affected_keys
def get_store(self):
return self.filename
class BaseRedisBackend(redis_backend.RedisBackend):
key_prefix = ''
def __init__(self, arguments):
self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
super().__init__(arguments)
self._lock_timeout = self.lock_timeout
self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
if self._lock_auto_renewal and not self._lock_timeout:
# set default timeout for auto_renewal
self._lock_timeout = 30
def __repr__(self):
return f'{self.__class__}(conn=`{self.db_conn}`)'
def __str__(self):
return self.__repr__()
def _create_client(self):
args = {}
if self.url is not None:
args.update(url=self.url)
else:
args.update(
host=self.host, password=self.password,
port=self.port, db=self.db
)
connection_pool = redis.ConnectionPool(**args)
self.writer_client = redis.StrictRedis(
connection_pool=connection_pool
)
self.reader_client = self.writer_client
def _get_keys_pattern(self, prefix: bytes = b''):
return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
def list_keys(self, prefix: bytes = b''):
prefix = self._get_keys_pattern(prefix)
return self.reader_client.keys(prefix)
def delete_multi_by_prefix(self, prefix, use_lua=False):
if use_lua:
# high efficient LUA script to delete ALL keys by prefix...
lua = """local keys = redis.call('keys', ARGV[1])
for i=1,#keys,5000 do
redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
end
return #keys"""
num_affected_keys = self.writer_client.eval(
lua,
0,
f"{prefix}*")
else:
cache_keys = self.list_keys(prefix=prefix)
num_affected_keys = len(cache_keys)
if num_affected_keys:
self.delete_multi(cache_keys)
return num_affected_keys
def get_store(self):
return self.reader_client.connection_pool
def get_mutex(self, key):
if self.distributed_lock:
lock_key = f'_lock_{safe_str(key)}'
return get_mutex_lock(
self.writer_client, lock_key,
self._lock_timeout,
auto_renewal=self._lock_auto_renewal
)
else:
return None
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
key_prefix = 'redis_pickle_backend'
pass
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
key_prefix = 'redis_msgpack_backend'
pass
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
from rhodecode.lib._vendor import redis_lock
class _RedisLockWrapper:
"""LockWrapper for redis_lock"""
@classmethod
def get_lock(cls):
return redis_lock.Lock(
redis_client=client,
name=lock_key,
expire=lock_timeout,
auto_renewal=auto_renewal,
strict=True,
)
def __repr__(self):
return f"{self.__class__.__name__}:{lock_key}"
def __str__(self):
return f"{self.__class__.__name__}:{lock_key}"
def __init__(self):
self.lock = self.get_lock()
self.lock_key = lock_key
def acquire(self, wait=True):
log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
try:
acquired = self.lock.acquire(wait)
log.debug('Got lock for key %s, %s', self.lock_key, acquired)
return acquired
except redis_lock.AlreadyAcquired:
return False
except redis_lock.AlreadyStarted:
# refresh thread exists, but it also means we acquired the lock
return True
def release(self):
try:
self.lock.release()
except redis_lock.NotAcquired:
pass
return _RedisLockWrapper()