# HG changeset patch
# User RhodeCode Admin
# Date 2023-07-18 09:27:55
# Node ID bf4fcdb77a8c970bfbce1ca84a8cc58d0d33098d
# Parent ccd88b7c8d7297cdc8ab61c4c43bf43d49523af1
caches: updated rc_cache with new archive cache module and python3 changes

diff --git a/rhodecode/lib/rc_cache/__init__.py b/rhodecode/lib/rc_cache/__init__.py
--- a/rhodecode/lib/rc_cache/__init__.py
+++ b/rhodecode/lib/rc_cache/__init__.py
@@ -1,5 +1,3 @@
-
-
 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
@@ -19,7 +17,23 @@
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
 import logging
+import threading
+
 from dogpile.cache import register_backend
+
+from . import region_meta
+from .utils import (
+    ActiveRegionCache,
+    FreshRegionCache,
+    InvalidationContext,
+    backend_key_generator,
+    clear_cache_namespace,
+    get_default_cache_settings,
+    get_or_create_region,
+    make_region,
+    str2bool,
+)
+
 module_name = 'rhodecode'
 
 register_backend(
@@ -41,18 +55,28 @@ register_backend(
 
 log = logging.getLogger(__name__)
 
-from . import region_meta
-from .utils import (
-    get_default_cache_settings, backend_key_generator, get_or_create_region,
-    clear_cache_namespace, make_region, InvalidationContext,
-    FreshRegionCache, ActiveRegionCache
-)
-
 FILE_TREE_CACHE_VER = 'v4'
 LICENSE_CACHE_VER = 'v2'
 
+CLEAR_DELETE = 'delete'
+CLEAR_INVALIDATE = 'invalidate'
+
+
+def async_creation_runner(cache, somekey, creator, mutex):
+
+    def runner():
+        try:
+            value = creator()
+            cache.set(somekey, value)
+        finally:
+            mutex.release()
+
+    thread = threading.Thread(target=runner)
+    thread.start()
+
+
 def configure_dogpile_cache(settings):
     cache_dir = settings.get('cache_dir')
     if cache_dir:
@@ -72,13 +96,20 @@ def configure_dogpile_cache(settings):
 
         new_region = make_region(
             name=namespace_name,
-            function_key_generator=None
+            function_key_generator=None,
+            async_creation_runner=None
         )
 
-        new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
+        new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
         new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
+
+        async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
+        if async_creator:
+            log.debug('configuring region %s with async creator', new_region)
+            new_region.async_creation_runner = async_creation_runner
+
         if log.isEnabledFor(logging.DEBUG):
-            region_args = dict(backend=new_region.actual_backend.__class__,
+            region_args = dict(backend=new_region.actual_backend,
                                region_invalidator=new_region.region_invalidator.__class__)
             log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
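The async_creation_runner() helper above plugs into dogpile.cache's async_creation_runner hook: when a cached value exists but is past its expiration, dogpile serves the stale value and calls the runner with (cache, somekey, creator, mutex); the runner recomputes in a background thread and releases the mutex in a finally block. Below is a minimal sketch, not part of the patch, of how a region opts in through the new rc_cache.<region>.async_creator setting; the region name `cache_perms` and all paths/values are made up for illustration:

    from rhodecode.lib import rc_cache

    settings = {
        'cache_dir': '/tmp/rc_dev/cache',  # hypothetical path
        'rc_cache.cache_perms.backend': 'dogpile.cache.rc.file_namespace',
        'rc_cache.cache_perms.expiration_time': 300,
        'rc_cache.cache_perms.arguments.filename': '/tmp/rc_dev/cache/cache_perms.db',
        # new in this changeset: popped by configure_dogpile_cache(); when true,
        # the region gets async_creation_runner attached as shown in the hunk above
        'rc_cache.cache_perms.async_creator': 'true',
    }
    rc_cache.configure_dogpile_cache(settings)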
diff --git a/rhodecode/lib/rc_cache/archive_cache.py b/rhodecode/lib/rc_cache/archive_cache.py
new file
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2015-2020 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import logging
+import os
+
+import diskcache
+from diskcache import RLock
+
+log = logging.getLogger(__name__)
+
+cache_meta = None
+
+
+class ReentrantLock(RLock):
+    def __enter__(self):
+        reentrant_lock_key = self._key
+
+        log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
+        #self.acquire()
+        log.debug('Lock for key=%s acquired', reentrant_lock_key)
+
+    def __exit__(self, *exc_info):
+        #self.release()
+        pass
+
+
+def get_archival_config(config):
+
+    final_config = {
+        'archive_cache.eviction_policy': 'least-frequently-used'
+    }
+
+    for k, v in config.items():
+        if k.startswith('archive_cache'):
+            final_config[k] = v
+
+    return final_config
+
+
+def get_archival_cache_store(config):
+
+    global cache_meta
+    if cache_meta is not None:
+        return cache_meta
+
+    config = get_archival_config(config)
+
+    archive_cache_dir = config['archive_cache.store_dir']
+    archive_cache_size_gb = config['archive_cache.cache_size_gb']
+    archive_cache_shards = config['archive_cache.cache_shards']
+    archive_cache_eviction_policy = config['archive_cache.eviction_policy']
+
+    log.debug('Initializing archival cache instance under %s', archive_cache_dir)
+
+    # check if it's ok to write, and re-create the archive cache
+    if not os.path.isdir(archive_cache_dir):
+        os.makedirs(archive_cache_dir, exist_ok=True)
+
+    d_cache = diskcache.FanoutCache(
+        archive_cache_dir, shards=archive_cache_shards,
+        cull_limit=0,  # manual eviction required
+        size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
+        eviction_policy=archive_cache_eviction_policy,
+        timeout=30
+    )
+    cache_meta = d_cache
+    return cache_meta
+
+
+def includeme(config):
+    # init our cache at start
+    settings = config.get_settings()
+    get_archival_cache_store(settings)
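get_archival_cache_store() keeps one process-wide diskcache.FanoutCache (the cache_meta singleton), sharded for concurrent writers and with automatic culling disabled (cull_limit=0), so eviction stays under the caller's control. A hedged usage sketch, not part of the patch; the archive_cache.* values and the cache key are invented, only the setting names and the standard FanoutCache get/set/contains API are taken from the code above:

    from rhodecode.lib.rc_cache import archive_cache

    config = {
        'archive_cache.store_dir': '/tmp/rc_dev/archive_cache',  # hypothetical
        'archive_cache.cache_size_gb': 10,
        'archive_cache.cache_shards': 8,
        # 'archive_cache.eviction_policy' defaults to 'least-frequently-used'
    }
    d_cache = archive_cache.get_archival_cache_store(config)

    archive_key = 'repo-1.zip'  # hypothetical archive name
    if archive_key not in d_cache:
        d_cache.set(archive_key, b'<archive bytes>')
    data = d_cache.get(archive_key)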
diff --git a/rhodecode/lib/rc_cache/backends.py b/rhodecode/lib/rc_cache/backends.py
--- a/rhodecode/lib/rc_cache/backends.py
+++ b/rhodecode/lib/rc_cache/backends.py
@@ -1,4 +1,3 @@
-
 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
@@ -17,31 +16,31 @@
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
-import time
 import errno
+import fcntl
+import functools
 import logging
-import functools
+import os
+import pickle
+import time
 
+import gevent
 import msgpack
 import redis
-import gevent
-import pickle
-import fcntl
+
 flock_org = fcntl.flock
 from typing import Union
 
+from dogpile.cache.api import Deserializer, Serializer
+from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import memory as memory_backend
-from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import FileLock
 from dogpile.cache.util import memoized_property
-from dogpile.cache.api import Serializer, Deserializer
-
-from pyramid.settings import asbool
 
 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
-from rhodecode.lib.str_utils import safe_str, safe_bytes
-
+from rhodecode.lib.str_utils import safe_bytes, safe_str
+from rhodecode.lib.type_utils import str2bool
 
 _default_max_size = 1024
 
@@ -53,14 +52,20 @@ class LRUMemoryBackend(memory_backend.Me
     pickle_values = False
 
     def __init__(self, arguments):
-        max_size = arguments.pop('max_size', _default_max_size)
+        self.max_size = arguments.pop('max_size', _default_max_size)
 
         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug
 
-        arguments['cache_dict'] = LRUDictClass(max_size)
-        super(LRUMemoryBackend, self).__init__(arguments)
+        arguments['cache_dict'] = LRUDictClass(self.max_size)
+        super().__init__(arguments)
+
+    def __repr__(self):
+        return f'{self.__class__}(maxsize=`{self.max_size}`)'
+
+    def __str__(self):
+        return self.__repr__()
 
     def delete(self, key):
         try:
@@ -75,19 +80,19 @@ class LRUMemoryBackend(memory_backend.Me
 
 
 class PickleSerializer:
-    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
+    serializer: None | Serializer = staticmethod(  # type: ignore
         functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
     )
-    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
+    deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(pickle.loads)
    )
 
 
 class MsgPackSerializer(object):
-    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
+    serializer: None | Serializer = staticmethod(  # type: ignore
         msgpack.packb
     )
-    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
+    deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(msgpack.unpackb, use_list=False)
     )
 
@@ -135,18 +140,28 @@ class FileNamespaceBackend(PickleSeriali
             arguments['lock_factory'] = CustomLockFactory
 
         db_file = arguments.get('filename')
-        log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
+        log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
+        db_file_dir = os.path.dirname(db_file)
+        if not os.path.isdir(db_file_dir):
+            os.makedirs(db_file_dir)
+
         try:
-            super(FileNamespaceBackend, self).__init__(arguments)
+            super().__init__(arguments)
         except Exception:
             log.exception('Failed to initialize db at: %s', db_file)
             raise
 
     def __repr__(self):
-        return '{} `{}`'.format(self.__class__, self.filename)
+        return f'{self.__class__}(file=`{self.filename}`)'
+
+    def __str__(self):
+        return self.__repr__()
+
+    def _get_keys_pattern(self, prefix: bytes = b''):
+        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
 
     def list_keys(self, prefix: bytes = b''):
-        prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+        prefix = self._get_keys_pattern(prefix)
 
         def cond(dbm_key: bytes):
             if not prefix:
@@ -171,14 +186,22 @@ class BaseRedisBackend(redis_backend.Red
     key_prefix = ''
 
     def __init__(self, arguments):
-        super(BaseRedisBackend, self).__init__(arguments)
+        self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
+        super().__init__(arguments)
+
         self._lock_timeout = self.lock_timeout
-        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
+        self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
 
         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
             self._lock_timeout = 30
 
+    def __repr__(self):
+        return f'{self.__class__}(conn=`{self.db_conn}`)'
+
+    def __str__(self):
+        return self.__repr__()
+
     def _create_client(self):
         args = {}
 
@@ -197,8 +220,11 @@ class BaseRedisBackend(redis_backend.Red
         )
         self.reader_client = self.writer_client
 
-    def list_keys(self, prefix=''):
-        prefix = '{}:{}*'.format(self.key_prefix, prefix)
+    def _get_keys_pattern(self, prefix: bytes = b''):
+        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+
+    def list_keys(self, prefix: bytes = b''):
+        prefix = self._get_keys_pattern(prefix)
         return self.reader_client.keys(prefix)
 
     def get_store(self):
@@ -206,7 +232,7 @@ class BaseRedisBackend(redis_backend.Red
 
    def get_mutex(self, key):
         if self.distributed_lock:
-            lock_key = '_lock_{0}'.format(safe_str(key))
+            lock_key = f'_lock_{safe_str(key)}'
             return get_mutex_lock(
                 self.writer_client, lock_key,
                 self._lock_timeout,
@@ -243,10 +269,10 @@ def get_mutex_lock(client, lock_key, loc
         )
 
         def __repr__(self):
-            return "{}:{}".format(self.__class__.__name__, lock_key)
+            return f"{self.__class__.__name__}:{lock_key}"
 
         def __str__(self):
-            return "{}:{}".format(self.__class__.__name__, lock_key)
+            return f"{self.__class__.__name__}:{lock_key}"
 
         def __init__(self):
             self.lock = self.get_lock()
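Both backends now expose the same _get_keys_pattern()/list_keys() byte-string surface (which the reworked clear_cache_namespace() in utils.py relies on), and the Redis backend parses its lock flags with the in-house str2bool() instead of pyramid's asbool. A sketch, under stated assumptions, of a manually configured Redis region exercising those arguments; the backend name 'dogpile.cache.rc.redis' is assumed from the register_backend() calls elided above, and the URL is a placeholder:

    from rhodecode.lib.rc_cache import backend_key_generator, make_region

    region = make_region(name='example_redis_region', function_key_generator=None)
    region.configure(
        backend='dogpile.cache.rc.redis',  # assumed registered backend name
        expiration_time=300,
        arguments={
            'url': 'redis://localhost:6379/0',  # placeholder connection
            'distributed_lock': True,     # get_mutex() hands out Redis-held locks
            'lock_timeout': 30,           # becomes self._lock_timeout
            'lock_auto_renewal': 'true',  # parsed via str2bool() in __init__
        },
    )
    # as in configure_dogpile_cache(): the key generator needs the actual backend
    region.function_key_generator = backend_key_generator(region.actual_backend)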
diff --git a/rhodecode/lib/rc_cache/cache_key_meta.py b/rhodecode/lib/rc_cache/cache_key_meta.py
--- a/rhodecode/lib/rc_cache/cache_key_meta.py
+++ b/rhodecode/lib/rc_cache/cache_key_meta.py
@@ -18,11 +18,12 @@
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
-import os
-import sys
 import atexit
 import logging
+import os
 import signal
+import sys
+
 import rhodecode
 
 log = logging.getLogger(__name__)
@@ -41,7 +42,7 @@ def sigHandler(signo, frame):
 
 
 def free_cache_keys(*args):
-    from rhodecode.model.db import Session, CacheKey
+    from rhodecode.model.db import CacheKey, Session
 
     if rhodecode.is_test:
         return
diff --git a/rhodecode/lib/rc_cache/region_meta.py b/rhodecode/lib/rc_cache/region_meta.py
--- a/rhodecode/lib/rc_cache/region_meta.py
+++ b/rhodecode/lib/rc_cache/region_meta.py
@@ -1,5 +1,3 @@
-
-
 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
@@ -17,6 +15,7 @@
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
+
 import os
 import tempfile
diff --git a/rhodecode/lib/rc_cache/utils.py b/rhodecode/lib/rc_cache/utils.py
--- a/rhodecode/lib/rc_cache/utils.py
+++ b/rhodecode/lib/rc_cache/utils.py
@@ -15,22 +15,22 @@
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
-import os
-import time
-import logging
+
 import functools
-import decorator
+import logging
+import os
 import threading
+import time
 
+import decorator
 from dogpile.cache import CacheRegion
 
 import rhodecode
 from rhodecode.lib.hash_utils import sha1
+from rhodecode.lib.str_utils import safe_bytes
 from rhodecode.lib.type_utils import str2bool
-from rhodecode.lib.str_utils import safe_bytes
-from rhodecode.lib.rc_cache import cache_key_meta
-from rhodecode.lib.rc_cache import region_meta
+
+from . import region_meta, cache_key_meta
 
 log = logging.getLogger(__name__)
 
@@ -44,6 +44,9 @@ def isCython(func):
 
 class RhodeCodeCacheRegion(CacheRegion):
 
+    def __repr__(self):
+        return f'{self.__class__}(name={self.name})'
+
     def conditional_cache_on_arguments(
             self, namespace=None,
             expiration_time=None,
@@ -53,15 +56,17 @@ class RhodeCodeCacheRegion(CacheRegion):
             condition=True):
         """
         Custom conditional decorator, that will not touch any dogpile internals if
-        condition isn't meet. This works a bit different than should_cache_fn
+        condition isn't met. This works a bit differently from should_cache_fn
         And it's faster in cases we don't ever want to compute cached values
         """
         expiration_time_is_callable = callable(expiration_time)
+        if not namespace:
+            namespace = getattr(self, '_default_namespace', None)
 
         if function_key_generator is None:
             function_key_generator = self.function_key_generator
 
-        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
+        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
 
             if not condition:
                 log.debug('Calling un-cached method:%s', user_func.__name__)
@@ -71,7 +76,7 @@ class RhodeCodeCacheRegion(CacheRegion):
                 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                 return result
 
-            key = key_generator(*arg, **kw)
+            key = func_key_generator(*arg, **kw)
 
             timeout = expiration_time() if expiration_time_is_callable \
                 else expiration_time
@@ -146,54 +151,69 @@ def compute_key_from_params(*args):
     return sha1(safe_bytes("_".join(map(str, args))))
 
 
-def backend_key_generator(backend):
-    """
-    Special wrapper that also sends over the backend to the key generator
-    """
-    def wrapper(namespace, fn):
-        return key_generator(backend, namespace, fn)
-    return wrapper
-
-
-def key_generator(backend, namespace, fn):
-    fname = fn.__name__
+def custom_key_generator(backend, namespace, fn):
+    func_name = fn.__name__
 
     def generate_key(*args):
-        backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
+        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
         namespace_pref = namespace or 'default_namespace'
         arg_key = compute_key_from_params(*args)
-        final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
+        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
 
         return final_key
 
     return generate_key
 
 
-def get_or_create_region(region_name, region_namespace=None):
-    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
+def backend_key_generator(backend):
+    """
+    Special wrapper that also sends over the backend to the key generator
+    """
+    def wrapper(namespace, fn):
+        return custom_key_generator(backend, namespace, fn)
+    return wrapper
+
+
+def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
+    from .backends import FileNamespaceBackend
+    from . import async_creation_runner
+
     region_obj = region_meta.dogpile_cache_regions.get(region_name)
     if not region_obj:
-        raise EnvironmentError(
-            'Region `{}` not in configured: {}.'.format(
-                region_name, list(region_meta.dogpile_cache_regions.keys())))
+        reg_keys = list(region_meta.dogpile_cache_regions.keys())
+        raise EnvironmentError(f'Region `{region_name}` not found in configured regions: {reg_keys}.')
+
+    region_uid_name = f'{region_name}:{region_namespace}'
 
-    region_uid_name = '{}:{}'.format(region_name, region_namespace)
     if isinstance(region_obj.actual_backend, FileNamespaceBackend):
+        if not region_namespace:
+            raise ValueError(f'{FileNamespaceBackend} requires the region_namespace param to be specified')
+
         region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
         if region_exist:
             log.debug('Using already configured region: %s', region_namespace)
             return region_exist
-        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
+
         expiration_time = region_obj.expiration_time
-        if not os.path.isdir(cache_dir):
-            os.makedirs(cache_dir)
+
+        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
+        namespace_cache_dir = cache_dir
+
+        # we default the namespace_cache_dir to our default cache dir.
+        # however if this backend is configured with filename= param, we prioritize that
+        # so all caches within that particular region, even namespaced ones, end up in the same path
+        if region_obj.actual_backend.filename:
+            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
+
+        if not os.path.isdir(namespace_cache_dir):
+            os.makedirs(namespace_cache_dir)
 
         new_region = make_region(
             name=region_uid_name,
             function_key_generator=backend_key_generator(region_obj.actual_backend)
         )
+
         namespace_filename = os.path.join(
-            cache_dir, "{}.cache.dbm".format(region_namespace))
+            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
         # special type that allows 1db per namespace
         new_region.configure(
             backend='dogpile.cache.rc.file_namespace',
@@ -205,19 +225,34 @@ def get_or_create_region(region_name, re
 
     log.debug('configuring new region: %s', region_uid_name)
     region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
 
+    region_obj._default_namespace = region_namespace
+    if use_async_runner:
+        region_obj.async_creation_runner = async_creation_runner
     return region_obj
 
 
-def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
-    region = get_or_create_region(cache_region, cache_namespace_uid)
-    cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
-    num_delete_keys = len(cache_keys)
-    if invalidate:
-        region.invalidate(hard=False)
-    else:
-        if num_delete_keys:
-            region.delete_multi(cache_keys)
-    return num_delete_keys
+def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
+    from . import CLEAR_DELETE, CLEAR_INVALIDATE
+
+    if not isinstance(cache_region, RhodeCodeCacheRegion):
+        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
+    log.debug('clearing cache region: %s with method=%s', cache_region, method)
+
+    num_affected_keys = None
+
+    if method == CLEAR_INVALIDATE:
+        # NOTE: The CacheRegion.invalidate() method's default mode of
+        # operation is to set a timestamp local to this CacheRegion in this Python process only.
+        # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
+        cache_region.invalidate(hard=True)
+
+    if method == CLEAR_DELETE:
+        cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
+        num_affected_keys = len(cache_keys)
+        if num_affected_keys:
+            cache_region.delete_multi(cache_keys)
+
+    return num_affected_keys
 
 
 class ActiveRegionCache(object):
@@ -286,7 +321,7 @@ class InvalidationContext(object):
 
         if thread_scoped is None:
             # if we set "default" we can override this via .ini settings
-            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
+            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')
 
         # Append the thread id to the cache key if this invalidation context
         # should be scoped to the current thread.
@@ -346,7 +381,7 @@ class InvalidationContext(object):
             return FreshRegionCache(context=self, cache_data=cache_data)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        from rhodecode.model.db import Session, IntegrityError
+        from rhodecode.model.db import IntegrityError, Session
 
         # save compute time
        self.compute_time = time.time() - self._start_time
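Taken together, the utils.py changes give FileNamespaceBackend regions a mandatory per-namespace db file (named <region>_<namespace>.cache_db), stamp the namespace onto the region as _default_namespace, and replace the old invalidate flag of clear_cache_namespace() with an explicit method argument. A closing sketch of the resulting call pattern, assuming a 'cache_repo' region is configured at startup; the namespace, function arguments, and computation are placeholders:

    from rhodecode.lib import rc_cache
    from rhodecode.lib.rc_cache import CLEAR_DELETE

    cache_namespace_uid = 'repo.1'  # hypothetical namespace
    region = rc_cache.get_or_create_region(
        'cache_repo',  # assumed region name from the deployment .ini
        region_namespace=cache_namespace_uid,
        use_async_runner=True,  # attaches async_creation_runner from __init__.py
    )

    @region.conditional_cache_on_arguments(condition=True)
    def compute_file_tree(repo_id, commit_id):
        ...  # placeholder for the real, expensive computation

    # the decorator's namespace now falls back to region._default_namespace, and
    # clearing takes CLEAR_DELETE or CLEAR_INVALIDATE instead of a boolean flag
    rc_cache.clear_cache_namespace(region, cache_namespace_uid, method=CLEAR_DELETE)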