diff --git a/rhodecode/lib/rc_cache/__init__.py b/rhodecode/lib/rc_cache/__init__.py --- a/rhodecode/lib/rc_cache/__init__.py +++ b/rhodecode/lib/rc_cache/__init__.py @@ -24,7 +24,6 @@ from dogpile.cache import register_backe from . import region_meta from .utils import ( ActiveRegionCache, - FreshRegionCache, InvalidationContext, backend_key_generator, clear_cache_namespace, diff --git a/rhodecode/lib/rc_cache/utils.py b/rhodecode/lib/rc_cache/utils.py --- a/rhodecode/lib/rc_cache/utils.py +++ b/rhodecode/lib/rc_cache/utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2023 RhodeCode GmbH +# Copyright (C) 2015-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 @@ -255,21 +255,13 @@ def clear_cache_namespace(cache_region: class ActiveRegionCache(object): - def __init__(self, context, cache_data): + def __init__(self, context, cache_data: dict): self.context = context self.cache_data = cache_data - def should_invalidate(self): - return False - - -class FreshRegionCache(object): - def __init__(self, context, cache_data): - self.context = context - self.cache_data = cache_data - - def should_invalidate(self): - return True + @property + def state_uid(self) -> str: + return self.cache_data['cache_state_uid'] class InvalidationContext(object): @@ -278,44 +270,40 @@ class InvalidationContext(object): from rhodecode.lib import rc_cache - cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1) - region = rc_cache.get_or_create_region('some_region', cache_namespace_uid) + repo_namespace_key = 'some-cache-for-repo-id-100' + inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) + + def cache_generator(_state_uid): - @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True) - def heavy_compute(cache_name, param1, param2): - print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2)) - - # invalidation namespace is shared namespace key for all process caches - # we use it to send a global signal - invalidation_namespace = 'repo_cache:1' + @region.conditional_cache_on_arguments(namespace='some-common-namespace-100') + def _dummy_func(*args): + # compute heavy function + return _state_uid, 'result' - inv_context_manager = rc_cache.InvalidationContext( - uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) + return _dummy_func + with inv_context_manager as invalidation_context: - args = ('one', 'two') - # re-compute and store cache if we get invalidate signal - if invalidation_context.should_invalidate(): - result = heavy_compute.refresh(*args) - else: - result = heavy_compute(*args) + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + previous_state_uid, result = cache_func(*call_args) - compute_time = inv_context_manager.compute_time - log.debug('result computed in %.4fs', compute_time) + should_invalidate = previous_state_uid != cache_state_uid + if should_invalidate: + _, result = cache_func.refresh(*call_args) # To send global invalidation signal, simply run - CacheKey.set_invalidate(invalidation_namespace) + CacheKey.set_invalidate(repo_namespace_key) """ def __repr__(self): - return f'' + return f'' - def __init__(self, uid, invalidation_namespace='', - raise_exception=False, thread_scoped=None): - self.uid = uid - self.invalidation_namespace = invalidation_namespace + def __init__(self, key, raise_exception=False, thread_scoped=None): + self.cache_key = key + self.raise_exception = raise_exception - self.proc_id = rhodecode.CONFIG.get('instance_id') or 'DEFAULT' + self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT' self.thread_id = 'global' if thread_scoped is None: @@ -327,77 +315,44 @@ class InvalidationContext(object): if thread_scoped is True: self.thread_id = threading.current_thread().ident - self.cache_key = compute_key_from_params(uid) - self.cache_key = 'proc:{}|thread:{}|params:{}'.format( - self.proc_id, self.thread_id, self.cache_key) - self.proc_key = f'proc:{self.proc_id}' + self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}' self.compute_time = 0 - def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''): - from rhodecode.model.db import CacheKey + def get_or_create_cache_obj(self): + from rhodecode.model.db import CacheKey, Session, IntegrityError - invalidation_namespace = invalidation_namespace or self.invalidation_namespace - # fetch all cache keys for this namespace and convert them to a map to find if we - # have specific cache_key object registered. We do this because we want to have - # all consistent cache_state_uid for newly registered objects - cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace) - cache_obj = cache_obj_map.get(self.cache_key) + cache_obj = CacheKey.get_active_cache(self.cache_key) log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key) if not cache_obj: - new_cache_args = invalidation_namespace - first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None - cache_state_uid = None - if first_cache_obj: - cache_state_uid = first_cache_obj.cache_state_uid - cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args, - cache_state_uid=cache_state_uid) - + # generate new UID for non-existing cache object + cache_state_uid = CacheKey.generate_new_state_uid() + cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}', + cache_state_uid=cache_state_uid, cache_active=True) + try: + Session().add(cache_obj) + Session().commit() + except IntegrityError: + # if we catch integrity error, it means we inserted this object + # assumption is that's really an edge race-condition case and + # it's safe is to skip it + Session().rollback() + except Exception: + log.exception('Failed to commit on cache key update') + Session().rollback() + if self.raise_exception: + raise return cache_obj def __enter__(self): - """ - Test if current object is valid, and return CacheRegion function - that does invalidation and calculation - """ - log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace) - # register or get a new key based on uid - self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid) - cache_data = self.cache_obj.get_dict() + log.debug('Entering cache invalidation check context: %s', self) self._start_time = time.time() - if self.cache_obj.cache_active: - # means our cache obj is existing and marked as it's - # cache is not outdated, we return ActiveRegionCache - self.skip_cache_active_change = True - return ActiveRegionCache(context=self, cache_data=cache_data) + self.cache_obj = self.get_or_create_cache_obj() + cache_data = self.cache_obj.get_dict() - # the key is either not existing or set to False, we return - # the real invalidator which re-computes value. We additionally set - # the flag to actually update the Database objects - self.skip_cache_active_change = False - return FreshRegionCache(context=self, cache_data=cache_data) + return ActiveRegionCache(context=self, cache_data=cache_data) def __exit__(self, exc_type, exc_val, exc_tb): - from rhodecode.model.db import IntegrityError, Session - # save compute time self.compute_time = time.time() - self._start_time - - if self.skip_cache_active_change: - return - - try: - self.cache_obj.cache_active = True - Session().add(self.cache_obj) - Session().commit() - except IntegrityError: - # if we catch integrity error, it means we inserted this object - # assumption is that's really an edge race-condition case and - # it's safe is to skip it - Session().rollback() - except Exception: - log.exception('Failed to commit on cache key update') - Session().rollback() - if self.raise_exception: - raise diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py --- a/rhodecode/model/db.py +++ b/rhodecode/model/db.py @@ -2034,10 +2034,9 @@ class Repository(Base, BaseModel): """ Returns associated cache keys for that repo """ - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=self.repo_id) + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id) return CacheKey.query()\ - .filter(CacheKey.cache_args == invalidation_namespace)\ + .filter(CacheKey.cache_key == repo_namespace_key)\ .order_by(CacheKey.cache_key)\ .all() @@ -2609,29 +2608,38 @@ class Repository(Base, BaseModel): from rhodecode.lib import rc_cache cache_namespace_uid = f'repo_instance.{self.repo_id}' - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=self.repo_id) region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) - @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) - def get_instance_cached(repo_id, context_id, _cache_state_uid): - return self._get_instance(repo_state_uid=_cache_state_uid) - # we must use thread scoped cache here, # because each thread of gevent needs it's own not shared connection and cache # we also alter `args` so the cache key is individual for every green thread. - inv_context_manager = rc_cache.InvalidationContext( - uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace, - thread_scoped=True) + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id) + inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key, thread_scoped=True) + + # our wrapped caching function that takes state_uid to save the previous state in + def cache_generator(_state_uid): + + @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) + def get_instance_cached(_repo_id, _process_context_id): + # we save in cached func the generation state so we can detect a change and invalidate caches + return _state_uid, self._get_instance(repo_state_uid=_state_uid) + + return get_instance_cached + with inv_context_manager as invalidation_context: - cache_state_uid = invalidation_context.cache_data['cache_state_uid'] - args = (self.repo_id, inv_context_manager.cache_key, cache_state_uid) - - # re-compute and store cache if we get invalidate signal - if invalidation_context.should_invalidate(): - instance = get_instance_cached.refresh(*args) - else: - instance = get_instance_cached(*args) + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + + args = self.repo_id, inv_context_manager.proc_key + + previous_state_uid, instance = cache_func(*args) + + if instance: + # now compare keys, the "cache" state vs expected state. + if previous_state_uid != cache_state_uid: + log.warning('Cached state uid %s is different than current state uid %s', + previous_state_uid, cache_state_uid) + _, instance = cache_func.refresh(*args) log.debug('Repo instance fetched in %.4fs', inv_context_manager.compute_time) return instance @@ -2652,6 +2660,7 @@ class Repository(Base, BaseModel): _vcs_alias=self.repo_type) if repo is not None: repo.count() # cache rebuild + return repo def get_shadow_repository_path(self, workspace_id): @@ -3675,10 +3684,10 @@ class CacheKey(Base, BaseModel): cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None) cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False) - def __init__(self, cache_key, cache_args='', cache_state_uid=None): + def __init__(self, cache_key, cache_args='', cache_state_uid=None, cache_active=False): self.cache_key = cache_key self.cache_args = cache_args - self.cache_active = False + self.cache_active = cache_active # first key should be same for all entries, since all workers should share it self.cache_state_uid = cache_state_uid or self.generate_new_state_uid() @@ -3730,18 +3739,18 @@ class CacheKey(Base, BaseModel): """ Mark all caches of a repo as invalid in the database. """ - try: - qry = Session().query(cls).filter(cls.cache_args == cache_uid) + qry = Session().query(cls).filter(cls.cache_key == cache_uid) if delete: qry.delete() log.debug('cache objects deleted for cache args %s', safe_str(cache_uid)) else: - qry.update({"cache_active": False, - "cache_state_uid": cls.generate_new_state_uid()}) - log.debug('cache objects marked as invalid for cache args %s', - safe_str(cache_uid)) + new_uid = cls.generate_new_state_uid() + qry.update({"cache_state_uid": new_uid, + "cache_args": f"repo_state:{time.time()}"}) + log.debug('cache object %s set new UID %s', + safe_str(cache_uid), new_uid) Session().commit() except Exception: diff --git a/rhodecode/model/scm.py b/rhodecode/model/scm.py --- a/rhodecode/model/scm.py +++ b/rhodecode/model/scm.py @@ -299,9 +299,8 @@ class ScmModel(BaseModel): repo = Repository.get_by_repo_name(repo_name) if repo: - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=repo.repo_id) - CacheKey.set_invalidate(invalidation_namespace, delete=delete) + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo.repo_id) + CacheKey.set_invalidate(repo_namespace_key, delete=delete) repo_id = repo.repo_id config = repo._config diff --git a/rhodecode/tests/lib/test_libs.py b/rhodecode/tests/lib/test_libs.py --- a/rhodecode/tests/lib/test_libs.py +++ b/rhodecode/tests/lib/test_libs.py @@ -1,5 +1,4 @@ - -# Copyright (C) 2010-2023 RhodeCode GmbH +# Copyright (C) 2010-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 @@ -27,6 +26,7 @@ import string import mock import pytest import functools +import time from rhodecode.tests import no_newline_id_generator from rhodecode.tests.utils import run_test_concurrently @@ -619,126 +619,130 @@ def test_get_repo_by_id(test, expected): def test_invalidation_context(baseapp): repo_id = 9999 + calls = [1, 2] + call_args = ('some-key',) + region = rc_cache.get_or_create_region('cache_repo_longterm') - cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( - repo_id, CacheKey.CACHE_TYPE_FEED) - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=repo_id) - region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) + inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) - calls = [1, 2] + def cache_generator(_state_uid): - @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) - def _dummy_func(cache_key): - val = calls.pop(0) - return 'result:{}'.format(val) + @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') + def _dummy_func(*args): + val = calls.pop(0) + return _state_uid, f'result:{val}' - inv_context_manager = rc_cache.InvalidationContext( - uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) + return _dummy_func # 1st call, fresh caches with inv_context_manager as invalidation_context: - should_invalidate = invalidation_context.should_invalidate() + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + previous_state_uid, result = cache_func(*call_args) + + should_invalidate = previous_state_uid != cache_state_uid if should_invalidate: - result = _dummy_func.refresh('some-key') - else: - result = _dummy_func('some-key') + _, result = cache_func.refresh(*call_args) - assert isinstance(invalidation_context, rc_cache.FreshRegionCache) - assert should_invalidate is True + assert should_invalidate is False # 1st call, we don't need to invalidate assert 'result:1' == result - # should be cached so calling it twice will give the same result ! - result = _dummy_func('some-key') + # should be already cached so calling it twice will give the same result! + _, result = cache_func(*call_args) assert 'result:1' == result # 2nd call, we create a new context manager, this should be now key aware, and - # return an active cache region + # return an active cache region from DB based on the same uid with inv_context_manager as invalidation_context: - should_invalidate = invalidation_context.should_invalidate() - assert isinstance(invalidation_context, rc_cache.ActiveRegionCache) - assert should_invalidate is False + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + previous_state_uid, result = cache_func(*call_args) + + should_invalidate = previous_state_uid != cache_state_uid + if should_invalidate: + _, result = cache_func.refresh(*call_args) + + assert should_invalidate is False # 1st call, we don't need to invalidate # Mark invalidation - CacheKey.set_invalidate(invalidation_namespace) + CacheKey.set_invalidate(repo_namespace_key) # 3nd call, fresh caches with inv_context_manager as invalidation_context: - should_invalidate = invalidation_context.should_invalidate() + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + previous_state_uid, result = cache_func(*call_args) + + should_invalidate = previous_state_uid != cache_state_uid if should_invalidate: - result = _dummy_func.refresh('some-key') - else: - result = _dummy_func('some-key') + _, result = cache_func.refresh(*call_args) - assert isinstance(invalidation_context, rc_cache.FreshRegionCache) assert should_invalidate is True assert 'result:2' == result # cached again, same result - result = _dummy_func('some-key') + _, result = cache_func(*call_args) assert 'result:2' == result def test_invalidation_context_exception_in_compute(baseapp): repo_id = 888 + region = rc_cache.get_or_create_region('cache_repo_longterm') - cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( - repo_id, CacheKey.CACHE_TYPE_FEED) - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=repo_id) - region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) + inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) - @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) - def _dummy_func(cache_key): - raise Exception('Error in cache func') + def cache_generator(_state_uid): + @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') + def _dummy_func(*args): + raise Exception('Error in cache func') + + return _dummy_func with pytest.raises(Exception): - inv_context_manager = rc_cache.InvalidationContext( - uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) # 1st call, fresh caches with inv_context_manager as invalidation_context: - should_invalidate = invalidation_context.should_invalidate() - if should_invalidate: - _dummy_func.refresh('some-key-2') - else: - _dummy_func('some-key-2') + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + cache_func(1, 2, 3) @pytest.mark.parametrize('execution_number', range(5)) def test_cache_invalidation_race_condition(execution_number, baseapp): - import time repo_id = 777 - cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( - repo_id, CacheKey.CACHE_TYPE_FEED) - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=repo_id) - region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) + region = rc_cache.get_or_create_region('cache_repo_longterm') + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) @run_test_concurrently(25) def test_create_and_delete_cache_keys(): time.sleep(0.2) - @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) - def _dummy_func(cache_key): - val = 'async' - return 'result:{}'.format(val) + def cache_generator(_state_uid): - inv_context_manager = rc_cache.InvalidationContext( - uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) + @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') + def _dummy_func(*args): + return _state_uid, 'result:async' + + return _dummy_func + + inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) # 1st call, fresh caches with inv_context_manager as invalidation_context: - should_invalidate = invalidation_context.should_invalidate() + cache_state_uid = invalidation_context.state_uid + cache_func = cache_generator(cache_state_uid) + previous_state_uid, result = cache_func('doo') + + should_invalidate = previous_state_uid != cache_state_uid if should_invalidate: - _dummy_func.refresh('some-key-3') - else: - _dummy_func('some-key-3') + _, result = cache_func.refresh('doo') # Mark invalidation - CacheKey.set_invalidate(invalidation_namespace) + CacheKey.set_invalidate(repo_namespace_key) test_create_and_delete_cache_keys() diff --git a/rhodecode/tests/vcs_operations/test_vcs_operations.py b/rhodecode/tests/vcs_operations/test_vcs_operations.py --- a/rhodecode/tests/vcs_operations/test_vcs_operations.py +++ b/rhodecode/tests/vcs_operations/test_vcs_operations.py @@ -249,11 +249,9 @@ class TestVCSOperations(object): # init cache objects CacheKey.delete_all_cache() cache_namespace_uid = 'cache_push_test.{}'.format(hg_repo.repo_id) - invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( - repo_id=hg_repo.repo_id) + repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=hg_repo.repo_id) - inv_context_manager = rc_cache.InvalidationContext( - uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) + inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) with inv_context_manager as invalidation_context: # __enter__ will create and register cache objects