# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
Dogpile-cache helpers for RhodeCode.

Provides a custom :class:`RhodeCodeCacheRegion` with a conditional caching
decorator, helpers to build/lookup cache regions (including a special
one-file-per-namespace mode for the file backend), namespace clearing, and a
DB-backed :class:`InvalidationContext` for cross-process cache invalidation.
"""

import functools
import logging
import os
import threading
import time

import decorator
from dogpile.cache import CacheRegion

import rhodecode
from ...lib.hash_utils import sha1
from ...lib.str_utils import safe_bytes
from ...lib.type_utils import str2bool  # noqa :required by imports from .utils

from . import region_meta

log = logging.getLogger(__name__)


def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    return func.__class__.__name__ == 'cython_function_or_method'


class RhodeCodeCacheRegion(CacheRegion):
    """
    dogpile ``CacheRegion`` subclass adding a conditional caching decorator
    that can bypass dogpile entirely when caching is disabled per-call.
    """

    def __repr__(self):
        return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'

    def conditional_cache_on_arguments(
            self, namespace=None, expiration_time=None, should_cache_fn=None,
            to_str=str, function_key_generator=None, condition=True):
        """
        Custom conditional decorator that will not touch any dogpile internals
        if the condition isn't met.

        This works a bit differently from ``should_cache_fn``, and it's faster
        in cases where we never want to compute cached values in the first
        place: when ``condition`` is falsy the decorated function is simply
        called directly, with no key generation or backend access at all.

        :param namespace: cache key namespace; falls back to the region's
            ``_default_namespace`` (set by :func:`get_or_create_region`)
        :param expiration_time: TTL in seconds, or a callable returning it
            (evaluated on every cached call)
        :param should_cache_fn: standard dogpile post-computation predicate
        :param to_str: key-stringification function passed to the key generator
            (only forwarded when it is not the plain ``str`` builtin, for
            backwards compatibility)
        :param function_key_generator: overrides the region's key generator
        :param condition: when falsy, skip caching entirely and just call
            the wrapped function
        """
        expiration_time_is_callable = callable(expiration_time)
        if not namespace:
            namespace = getattr(self, '_default_namespace', None)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
            # Fast path: caching disabled for this decoration — call through,
            # only paying for two debug-log timestamps.
            if not condition:
                log.debug('Calling un-cached method:%s', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                return result

            key = func_key_generator(*arg, **kw)

            # a callable expiration_time is re-evaluated per call, allowing
            # dynamic TTLs
            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached method:`%s`', user_func.__name__)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                # Delete the cached entry for these exact call arguments.
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # Explicitly store `value` under the key for these arguments.
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # Fetch the cached value (dogpile NO_VALUE when absent).
                key = key_generator(*arg, **kw)
                return self.get(key)

            # Attach cache-management helpers onto the decorated function,
            # mirroring dogpile's own cache_on_arguments API.
            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator


def make_region(*arg, **kw):
    """Factory returning our :class:`RhodeCodeCacheRegion` instead of the stock one."""
    return RhodeCodeCacheRegion(*arg, **kw)


def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related settings from a flat settings dict.

    For every key starting with one of `prefixes`, strip the prefix and
    collect the (stripped) value under the remaining key name.

    :param settings: mapping of configuration keys to values
    :param prefixes: list of key prefixes to match and strip
    :return: dict of prefix-stripped setting names to values
    """
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                name = key.split(prefix)[1].strip()
                val = settings[key]
                if isinstance(val, str):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings


def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    return sha1(safe_bytes("_".join(map(str, args))))


def custom_key_generator(backend, namespace, fn):
    """
    Build a dogpile key-generator producing keys of the form
    ``<backend_prefix>:<namespace>:<func_name>_<sha1(args)>``.

    :param backend: cache backend; its ``key_prefix`` (if any) prefixes keys
    :param namespace: namespace segment, defaults to 'default_namespace'
    :param fn: the function being cached; its ``__name__`` is baked into keys
    """
    func_name = fn.__name__

    def generate_key(*args):
        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"

        return final_key

    return generate_key


def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def wrapper(namespace, fn):
        return custom_key_generator(backend, namespace, fn)
    return wrapper


