utils.py
358 lines
| 12.9 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
r5067 | ||||
r2891 | import functools | |||
r5067 | import logging | |||
import os | ||||
r2932 | import threading | |||
r5067 | import time | |||
r2891 | ||||
r5067 | import decorator | |||
r2891 | from dogpile.cache import CacheRegion | |||
r2845 | ||||
r2932 | import rhodecode | |||
r4922 | from rhodecode.lib.hash_utils import sha1 | |||
r5067 | from rhodecode.lib.str_utils import safe_bytes | |||
r5106 | from rhodecode.lib.type_utils import str2bool # noqa :required by imports from .utils | |||
r2932 | ||||
r5287 | from . import region_meta | |||
r2845 | ||||
# module-level logger shared by every helper in this file
log = logging.getLogger(__name__)
def isCython(func) -> bool:
    """
    Private helper that checks if a function is a cython function.

    The check is purely name based: it compares the name of ``func``'s class
    with ``cython_function_or_method``.

    :param func: object to inspect, typically a callable.
    :return: ``True`` when the class of ``func`` is named
        ``cython_function_or_method``, ``False`` otherwise.
    """
    return func.__class__.__name__ == 'cython_function_or_method'
class RhodeCodeCacheRegion(CacheRegion):
    """
    ``CacheRegion`` subclass that adds :meth:`conditional_cache_on_arguments`,
    a caching decorator that can be switched off per decorated function.
    """

    def __repr__(self):
        return f'{self.__class__}(name={self.name})'

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=str,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't met. This works a bit different from should_cache_fn
        And it's faster in cases we don't ever want to compute cached values

        :param namespace: namespace handed to the key generator; when falsy the
            region's ``_default_namespace`` attribute (if set) is used instead.
        :param expiration_time: expiration value, or a zero-argument callable
            that is evaluated on every cached call to obtain it.
        :param should_cache_fn: forwarded unchanged to ``get_or_create``.
        :param to_str: when it is not ``str`` it is forwarded to the key
            generator factory as the ``to_str`` keyword.
        :param function_key_generator: factory called as
            ``factory(namespace, fn)`` returning the per-function key
            generator; defaults to ``self.function_key_generator``.
        :param condition: when falsy, the decorated function is called
            directly and the cache is never consulted.
        :return: a decorator. The decorated function gets the helpers ``set``,
            ``invalidate``, ``get``, ``refresh``, ``key_generator`` and
            ``original`` attached as attributes.
        """
        expiration_time_is_callable = callable(expiration_time)
        if not namespace:
            namespace = getattr(self, '_default_namespace', None)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):

            if not condition:
                # caching disabled: call straight through and only log timing
                log.debug('Calling un-cached method:%s', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                return result

            key = func_key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached method:`%s`', user_func.__name__)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
r2891 | ||||
def make_region(*arg, **kw):
    """
    Instantiate a :class:`RhodeCodeCacheRegion`.

    All positional and keyword arguments are forwarded unchanged to the
    region constructor.
    """
    return RhodeCodeCacheRegion(*arg, **kw)
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract the entries of ``settings`` whose key starts with one of ``prefixes``.

    :param settings: mapping of configuration keys to values.
    :param prefixes: iterable of key prefixes to match; ``None`` or empty
        selects nothing.
    :return: dict mapping each matching key, with the prefix removed and
        surrounding whitespace stripped, to its value. String values are
        stripped as well.
    """
    prefixes = prefixes or []
    cache_settings = {}
    for key, val in settings.items():
        for prefix in prefixes:
            if not key.startswith(prefix):
                continue
            # slice the prefix off instead of splitting on it, so a key that
            # contains the prefix text more than once keeps its full remainder
            name = key[len(prefix):].strip()
            if isinstance(val, str):
                val = val.strip()
            cache_settings[name] = val
    return cache_settings
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager

    Every argument is converted with ``str``, the pieces are joined with
    ``_`` and the resulting text is passed through ``safe_bytes`` and ``sha1``.

    NOTE(review): because of the plain ``_`` join, argument tuples such as
    ``('a_b', 'c')`` and ``('a', 'b_c')`` produce the same key.

    :param args: values identifying the cached call.
    :return: whatever ``sha1`` returns for the joined bytes.
    """
    return sha1(safe_bytes("_".join(map(str, args))))
r2845 | ||||
def custom_key_generator(backend, namespace, fn):
    """
    Build the cache-key function for ``fn``.

    :param backend: cache backend; its ``key_prefix`` attribute, when present
        and truthy, becomes the first key segment.
    :param namespace: second key segment; ``'default_namespace'`` when falsy.
    :param fn: the function being cached; only its ``__name__`` is used.
    :return: ``generate_key(*args)`` producing
        ``"<backend prefix>:<namespace>:<function name>_<args digest>"``.
        It takes positional arguments only.
    """
    func_name = fn.__name__

    def generate_key(*args):
        # the prefix is looked up on every call rather than once at creation
        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"

        return final_key

    return generate_key
def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator

    :param backend: cache backend to bind into the generated key function.
    :return: a ``wrapper(namespace, fn)`` callable that delegates to
        :func:`custom_key_generator` with ``backend`` as first argument.
    """
    def wrapper(namespace, fn):
        return custom_key_generator(backend, namespace, fn)

    return wrapper
