##// END OF EJS Templates
feature(caches): refactor how invalidationContext works, fixes many issues with the previousl solution...
super-admin -
r5288:c652fe5b default
parent child Browse files
Show More
@@ -24,7 +24,6 b' from dogpile.cache import register_backe'
24 from . import region_meta
24 from . import region_meta
25 from .utils import (
25 from .utils import (
26 ActiveRegionCache,
26 ActiveRegionCache,
27 FreshRegionCache,
28 InvalidationContext,
27 InvalidationContext,
29 backend_key_generator,
28 backend_key_generator,
30 clear_cache_namespace,
29 clear_cache_namespace,
@@ -1,4 +1,4 b''
1 # Copyright (C) 2015-2023 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
@@ -255,21 +255,13 b' def clear_cache_namespace(cache_region: '
255
255
256
256
257 class ActiveRegionCache(object):
257 class ActiveRegionCache(object):
258 def __init__(self, context, cache_data):
258 def __init__(self, context, cache_data: dict):
259 self.context = context
259 self.context = context
260 self.cache_data = cache_data
260 self.cache_data = cache_data
261
261
262 def should_invalidate(self):
262 @property
263 return False
263 def state_uid(self) -> str:
264
264 return self.cache_data['cache_state_uid']
265
266 class FreshRegionCache(object):
267 def __init__(self, context, cache_data):
268 self.context = context
269 self.cache_data = cache_data
270
271 def should_invalidate(self):
272 return True
273
265
274
266
275 class InvalidationContext(object):
267 class InvalidationContext(object):
@@ -278,44 +270,40 b' class InvalidationContext(object):'
278
270
279 from rhodecode.lib import rc_cache
271 from rhodecode.lib import rc_cache
280
272
281 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
273 repo_namespace_key = 'some-cache-for-repo-id-100'
282 region = rc_cache.get_or_create_region('some_region', cache_namespace_uid)
274 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
275
276 def cache_generator(_state_uid):
283
277
284 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
278 @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
285 def heavy_compute(cache_name, param1, param2):
279 def _dummy_func(*args):
286 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
280 # compute heavy function
287
281 return _state_uid, 'result'
288 # invalidation namespace is shared namespace key for all process caches
289 # we use it to send a global signal
290 invalidation_namespace = 'repo_cache:1'
291
282
292 inv_context_manager = rc_cache.InvalidationContext(
283 return _dummy_func
293 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
284
294 with inv_context_manager as invalidation_context:
285 with inv_context_manager as invalidation_context:
295 args = ('one', 'two')
286 cache_state_uid = invalidation_context.state_uid
296 # re-compute and store cache if we get invalidate signal
287 cache_func = cache_generator(cache_state_uid)
297 if invalidation_context.should_invalidate():
288 previous_state_uid, result = cache_func(*call_args)
298 result = heavy_compute.refresh(*args)
299 else:
300 result = heavy_compute(*args)
301
289
302 compute_time = inv_context_manager.compute_time
290 should_invalidate = previous_state_uid != cache_state_uid
303 log.debug('result computed in %.4fs', compute_time)
291 if should_invalidate:
292 _, result = cache_func.refresh(*call_args)
304
293
305 # To send global invalidation signal, simply run
294 # To send global invalidation signal, simply run
306 CacheKey.set_invalidate(invalidation_namespace)
295 CacheKey.set_invalidate(repo_namespace_key)
307
296
308 """
297 """
309
298
310 def __repr__(self):
299 def __repr__(self):
311 return f'<InvalidationContext:{self.cache_key}[{self.uid}]>'
300 return f'<InvalidationContext:{self.cache_key}>'
312
301
313 def __init__(self, uid, invalidation_namespace='',
302 def __init__(self, key, raise_exception=False, thread_scoped=None):
314 raise_exception=False, thread_scoped=None):
303 self.cache_key = key
315 self.uid = uid
304
316 self.invalidation_namespace = invalidation_namespace
317 self.raise_exception = raise_exception
305 self.raise_exception = raise_exception
318 self.proc_id = rhodecode.CONFIG.get('instance_id') or 'DEFAULT'
306 self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
319 self.thread_id = 'global'
307 self.thread_id = 'global'
320
308
321 if thread_scoped is None:
309 if thread_scoped is None:
@@ -327,69 +315,22 b' class InvalidationContext(object):'
327 if thread_scoped is True:
315 if thread_scoped is True:
328 self.thread_id = threading.current_thread().ident
316 self.thread_id = threading.current_thread().ident
329
317
330 self.cache_key = compute_key_from_params(uid)
318 self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
331 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
332 self.proc_id, self.thread_id, self.cache_key)
333 self.proc_key = f'proc:{self.proc_id}'
334 self.compute_time = 0
319 self.compute_time = 0
335
320
336 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
321 def get_or_create_cache_obj(self):
337 from rhodecode.model.db import CacheKey
322 from rhodecode.model.db import CacheKey, Session, IntegrityError
338
323
339 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
324 cache_obj = CacheKey.get_active_cache(self.cache_key)
340 # fetch all cache keys for this namespace and convert them to a map to find if we
341 # have specific cache_key object registered. We do this because we want to have
342 # all consistent cache_state_uid for newly registered objects
343 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
344 cache_obj = cache_obj_map.get(self.cache_key)
345 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
325 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
346
326
347 if not cache_obj:
327 if not cache_obj:
348 new_cache_args = invalidation_namespace
328 # generate new UID for non-existing cache object
349 first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None
329 cache_state_uid = CacheKey.generate_new_state_uid()
350 cache_state_uid = None
330 cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
351 if first_cache_obj:
331 cache_state_uid=cache_state_uid, cache_active=True)
352 cache_state_uid = first_cache_obj.cache_state_uid
353 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
354 cache_state_uid=cache_state_uid)
355
356 return cache_obj
357
358 def __enter__(self):
359 """
360 Test if current object is valid, and return CacheRegion function
361 that does invalidation and calculation
362 """
363 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
364 # register or get a new key based on uid
365 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
366 cache_data = self.cache_obj.get_dict()
367 self._start_time = time.time()
368 if self.cache_obj.cache_active:
369 # means our cache obj is existing and marked as it's
370 # cache is not outdated, we return ActiveRegionCache
371 self.skip_cache_active_change = True
372
373 return ActiveRegionCache(context=self, cache_data=cache_data)
374
375 # the key is either not existing or set to False, we return
376 # the real invalidator which re-computes value. We additionally set
377 # the flag to actually update the Database objects
378 self.skip_cache_active_change = False
379 return FreshRegionCache(context=self, cache_data=cache_data)
380
381 def __exit__(self, exc_type, exc_val, exc_tb):
382 from rhodecode.model.db import IntegrityError, Session
383
384 # save compute time
385 self.compute_time = time.time() - self._start_time
386
387 if self.skip_cache_active_change:
388 return
389
390 try:
332 try:
391 self.cache_obj.cache_active = True
333 Session().add(cache_obj)
392 Session().add(self.cache_obj)
393 Session().commit()
334 Session().commit()
394 except IntegrityError:
335 except IntegrityError:
395 # if we catch integrity error, it means we inserted this object
336 # if we catch integrity error, it means we inserted this object
@@ -401,3 +342,17 b' class InvalidationContext(object):'
401 Session().rollback()
342 Session().rollback()
402 if self.raise_exception:
343 if self.raise_exception:
403 raise
344 raise
345 return cache_obj
346
347 def __enter__(self):
348 log.debug('Entering cache invalidation check context: %s', self)
349 self._start_time = time.time()
350
351 self.cache_obj = self.get_or_create_cache_obj()
352 cache_data = self.cache_obj.get_dict()
353
354 return ActiveRegionCache(context=self, cache_data=cache_data)
355
356 def __exit__(self, exc_type, exc_val, exc_tb):
357 # save compute time
358 self.compute_time = time.time() - self._start_time
@@ -2034,10 +2034,9 b' class Repository(Base, BaseModel):'
2034 """
2034 """
2035 Returns associated cache keys for that repo
2035 Returns associated cache keys for that repo
2036 """
2036 """
2037 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
2037 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id)
2038 repo_id=self.repo_id)
2039 return CacheKey.query()\
2038 return CacheKey.query()\
2040 .filter(CacheKey.cache_args == invalidation_namespace)\
2039 .filter(CacheKey.cache_key == repo_namespace_key)\
2041 .order_by(CacheKey.cache_key)\
2040 .order_by(CacheKey.cache_key)\
2042 .all()
2041 .all()
2043
2042
@@ -2609,29 +2608,38 b' class Repository(Base, BaseModel):'
2609 from rhodecode.lib import rc_cache
2608 from rhodecode.lib import rc_cache
2610
2609
2611 cache_namespace_uid = f'repo_instance.{self.repo_id}'
2610 cache_namespace_uid = f'repo_instance.{self.repo_id}'
2612 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
2613 repo_id=self.repo_id)
2614 region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid)
2611 region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid)
2615
2612
2616 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid)
2617 def get_instance_cached(repo_id, context_id, _cache_state_uid):
2618 return self._get_instance(repo_state_uid=_cache_state_uid)
2619
2620 # we must use thread scoped cache here,
2613 # we must use thread scoped cache here,
2621 # because each thread of gevent needs it's own not shared connection and cache
2614 # because each thread of gevent needs it's own not shared connection and cache
2622 # we also alter `args` so the cache key is individual for every green thread.
2615 # we also alter `args` so the cache key is individual for every green thread.
2623 inv_context_manager = rc_cache.InvalidationContext(
2616 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id)
2624 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace,
2617 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key, thread_scoped=True)
2625 thread_scoped=True)
2618
2619 # our wrapped caching function that takes state_uid to save the previous state in
2620 def cache_generator(_state_uid):
2621
2622 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid)
2623 def get_instance_cached(_repo_id, _process_context_id):
2624 # we save in cached func the generation state so we can detect a change and invalidate caches
2625 return _state_uid, self._get_instance(repo_state_uid=_state_uid)
2626
2627 return get_instance_cached
2628
2626 with inv_context_manager as invalidation_context:
2629 with inv_context_manager as invalidation_context:
2627 cache_state_uid = invalidation_context.cache_data['cache_state_uid']
2630 cache_state_uid = invalidation_context.state_uid
2628 args = (self.repo_id, inv_context_manager.cache_key, cache_state_uid)
2631 cache_func = cache_generator(cache_state_uid)
2629
2632
2630 # re-compute and store cache if we get invalidate signal
2633 args = self.repo_id, inv_context_manager.proc_key
2631 if invalidation_context.should_invalidate():
2634
2632 instance = get_instance_cached.refresh(*args)
2635 previous_state_uid, instance = cache_func(*args)
2633 else:
2636
2634 instance = get_instance_cached(*args)
2637 if instance:
2638 # now compare keys, the "cache" state vs expected state.
2639 if previous_state_uid != cache_state_uid:
2640 log.warning('Cached state uid %s is different than current state uid %s',
2641 previous_state_uid, cache_state_uid)
2642 _, instance = cache_func.refresh(*args)
2635
2643
2636 log.debug('Repo instance fetched in %.4fs', inv_context_manager.compute_time)
2644 log.debug('Repo instance fetched in %.4fs', inv_context_manager.compute_time)
2637 return instance
2645 return instance
@@ -2652,6 +2660,7 b' class Repository(Base, BaseModel):'
2652 _vcs_alias=self.repo_type)
2660 _vcs_alias=self.repo_type)
2653 if repo is not None:
2661 if repo is not None:
2654 repo.count() # cache rebuild
2662 repo.count() # cache rebuild
2663
2655 return repo
2664 return repo
2656
2665
2657 def get_shadow_repository_path(self, workspace_id):
2666 def get_shadow_repository_path(self, workspace_id):
@@ -3675,10 +3684,10 b' class CacheKey(Base, BaseModel):'
3675 cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None)
3684 cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None)
3676 cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
3685 cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
3677
3686
3678 def __init__(self, cache_key, cache_args='', cache_state_uid=None):
3687 def __init__(self, cache_key, cache_args='', cache_state_uid=None, cache_active=False):
3679 self.cache_key = cache_key
3688 self.cache_key = cache_key
3680 self.cache_args = cache_args
3689 self.cache_args = cache_args
3681 self.cache_active = False
3690 self.cache_active = cache_active
3682 # first key should be same for all entries, since all workers should share it
3691 # first key should be same for all entries, since all workers should share it
3683 self.cache_state_uid = cache_state_uid or self.generate_new_state_uid()
3692 self.cache_state_uid = cache_state_uid or self.generate_new_state_uid()
3684
3693
@@ -3730,18 +3739,18 b' class CacheKey(Base, BaseModel):'
3730 """
3739 """
3731 Mark all caches of a repo as invalid in the database.
3740 Mark all caches of a repo as invalid in the database.
3732 """
3741 """
3733
3734 try:
3742 try:
3735 qry = Session().query(cls).filter(cls.cache_args == cache_uid)
3743 qry = Session().query(cls).filter(cls.cache_key == cache_uid)
3736 if delete:
3744 if delete:
3737 qry.delete()
3745 qry.delete()
3738 log.debug('cache objects deleted for cache args %s',
3746 log.debug('cache objects deleted for cache args %s',
3739 safe_str(cache_uid))
3747 safe_str(cache_uid))
3740 else:
3748 else:
3741 qry.update({"cache_active": False,
3749 new_uid = cls.generate_new_state_uid()
3742 "cache_state_uid": cls.generate_new_state_uid()})
3750 qry.update({"cache_state_uid": new_uid,
3743 log.debug('cache objects marked as invalid for cache args %s',
3751 "cache_args": f"repo_state:{time.time()}"})
3744 safe_str(cache_uid))
3752 log.debug('cache object %s set new UID %s',
3753 safe_str(cache_uid), new_uid)
3745
3754
3746 Session().commit()
3755 Session().commit()
3747 except Exception:
3756 except Exception:
@@ -299,9 +299,8 b' class ScmModel(BaseModel):'
299 repo = Repository.get_by_repo_name(repo_name)
299 repo = Repository.get_by_repo_name(repo_name)
300
300
301 if repo:
301 if repo:
302 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
302 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo.repo_id)
303 repo_id=repo.repo_id)
303 CacheKey.set_invalidate(repo_namespace_key, delete=delete)
304 CacheKey.set_invalidate(invalidation_namespace, delete=delete)
305
304
306 repo_id = repo.repo_id
305 repo_id = repo.repo_id
307 config = repo._config
306 config = repo._config
@@ -1,5 +1,4 b''
1
1 # Copyright (C) 2010-2024 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
2 #
4 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
@@ -27,6 +26,7 b' import string'
27 import mock
26 import mock
28 import pytest
27 import pytest
29 import functools
28 import functools
29 import time
30
30
31 from rhodecode.tests import no_newline_id_generator
31 from rhodecode.tests import no_newline_id_generator
32 from rhodecode.tests.utils import run_test_concurrently
32 from rhodecode.tests.utils import run_test_concurrently
@@ -619,126 +619,130 b' def test_get_repo_by_id(test, expected):'
619
619
620 def test_invalidation_context(baseapp):
620 def test_invalidation_context(baseapp):
621 repo_id = 9999
621 repo_id = 9999
622 calls = [1, 2]
623 call_args = ('some-key',)
624 region = rc_cache.get_or_create_region('cache_repo_longterm')
622
625
623 cache_namespace_uid = 'cache_repo_instance.{}_{}'.format(
626 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id)
624 repo_id, CacheKey.CACHE_TYPE_FEED)
627 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
625 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
626 repo_id=repo_id)
627 region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid)
628
628
629 calls = [1, 2]
629 def cache_generator(_state_uid):
630
630
631 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid)
631 @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}')
632 def _dummy_func(cache_key):
632 def _dummy_func(*args):
633 val = calls.pop(0)
633 val = calls.pop(0)
634 return 'result:{}'.format(val)
634 return _state_uid, f'result:{val}'
635
635
636 inv_context_manager = rc_cache.InvalidationContext(
636 return _dummy_func
637 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
638
637
639 # 1st call, fresh caches
638 # 1st call, fresh caches
640 with inv_context_manager as invalidation_context:
639 with inv_context_manager as invalidation_context:
641 should_invalidate = invalidation_context.should_invalidate()
640 cache_state_uid = invalidation_context.state_uid
641 cache_func = cache_generator(cache_state_uid)
642 previous_state_uid, result = cache_func(*call_args)
643
644 should_invalidate = previous_state_uid != cache_state_uid
642 if should_invalidate:
645 if should_invalidate:
643 result = _dummy_func.refresh('some-key')
646 _, result = cache_func.refresh(*call_args)
644 else:
645 result = _dummy_func('some-key')
646
647
647 assert isinstance(invalidation_context, rc_cache.FreshRegionCache)
648 assert should_invalidate is False # 1st call, we don't need to invalidate
648 assert should_invalidate is True
649
649
650 assert 'result:1' == result
650 assert 'result:1' == result
651 # should be cached so calling it twice will give the same result !
651 # should be already cached so calling it twice will give the same result!
652 result = _dummy_func('some-key')
652 _, result = cache_func(*call_args)
653 assert 'result:1' == result
653 assert 'result:1' == result
654
654
655 # 2nd call, we create a new context manager, this should be now key aware, and
655 # 2nd call, we create a new context manager, this should be now key aware, and
656 # return an active cache region
656 # return an active cache region from DB based on the same uid
657 with inv_context_manager as invalidation_context:
657 with inv_context_manager as invalidation_context:
658 should_invalidate = invalidation_context.should_invalidate()
658 cache_state_uid = invalidation_context.state_uid
659 assert isinstance(invalidation_context, rc_cache.ActiveRegionCache)
659 cache_func = cache_generator(cache_state_uid)
660 assert should_invalidate is False
660 previous_state_uid, result = cache_func(*call_args)
661
662 should_invalidate = previous_state_uid != cache_state_uid
663 if should_invalidate:
664 _, result = cache_func.refresh(*call_args)
665
666 assert should_invalidate is False # 1st call, we don't need to invalidate
661
667
662 # Mark invalidation
668 # Mark invalidation
663 CacheKey.set_invalidate(invalidation_namespace)
669 CacheKey.set_invalidate(repo_namespace_key)
664
670
665 # 3nd call, fresh caches
671 # 3nd call, fresh caches
666 with inv_context_manager as invalidation_context:
672 with inv_context_manager as invalidation_context:
667 should_invalidate = invalidation_context.should_invalidate()
673 cache_state_uid = invalidation_context.state_uid
674 cache_func = cache_generator(cache_state_uid)
675 previous_state_uid, result = cache_func(*call_args)
676
677 should_invalidate = previous_state_uid != cache_state_uid
668 if should_invalidate:
678 if should_invalidate:
669 result = _dummy_func.refresh('some-key')
679 _, result = cache_func.refresh(*call_args)
670 else:
671 result = _dummy_func('some-key')
672
680
673 assert isinstance(invalidation_context, rc_cache.FreshRegionCache)
674 assert should_invalidate is True
681 assert should_invalidate is True
675
682
676 assert 'result:2' == result
683 assert 'result:2' == result
677
684
678 # cached again, same result
685 # cached again, same result
679 result = _dummy_func('some-key')
686 _, result = cache_func(*call_args)
680 assert 'result:2' == result
687 assert 'result:2' == result
681
688
682
689
683 def test_invalidation_context_exception_in_compute(baseapp):
690 def test_invalidation_context_exception_in_compute(baseapp):
684 repo_id = 888
691 repo_id = 888
692 region = rc_cache.get_or_create_region('cache_repo_longterm')
685
693
686 cache_namespace_uid = 'cache_repo_instance.{}_{}'.format(
694 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id)
687 repo_id, CacheKey.CACHE_TYPE_FEED)
695 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
688 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
689 repo_id=repo_id)
690 region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid)
691
696
692 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid)
697 def cache_generator(_state_uid):
693 def _dummy_func(cache_key):
698 @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}')
699 def _dummy_func(*args):
694 raise Exception('Error in cache func')
700 raise Exception('Error in cache func')
695
701
702 return _dummy_func
703
696 with pytest.raises(Exception):
704 with pytest.raises(Exception):
697 inv_context_manager = rc_cache.InvalidationContext(
698 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
699
705
700 # 1st call, fresh caches
706 # 1st call, fresh caches
701 with inv_context_manager as invalidation_context:
707 with inv_context_manager as invalidation_context:
702 should_invalidate = invalidation_context.should_invalidate()
708 cache_state_uid = invalidation_context.state_uid
703 if should_invalidate:
709 cache_func = cache_generator(cache_state_uid)
704 _dummy_func.refresh('some-key-2')
710 cache_func(1, 2, 3)
705 else:
706 _dummy_func('some-key-2')
707
711
708
712
709 @pytest.mark.parametrize('execution_number', range(5))
713 @pytest.mark.parametrize('execution_number', range(5))
710 def test_cache_invalidation_race_condition(execution_number, baseapp):
714 def test_cache_invalidation_race_condition(execution_number, baseapp):
711 import time
712
715
713 repo_id = 777
716 repo_id = 777
714
717
715 cache_namespace_uid = 'cache_repo_instance.{}_{}'.format(
718 region = rc_cache.get_or_create_region('cache_repo_longterm')
716 repo_id, CacheKey.CACHE_TYPE_FEED)
719 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id)
717 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
718 repo_id=repo_id)
719 region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid)
720
720
721 @run_test_concurrently(25)
721 @run_test_concurrently(25)
722 def test_create_and_delete_cache_keys():
722 def test_create_and_delete_cache_keys():
723 time.sleep(0.2)
723 time.sleep(0.2)
724
724
725 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid)
725 def cache_generator(_state_uid):
726 def _dummy_func(cache_key):
727 val = 'async'
728 return 'result:{}'.format(val)
729
726
730 inv_context_manager = rc_cache.InvalidationContext(
727 @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}')
731 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
728 def _dummy_func(*args):
729 return _state_uid, 'result:async'
730
731 return _dummy_func
732
733 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
732
734
733 # 1st call, fresh caches
735 # 1st call, fresh caches
734 with inv_context_manager as invalidation_context:
736 with inv_context_manager as invalidation_context:
735 should_invalidate = invalidation_context.should_invalidate()
737 cache_state_uid = invalidation_context.state_uid
738 cache_func = cache_generator(cache_state_uid)
739 previous_state_uid, result = cache_func('doo')
740
741 should_invalidate = previous_state_uid != cache_state_uid
736 if should_invalidate:
742 if should_invalidate:
737 _dummy_func.refresh('some-key-3')
743 _, result = cache_func.refresh('doo')
738 else:
739 _dummy_func('some-key-3')
740
744
741 # Mark invalidation
745 # Mark invalidation
742 CacheKey.set_invalidate(invalidation_namespace)
746 CacheKey.set_invalidate(repo_namespace_key)
743
747
744 test_create_and_delete_cache_keys()
748 test_create_and_delete_cache_keys()
@@ -249,11 +249,9 b' class TestVCSOperations(object):'
249 # init cache objects
249 # init cache objects
250 CacheKey.delete_all_cache()
250 CacheKey.delete_all_cache()
251 cache_namespace_uid = 'cache_push_test.{}'.format(hg_repo.repo_id)
251 cache_namespace_uid = 'cache_push_test.{}'.format(hg_repo.repo_id)
252 invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format(
252 repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=hg_repo.repo_id)
253 repo_id=hg_repo.repo_id)
254
253
255 inv_context_manager = rc_cache.InvalidationContext(
254 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
256 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
257
255
258 with inv_context_manager as invalidation_context:
256 with inv_context_manager as invalidation_context:
259 # __enter__ will create and register cache objects
257 # __enter__ will create and register cache objects
General Comments 0
You need to be logged in to leave comments. Login now