##// END OF EJS Templates
deps: bumped celery==5.4.0
deps: bumped celery==5.4.0

File last commit:

r5579:54bb9264 default
r5591:91d53725 default
Show More
utils.py
355 lines | 13.0 KiB | text/x-python | PythonLexer
feature(caches): refactor how invalidationContext works, fixes many issues with the previous solution...
r5288 # Copyright (C) 2015-2024 RhodeCode GmbH
caches: rewrite of auth/permission caches to dogpile.
r2845 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
caches: new updated rc_cache with archive cache module and python3 changes
r5067
caches: introduce new conditional cache function.
r2891 import functools
caches: new updated rc_cache with archive cache module and python3 changes
r5067 import logging
import os
caches: new cache context managers....
r2932 import threading
caches: new updated rc_cache with archive cache module and python3 changes
r5067 import time
caches: introduce new conditional cache function.
r2891
caches: new updated rc_cache with archive cache module and python3 changes
r5067 import decorator
caches: introduce new conditional cache function.
r2891 from dogpile.cache import CacheRegion
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: new cache context managers....
r2932 import rhodecode
chore(caches): cleanup cache lib and added cleanup backend
r5497 from ...lib.hash_utils import sha1
from ...lib.str_utils import safe_bytes
from ...lib.type_utils import str2bool # noqa :required by imports from .utils
caches: new cache context managers....
r2932
fix(caches): removed cacheKey cleanup logic as its proven to fail, and is not reliable.
r5287 from . import region_meta
caches: rewrite of auth/permission caches to dogpile.
r2845
log = logging.getLogger(__name__)
cython: fixed problems with cythonized decorators on py2....
def isCython(func):
    """
    Private helper that tells whether ``func`` is a cython function.

    The check is done purely on the name of the object's class.
    """
    class_name = func.__class__.__name__
    return class_name == 'cython_function_or_method'
caches: introduce new conditional cache function.
class RhodeCodeCacheRegion(CacheRegion):
    """
    ``CacheRegion`` subclass that adds a readable ``__repr__`` and the
    :meth:`conditional_cache_on_arguments` decorator factory.
    """

    def __repr__(self):
        return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=str,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't met. This works a bit different from should_cache_fn
        And it's faster in cases we don't ever want to compute cached values

        :param namespace: namespace handed to the key generator; when falsy the
            region's ``_default_namespace`` attribute (if any) is used.
        :param expiration_time: timeout passed to ``get_or_create``; may be a
            callable, in which case it is called on every cached invocation.
        :param should_cache_fn: forwarded unchanged to ``get_or_create``.
        :param to_str: forwarded to the key generator only when it is not ``str``.
        :param function_key_generator: factory called as
            ``function_key_generator(namespace, fn)``; defaults to the region's own.
        :param condition: when falsy the decorated function is called directly
            and the cache is never consulted.
        :return: a decorator to apply on the function that should be cached.
        """
        expiration_time_is_callable = callable(expiration_time)
        if not namespace:
            namespace = getattr(self, '_default_namespace', None)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
            # Actual call wrapper: bypasses the cache entirely when `condition`
            # is falsy, otherwise delegates to `self.get_or_create`.

            if not condition:
                log.debug('Calling un-cached method:`%s`', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('Call for un-cached method:`%s` took %.4fs', user_func.__name__, total)
                return result

            key = func_key_generator(*arg, **kw)

            # a callable expiration_time is evaluated per call
            timeout = expiration_time() if expiration_time_is_callable else expiration_time
            log.debug('Calling cached (timeout=%s) method:`%s`', timeout, user_func.__name__)

            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            # Builds the key generator for `user_func`, attaches the cache
            # helper callables onto it and returns the wrapped function.
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                # delete the cached entry stored for these call arguments
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store `value` under the key computed for these call arguments
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # read the cached entry for these call arguments from the region
                key = key_generator(*arg, **kw)
                return self.get(key)

            # helpers are attached as attributes on the original function object
            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
caches: introduce new conditional cache function.
r2891
def make_region(*arg, **kw):
    """
    Build a new :class:`RhodeCodeCacheRegion`, passing all positional and
    keyword arguments straight to its constructor.
    """
    region = RhodeCodeCacheRegion(*arg, **kw)
    return region
caches: rewrite of auth/permission caches to dogpile.
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract the settings whose key starts with one of ``prefixes``.

    :param settings: mapping of setting name to value.
    :param prefixes: iterable of key prefixes to select; ``None`` or empty
        selects nothing.
    :return: dict keyed by the setting name with the matching prefix removed
        and surrounding whitespace stripped; string values are stripped too,
        other values are stored as-is.
    """
    prefixes = prefixes or []
    cache_settings = {}
    for key, val in settings.items():
        for prefix in prefixes:
            if not key.startswith(prefix):
                continue
            # Cut only the leading prefix. Splitting on the prefix would
            # truncate names in which the prefix text shows up again,
            # e.g. `rc_cache.a.rc_cache.b` would become `a.`
            name = key[len(prefix):].strip()
            if isinstance(val, str):
                val = val.strip()
            cache_settings[name] = val
    return cache_settings