def get_or_create_region(region_name, region_namespace: str | None = None, use_async_runner=False):
    """
    Return the configured cache region for ``region_name``.

    For regions backed by ``FileNamespaceBackend`` a dedicated region (one
    cache file per namespace) is created on first use and registered in
    ``region_meta.dogpile_cache_regions``; later calls return that region.

    :param region_name: name of a region present in
        ``region_meta.dogpile_cache_regions``.
    :param region_namespace: namespace stored on the region as
        ``_default_namespace``; mandatory for ``FileNamespaceBackend`` regions.
    :param use_async_runner: when true, ``async_creation_runner`` is attached
        to the returned region.
    :return: the region object.
    :raises OSError: when ``region_name`` is not configured.
    :raises ValueError: when a ``FileNamespaceBackend`` region is requested
        without ``region_namespace``.
    """
    from .backends import FileNamespaceBackend
    from . import async_creation_runner

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        reg_keys = list(region_meta.dogpile_cache_regions.keys())
        raise OSError(f'Region `{region_name}` not in configured: {reg_keys}.')

    region_uid_name = f'{region_name}:{region_namespace}'

    # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        if not region_namespace:
            raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')

        # NOTE: per-namespace regions are registered under the bare namespace key
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        expiration_time = region_obj.expiration_time

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        namespace_cache_dir = cache_dir

        # we default the namespace_cache_dir to our default cache dir.
        # however, if this backend is configured with filename= param, we prioritize that
        # so all caches within that particular region, even those namespaced end up in the same path
        if region_obj.actual_backend.filename:
            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)

        # exist_ok avoids failing when another worker creates the directory
        # between a check and the creation
        os.makedirs(namespace_cache_dir, exist_ok=True)

        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )

        namespace_filename = os.path.join(
            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    region_obj._default_namespace = region_namespace
    if use_async_runner:
        region_obj.async_creation_runner = async_creation_runner
    return region_obj
r2846 | ||||
def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
    """
    Clear cached data of a region/namespace using the requested ``method``.

    :param cache_region: a region object, or a region name that is resolved
        with :func:`get_or_create_region` using ``cache_namespace_uid``.
    :param cache_namespace_uid: namespace of the cache; used as key prefix
        for deletion.
    :param method: ``CLEAR_INVALIDATE`` or ``CLEAR_DELETE``; any other value
        clears nothing and is logged as a warning.
    :return: the value reported by the backend's ``delete_multi_by_prefix``
        for ``CLEAR_DELETE``, ``0`` otherwise.
    """
    from . import CLEAR_DELETE, CLEAR_INVALIDATE

    if not isinstance(cache_region, RhodeCodeCacheRegion):
        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
    log.debug('clearing cache region: %s [prefix:%s] with method=%s',
              cache_region, cache_namespace_uid, method)

    num_affected_keys = 0

    if method == CLEAR_INVALIDATE:
        # NOTE: The CacheRegion.invalidate() method’s default mode of
        # operation is to set a timestamp local to this CacheRegion in this Python process only.
        # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
        cache_region.invalidate(hard=True)
    elif method == CLEAR_DELETE:
        num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
    else:
        # nothing was cleared; make the no-op visible instead of silent
        log.warning('unknown cache clear method=%s for region: %s, nothing cleared',
                    method, cache_region)

    return num_affected_keys
r2932 | ||||
class ActiveRegionCache:
    """
    Value handed out by ``InvalidationContext.__enter__``.

    Holds the owning context together with the cache-key data dictionary.
    """

    def __init__(self, context, cache_data: dict):
        # context manager instance that produced this object
        self.context = context
        # dict describing the cache key; must contain 'cache_state_uid'
        self.cache_data = cache_data

    @property
    def state_uid(self) -> str:
        """Return the ``cache_state_uid`` entry of :attr:`cache_data`."""
        return self.cache_data['cache_state_uid']
r2932 | ||||
class InvalidationContext:
    """
    usage::

        from rhodecode.lib import rc_cache

        repo_namespace_key = 'some-cache-for-repo-id-100'
        inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)

        def cache_generator(_state_uid):

            @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
            def _dummy_func(*args):
                # compute heavy function
                return _state_uid, 'result'

            return _dummy_func

        with inv_context_manager as invalidation_context:
            cache_state_uid = invalidation_context.state_uid
            cache_func = cache_generator(cache_state_uid)
            previous_state_uid, result = cache_func(*call_args)

            should_invalidate = previous_state_uid != cache_state_uid
            if should_invalidate:
                _, result = cache_func.refresh(*call_args)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(repo_namespace_key)

    """

    def __repr__(self):
        return f'<InvalidationContext:{self.cache_key}>'

    def __init__(self, key, raise_exception=False, thread_scoped=None):
        """
        :param key: cache key this context tracks.
        :param raise_exception: when true, unexpected errors while storing a
            new cache key object are re-raised after being logged.
        :param thread_scoped: when ``True`` the current thread id is used in
            ``proc_key``; ``None`` reads the ``cache_thread_scoped`` setting.
        """
        self.cache_key = key

        self.raise_exception = raise_exception
        self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
        self.compute_time = 0

        # set here so get_or_create_cache_obj() also works outside of a
        # `with` block; __enter__ refreshes the start time
        self._start_time = time.time()
        self.cache_obj = None

    def get_or_create_cache_obj(self):
        """
        Fetch the active ``CacheKey`` for :attr:`cache_key`, creating and
        committing a new one when none is found.

        :return: the fetched or newly built cache key object. The new object
            is returned even when storing it failed and the error was
            swallowed.
        """
        from rhodecode.model.db import CacheKey, Session, IntegrityError

        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            # generate new UID for non-existing cache object
            cache_state_uid = CacheKey.generate_new_state_uid()
            cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
                                 cache_state_uid=cache_state_uid, cache_active=True)
            try:
                Session().add(cache_obj)
                Session().commit()
            except IntegrityError:
                # if we catch integrity error, it means we inserted this object
                # assumption is that's really an edge race-condition case and
                # it's safe to skip it
                Session().rollback()
            except Exception:
                log.exception('Failed to commit on cache key update')
                Session().rollback()
                if self.raise_exception:
                    raise

        return cache_obj

    def __enter__(self):
        log.debug('Entering cache invalidation check context: %s', self)
        self._start_time = time.time()

        self.cache_obj = self.get_or_create_cache_obj()
        cache_data = self.cache_obj.get_dict()

        return ActiveRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time