# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import functools
import logging
import os
import threading
import time

import decorator
from dogpile.cache import CacheRegion

import rhodecode
from rhodecode.lib.hash_utils import sha1
from rhodecode.lib.str_utils import safe_bytes
from rhodecode.lib.type_utils import str2bool  # noqa: required by imports from .utils

from . import region_meta

log = logging.getLogger(__name__)


def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    return func.__class__.__name__ == 'cython_function_or_method'


class RhodeCodeCacheRegion(CacheRegion):

    def __repr__(self):
        return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=str,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator that will not touch any dogpile internals if
        the condition isn't met. This works a bit differently from ``should_cache_fn``,
        and it's faster in cases where we never want to compute cached values.
        """
        expiration_time_is_callable = callable(expiration_time)
        if not namespace:
            namespace = getattr(self, '_default_namespace', None)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):

            if not condition:
                log.debug('Calling un-cached method:%s', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                return result

            key = func_key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached method:`%s`', user_func.__name__)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
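

# Illustrative sketch, not part of the upstream module: shows how the decorator
# produced by `conditional_cache_on_arguments` is typically used. The region
# name 'cache_perms' and the namespace value are assumptions for this example;
# adjust them to a region configured in your installation.
def _example_conditional_cache(user_id, use_cache=True):
    region = get_or_create_region('cache_perms', region_namespace=f'user_{user_id}')

    # condition=False bypasses dogpile entirely and calls the function directly
    @region.conditional_cache_on_arguments(namespace=f'user_{user_id}', condition=use_cache)
    def _compute(_user_id):
        return {'user_id': _user_id}  # stand-in for an expensive computation

    result = _compute(user_id)    # first call computes and stores the value
    cached = _compute(user_id)    # second call is served from the cache
    _compute.invalidate(user_id)  # drops the cached entry for these arguments
    return result, cached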


def make_region(*arg, **kw):
    return RhodeCodeCacheRegion(*arg, **kw)


def get_default_cache_settings(settings, prefixes=None):
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                name = key.split(prefix)[1].strip()
                val = settings[key]
                if isinstance(val, str):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings
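

# Illustrative sketch, not part of the upstream module: given .ini-style
# settings, `get_default_cache_settings` strips a matching prefix and returns
# only the keys under it. The setting names below are assumptions for the example.
def _example_default_cache_settings():
    settings = {
        'rc_cache.cache_repo.backend': 'dogpile.cache.rc.file_namespace',
        'rc_cache.cache_repo.expiration_time': '2592000',
        'unrelated.key': 'ignored',
    }
    # returns {'cache_repo.backend': ..., 'cache_repo.expiration_time': ...}
    return get_default_cache_settings(settings, prefixes=['rc_cache.'])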


def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    return sha1(safe_bytes("_".join(map(str, args))))


def custom_key_generator(backend, namespace, fn):
    func_name = fn.__name__

    def generate_key(*args):
        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
        return final_key

    return generate_key


def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def wrapper(namespace, fn):
        return custom_key_generator(backend, namespace, fn)
    return wrapper
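

# Illustrative sketch, not part of the upstream module: the shape of keys
# produced by `backend_key_generator`/`custom_key_generator`. The prefix value
# shown is an assumption; the real one comes from the configured backend's
# key_prefix attribute.
def _example_generated_key():
    # For a backend with key_prefix='file_namespace_backend', namespace 'repo_1',
    # and a function named 'heavy_compute' called with (1, 'x'), the final key is:
    #   file_namespace_backend:repo_1:heavy_compute_<sha1 of b"1_x">
    return f"file_namespace_backend:repo_1:heavy_compute_{compute_key_from_params(1, 'x')}"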


def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
    from .backends import FileNamespaceBackend
    from . import async_creation_runner

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        reg_keys = list(region_meta.dogpile_cache_regions.keys())
        raise OSError(f'Region `{region_name}` not found in configured regions: {reg_keys}.')

    region_uid_name = f'{region_name}:{region_namespace}'

    # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        if not region_namespace:
            raise ValueError(f'Using {FileNamespaceBackend} requires the region_namespace param to be specified')

        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        expiration_time = region_obj.expiration_time

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        namespace_cache_dir = cache_dir

        # we default the namespace_cache_dir to our default cache dir.
        # however, if this backend is configured with a filename= param, we prioritize that,
        # so all caches within that particular region, even namespaced ones, end up in the same path
        if region_obj.actual_backend.filename:
            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)

        if not os.path.isdir(namespace_cache_dir):
            os.makedirs(namespace_cache_dir)

        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )

        namespace_filename = os.path.join(
            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    region_obj._default_namespace = region_namespace
    if use_async_runner:
        region_obj.async_creation_runner = async_creation_runner
    return region_obj
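

# Illustrative sketch, not part of the upstream module: obtaining a region.
# 'cache_repo' is an assumed region name; it must exist in
# region_meta.dogpile_cache_regions (populated from the .ini configuration).
def _example_get_region(repo_id):
    # For FileNamespaceBackend regions the namespace is mandatory, and each
    # namespace gets its own one-file database (see the special case above).
    return get_or_create_region('cache_repo', region_namespace=f'repo_{repo_id}')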


def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
    from . import CLEAR_DELETE, CLEAR_INVALIDATE

    if not isinstance(cache_region, RhodeCodeCacheRegion):
        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
    log.debug('clearing cache region: %s [prefix:%s] with method=%s',
              cache_region, cache_namespace_uid, method)

    num_affected_keys = 0

    if method == CLEAR_INVALIDATE:
        # NOTE: The CacheRegion.invalidate() method's default mode of
        # operation is to set a timestamp local to this CacheRegion in this Python process only.
        # It does not impact other Python processes or regions, as the timestamp is only stored locally in memory.
        cache_region.invalidate(hard=True)

    if method == CLEAR_DELETE:
        num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)

    return num_affected_keys
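

# Illustrative sketch, not part of the upstream module: the two clearing modes.
# CLEAR_INVALIDATE only invalidates the region within this Python process,
# while CLEAR_DELETE physically removes keys matching the namespace prefix.
# The 'cache_repo' region name is an assumption for the example.
def _example_clear_namespace(repo_id):
    from . import CLEAR_DELETE
    # returns the number of deleted keys; 0 for the invalidate method
    return clear_cache_namespace('cache_repo', f'repo_{repo_id}', method=CLEAR_DELETE)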


class ActiveRegionCache(object):
    def __init__(self, context, cache_data: dict):
        self.context = context
        self.cache_data = cache_data

    @property
    def state_uid(self) -> str:
        return self.cache_data['cache_state_uid']


class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        repo_namespace_key = 'some-cache-for-repo-id-100'
        inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)

        def cache_generator(_state_uid):

            @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
            def _dummy_func(*args):
                # compute heavy function
                return _state_uid, 'result'

            return _dummy_func

        with inv_context_manager as invalidation_context:
            cache_state_uid = invalidation_context.state_uid
            cache_func = cache_generator(cache_state_uid)
            previous_state_uid, result = cache_func(*call_args)

            should_invalidate = previous_state_uid != cache_state_uid
            if should_invalidate:
                _, result = cache_func.refresh(*call_args)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(repo_namespace_key)
    """

    def __repr__(self):
        return f'<InvalidationContext:{self.cache_key}>'

    def __init__(self, key, raise_exception=False, thread_scoped=None):
        self.cache_key = key
        self.raise_exception = raise_exception
        self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
        self.compute_time = 0

    def get_or_create_cache_obj(self):
        from rhodecode.model.db import CacheKey, Session, IntegrityError

        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            # generate new UID for non-existing cache object
            cache_state_uid = CacheKey.generate_new_state_uid()
            cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
                                 cache_state_uid=cache_state_uid, cache_active=True)
            try:
                Session().add(cache_obj)
                Session().commit()
            except IntegrityError:
                # if we catch an IntegrityError, it means this object was inserted
                # concurrently; the assumption is that this is really an edge
                # race-condition case, and it's safe to skip it
                Session().rollback()
            except Exception:
                log.exception('Failed to commit on cache key update')
                Session().rollback()
                if self.raise_exception:
                    raise
        return cache_obj

    def __enter__(self):
        log.debug('Entering cache invalidation check context: %s', self)
        self._start_time = time.time()

        self.cache_obj = self.get_or_create_cache_obj()
        cache_data = self.cache_obj.get_dict()

        return ActiveRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time