def compute_key_from_params(*args):
    """
    Helper computing a cache-manager key out of the given params: every param
    is converted with ``str``, joined with ``_`` and the result is sha1 hashed.
    """
    joined_params = "_".join(str(param) for param in args)
    return sha1(safe_bytes(joined_params))
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: new updated rc_cache with archive cache module and python3 changes
def custom_key_generator(backend, namespace, fn):
    """
    Return a key-generating function for ``fn``.

    Generated keys have the form
    ``<backend prefix>:<namespace>:<function name>_<hash of call args>``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        # the prefix is looked up on every call, with a fixed fallback when
        # the backend has no (or an empty) `key_prefix`
        backend_part = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_part = namespace or 'default_namespace'
        hashed_args = compute_key_from_params(*args)
        return f"{backend_part}:{namespace_part}:{func_name}_{hashed_args}"

    return generate_key
caches: new updated rc_cache with archive cache module and python3 changes
def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator.

    The returned callable takes ``(namespace, fn)`` and hands them, together
    with ``backend``, to :func:`custom_key_generator`.
    """

    def wrapper(namespace, fn):
        return custom_key_generator(backend=backend, namespace=namespace, fn=fn)

    return wrapper
caches: allow force re-init of cache region for re-creating the cache
def get_or_create_region(region_name, region_namespace: str | None = None, use_async_runner=False, force=False):
    """
    Return the cache region registered under ``region_name``.

    When the region uses the ``FileNamespaceBackend`` a dedicated region
    (one cache file per namespace) is created, registered under
    ``region_namespace`` and returned instead.

    :param region_name: name of a region present in
        ``region_meta.dogpile_cache_regions``.
    :param region_namespace: namespace of the region; required for the
        ``FileNamespaceBackend``. Also stored on the returned region as
        ``_default_namespace``.
    :param use_async_runner: when true, ``async_creation_runner`` is set on the
        returned region.
    :param force: when true, an already registered namespaced region is not
        reused but configured again.
    :raises OSError: if ``region_name`` is not a configured region.
    :raises ValueError: if the backend is ``FileNamespaceBackend`` and no
        ``region_namespace`` was given.
    """
    from .backends import FileNamespaceBackend
    from . import async_creation_runner

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        reg_keys = list(region_meta.dogpile_cache_regions.keys())
        raise OSError(f'Region `{region_name}` not in configured: {reg_keys}.')

    region_uid_name = f'{region_name}:{region_namespace}'

    # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        if not region_namespace:
            raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')

        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist and not force:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        expiration_time = region_obj.expiration_time

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        namespace_cache_dir = cache_dir

        # we default the namespace_cache_dir to our default cache dir.
        # however, if this backend is configured with filename= param, we prioritize that
        # so all caches within that particular region, even those namespaced end up in the same path
        if region_obj.actual_backend.filename:
            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)

        # exist_ok avoids the check-then-create race when several processes
        # set up the same region at the same time
        os.makedirs(namespace_cache_dir, exist_ok=True)

        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )

        namespace_filename = os.path.join(
            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    region_obj._default_namespace = region_namespace
    if use_async_runner:
        region_obj.async_creation_runner = async_creation_runner
    return region_obj
caches: don't use beaker for file caches anymore
r2846
fix(permission-flush): use delete method for permission cache invalidation as it's multi-process safe....
def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
    """
    Clear a cache region using the requested ``method``.

    :param cache_region: region object, or a region name that is resolved
        together with ``cache_namespace_uid`` via ``get_or_create_region``.
    :param cache_namespace_uid: namespace of the region; used as the key
        prefix for the delete method.
    :param method: ``CLEAR_INVALIDATE`` or ``CLEAR_DELETE``.
    :return: value reported by the backend for the delete method, 0 otherwise.
    """
    from . import CLEAR_DELETE, CLEAR_INVALIDATE

    region_given = isinstance(cache_region, RhodeCodeCacheRegion)
    if not region_given:
        cache_region = get_or_create_region(cache_region, cache_namespace_uid)

    log.debug('clearing cache region: %s [prefix:%s] with method=%s',
              cache_region, cache_namespace_uid, method)

    affected = 0

    if method == CLEAR_INVALIDATE:
        # NOTE: by default CacheRegion.invalidate() only sets a timestamp that
        # is local to this CacheRegion inside this Python process. The timestamp
        # lives in memory only, so other processes or regions are not impacted.
        cache_region.invalidate(hard=True)

    if method == CLEAR_DELETE:
        backend = cache_region.backend
        affected = backend.delete_multi_by_prefix(prefix=cache_namespace_uid)

    return affected
caches: new cache context managers....
r2932
class ActiveRegionCache:
    """
    Small value holder pairing a context object with its cache data dict.
    """

    def __init__(self, context, cache_data: dict):
        self.context, self.cache_data = context, cache_data

    @property
    def state_uid(self) -> str:
        """Value stored under the ``cache_state_uid`` key of the cache data."""
        uid = self.cache_data['cache_state_uid']
        return uid
caches: new cache context managers....
r2932
class InvalidationContext(object):
    """
    Context manager that fetches (or creates) the ``CacheKey`` object of a key
    and exposes its state uid, so callers can detect stale cached values.

    usage::

        from rhodecode.lib import rc_cache

        repo_namespace_key = 'some-cache-for-repo-id-100'
        inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)

        def cache_generator(_state_uid):

            @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
            def _dummy_func(*args):
                # compute heavy function
                return _state_uid, 'result'

            return _dummy_func

        with inv_context_manager as invalidation_context:
            cache_state_uid = invalidation_context.state_uid
            cache_func = cache_generator(cache_state_uid)
            previous_state_uid, result = cache_func(*call_args)

            should_invalidate = previous_state_uid != cache_state_uid
            if should_invalidate:
                _, result = cache_func.refresh(*call_args)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(repo_namespace_key)

    """

    def __repr__(self):
        return f'<InvalidationContext:{self.cache_key}>'

    def __init__(self, key, raise_exception=False, thread_scoped=None):
        """
        :param key: cache key this context checks.
        :param raise_exception: re-raise unexpected errors hit while storing a
            new cache key object instead of only logging them.
        :param thread_scoped: scope the context to the current thread; ``None``
            reads the ``cache_thread_scoped`` config value.
        """
        self.cache_key = key

        self.raise_exception = raise_exception
        self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
        self.compute_time = 0
        # set here as well, so get_or_create_cache_obj() and __exit__() don't
        # fail with AttributeError when used outside of a `with` block;
        # __enter__ resets it.
        self._start_time = time.time()

    def get_or_create_cache_obj(self):
        """
        Return the active ``CacheKey`` object for ``self.cache_key``, creating
        and committing a new one with a fresh state uid when none is found.
        """
        from rhodecode.model.db import CacheKey, Session, IntegrityError

        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            # generate new UID for non-existing cache object
            cache_state_uid = CacheKey.generate_new_state_uid()
            cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
                                 cache_state_uid=cache_state_uid, cache_active=True)
            try:
                Session().add(cache_obj)
                Session().commit()
            except IntegrityError:
                # an integrity error presumably means the same key got
                # inserted concurrently; assumption is that's really an edge
                # race-condition case and it's safe to skip it
                Session().rollback()
            except Exception:
                log.exception('Failed to commit on cache key update')
                Session().rollback()
                if self.raise_exception:
                    raise
        return cache_obj

    def __enter__(self):
        log.debug('Entering cache invalidation check context: %s', self)
        self._start_time = time.time()

        self.cache_obj = self.get_or_create_cache_obj()
        cache_data = self.cache_obj.get_dict()

        return ActiveRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time