def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False, force=False):
    """
    Look up a configured cache region, or (for the file backend only) build a
    dedicated per-namespace region backed by its own cache-db file.

    :param region_name: name of a region registered in
        ``region_meta.dogpile_cache_regions``
    :param region_namespace: namespace; required for the file backend, where
        it also serves as the registry key for the derived region
    :param use_async_runner: attach the async creation runner to the region
    :param force: rebuild the per-namespace region even if already registered
    :raises OSError: when `region_name` is not a configured region
    :raises ValueError: when the file backend is used without a namespace
    :return: the resolved (possibly newly created) cache region
    """
    from .backends import FileNamespaceBackend
    from . import async_creation_runner

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        reg_keys = list(region_meta.dogpile_cache_regions.keys())
        raise OSError(f'Region `{region_name}` not in configured: {reg_keys}.')

    region_uid_name = f'{region_name}:{region_namespace}'

    # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        if not region_namespace:
            raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')

        # NOTE(review): derived regions are registered/looked-up under
        # `region_namespace` (not `region_uid_name`) — intentional, as the
        # namespace uniquely identifies the per-file region here.
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist and not force:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        expiration_time = region_obj.expiration_time

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        namespace_cache_dir = cache_dir

        # we default the namespace_cache_dir to our default cache dir.
        # however, if this backend is configured with filename= param, we prioritize that
        # so all caches within that particular region, even those namespaced end up in the same path
        if region_obj.actual_backend.filename:
            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)

        if not os.path.isdir(namespace_cache_dir):
            os.makedirs(namespace_cache_dir)

        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )

        namespace_filename = os.path.join(
            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    # remember the namespace so conditional_cache_on_arguments can default to it
    region_obj._default_namespace = region_namespace
    if use_async_runner:
        region_obj.async_creation_runner = async_creation_runner
    return region_obj


def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
    """
    Clear a cache namespace, either by process-local invalidation or by
    deleting all backend keys with the namespace prefix.

    :param cache_region: a region object, or a region name to resolve
    :param cache_namespace_uid: key prefix identifying the namespace
    :param method: one of ``CLEAR_INVALIDATE`` / ``CLEAR_DELETE``
    :return: number of deleted keys (0 for the invalidate method)
    """
    from . import CLEAR_DELETE, CLEAR_INVALIDATE

    if not isinstance(cache_region, RhodeCodeCacheRegion):
        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
    log.debug('clearing cache region: %s [prefix:%s] with method=%s',
              cache_region, cache_namespace_uid, method)

    num_affected_keys = 0

    if method == CLEAR_INVALIDATE:
        # NOTE: The CacheRegion.invalidate() method's default mode of
        # operation is to set a timestamp local to this CacheRegion in this Python process only.
        # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
        cache_region.invalidate(hard=True)

    if method == CLEAR_DELETE:
        num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)

    return num_affected_keys


class ActiveRegionCache(object):
    """
    Lightweight view over the DB cache-key state, yielded by
    :class:`InvalidationContext` on ``__enter__``.
    """

    def __init__(self, context, cache_data: dict):
        # the owning InvalidationContext
        self.context = context
        # dict form of the CacheKey DB row
        self.cache_data = cache_data

    @property
    def state_uid(self) -> str:
        """Current state uid of the cache key; changes signal invalidation."""
        return self.cache_data['cache_state_uid']


class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        repo_namespace_key = 'some-cache-for-repo-id-100'
        inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)

        def cache_generator(_state_uid):

            @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
            def _dummy_func(*args):
                # compute heavy function
                return _state_uid, 'result'

            return _dummy_func

        with inv_context_manager as invalidation_context:
            cache_state_uid = invalidation_context.state_uid
            cache_func = cache_generator(cache_state_uid)
            previous_state_uid, result = cache_func(*call_args)

            should_invalidate = previous_state_uid != cache_state_uid
            if should_invalidate:
                _, result = cache_func.refresh(*call_args)

    To send global invalidation signal, simply run::

        CacheKey.set_invalidate(repo_namespace_key)
    """

    def __repr__(self):
        # NOTE(review): this f-string appears truncated by text extraction —
        # presumably it originally included the cache key / proc id; confirm
        # against upstream before relying on this repr.
        return f''

    def __init__(self, key, raise_exception=False, thread_scoped=None):
        self.cache_key = key
        self.raise_exception = raise_exception
        # instance_id identifies this process across the cluster
        self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
        self.compute_time = 0

    def get_or_create_cache_obj(self):
        """
        Fetch the active CacheKey DB row for our key, creating (and
        committing) a fresh one with a new state uid when none exists.
        """
        from rhodecode.model.db import CacheKey, Session, IntegrityError

        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            # generate new UID for non-existing cache object
            cache_state_uid = CacheKey.generate_new_state_uid()
            cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
                                 cache_state_uid=cache_state_uid, cache_active=True)
            try:
                Session().add(cache_obj)
                Session().commit()
            except IntegrityError:
                # if we catch integrity error, it means we inserted this object
                # assumption is that's really an edge race-condition case and
                # it's safe to skip it
                Session().rollback()
            except Exception:
                log.exception('Failed to commit on cache key update')
                Session().rollback()
                if self.raise_exception:
                    raise
        return cache_obj

    def __enter__(self):
        log.debug('Entering cache invalidation check context: %s', self)
        # used both for timing and as part of cache_args on a fresh CacheKey
        self._start_time = time.time()

        self.cache_obj = self.get_or_create_cache_obj()
        cache_data = self.cache_obj.get_dict()

        return ActiveRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time