Show More
@@ -24,7 +24,6 b' from dogpile.cache import register_backe' | |||
|
24 | 24 | from . import region_meta |
|
25 | 25 | from .utils import ( |
|
26 | 26 | ActiveRegionCache, |
|
27 | FreshRegionCache, | |
|
28 | 27 | InvalidationContext, |
|
29 | 28 | backend_key_generator, |
|
30 | 29 | clear_cache_namespace, |
@@ -1,4 +1,4 b'' | |||
|
1 |
# Copyright (C) 2015-202 |
|
|
1 | # Copyright (C) 2015-2024 RhodeCode GmbH | |
|
2 | 2 | # |
|
3 | 3 | # This program is free software: you can redistribute it and/or modify |
|
4 | 4 | # it under the terms of the GNU Affero General Public License, version 3 |
@@ -255,21 +255,13 b' def clear_cache_namespace(cache_region: ' | |||
|
255 | 255 | |
|
256 | 256 | |
|
257 | 257 | class ActiveRegionCache(object): |
|
258 | def __init__(self, context, cache_data): | |
|
258 | def __init__(self, context, cache_data: dict): | |
|
259 | 259 | self.context = context |
|
260 | 260 | self.cache_data = cache_data |
|
261 | 261 | |
|
262 | def should_invalidate(self): | |
|
263 | return False | |
|
264 | ||
|
265 | ||
|
266 | class FreshRegionCache(object): | |
|
267 | def __init__(self, context, cache_data): | |
|
268 | self.context = context | |
|
269 | self.cache_data = cache_data | |
|
270 | ||
|
271 | def should_invalidate(self): | |
|
272 | return True | |
|
262 | @property | |
|
263 | def state_uid(self) -> str: | |
|
264 | return self.cache_data['cache_state_uid'] | |
|
273 | 265 | |
|
274 | 266 | |
|
275 | 267 | class InvalidationContext(object): |
@@ -278,44 +270,40 b' class InvalidationContext(object):' | |||
|
278 | 270 | |
|
279 | 271 | from rhodecode.lib import rc_cache |
|
280 | 272 | |
|
281 | cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1) | |
|
282 | region = rc_cache.get_or_create_region('some_region', cache_namespace_uid) | |
|
273 | repo_namespace_key = 'some-cache-for-repo-id-100' | |
|
274 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
|
275 | ||
|
276 | def cache_generator(_state_uid): | |
|
283 | 277 | |
|
284 |
@region.conditional_cache_on_arguments(namespace= |
|
|
285 | def heavy_compute(cache_name, param1, param2): | |
|
286 | print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2)) | |
|
287 | ||
|
288 | # invalidation namespace is shared namespace key for all process caches | |
|
289 | # we use it to send a global signal | |
|
290 | invalidation_namespace = 'repo_cache:1' | |
|
278 | @region.conditional_cache_on_arguments(namespace='some-common-namespace-100') | |
|
279 | def _dummy_func(*args): | |
|
280 | # compute heavy function | |
|
281 | return _state_uid, 'result' | |
|
291 | 282 | |
|
292 | inv_context_manager = rc_cache.InvalidationContext( | |
|
293 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) | |
|
283 | return _dummy_func | |
|
284 | ||
|
294 | 285 | with inv_context_manager as invalidation_context: |
|
295 | args = ('one', 'two') | |
|
296 | # re-compute and store cache if we get invalidate signal | |
|
297 | if invalidation_context.should_invalidate(): | |
|
298 | result = heavy_compute.refresh(*args) | |
|
299 | else: | |
|
300 | result = heavy_compute(*args) | |
|
286 | cache_state_uid = invalidation_context.state_uid | |
|
287 | cache_func = cache_generator(cache_state_uid) | |
|
288 | previous_state_uid, result = cache_func(*call_args) | |
|
301 | 289 | |
|
302 | compute_time = inv_context_manager.compute_time | |
|
303 | log.debug('result computed in %.4fs', compute_time) | |
|
290 | should_invalidate = previous_state_uid != cache_state_uid | |
|
291 | if should_invalidate: | |
|
292 | _, result = cache_func.refresh(*call_args) | |
|
304 | 293 | |
|
305 | 294 | # To send global invalidation signal, simply run |
|
306 |
CacheKey.set_invalidate( |
|
|
295 | CacheKey.set_invalidate(repo_namespace_key) | |
|
307 | 296 | |
|
308 | 297 | """ |
|
309 | 298 | |
|
310 | 299 | def __repr__(self): |
|
311 |
return f'<InvalidationContext:{self.cache_key} |
|
|
300 | return f'<InvalidationContext:{self.cache_key}>' | |
|
312 | 301 | |
|
313 | def __init__(self, uid, invalidation_namespace='', | |
|
314 | raise_exception=False, thread_scoped=None): | |
|
315 | self.uid = uid | |
|
316 | self.invalidation_namespace = invalidation_namespace | |
|
302 | def __init__(self, key, raise_exception=False, thread_scoped=None): | |
|
303 | self.cache_key = key | |
|
304 | ||
|
317 | 305 | self.raise_exception = raise_exception |
|
318 |
self.proc_id = rhodecode.C |
|
|
306 | self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT' | |
|
319 | 307 | self.thread_id = 'global' |
|
320 | 308 | |
|
321 | 309 | if thread_scoped is None: |
@@ -327,77 +315,44 b' class InvalidationContext(object):' | |||
|
327 | 315 | if thread_scoped is True: |
|
328 | 316 | self.thread_id = threading.current_thread().ident |
|
329 | 317 | |
|
330 | self.cache_key = compute_key_from_params(uid) | |
|
331 | self.cache_key = 'proc:{}|thread:{}|params:{}'.format( | |
|
332 | self.proc_id, self.thread_id, self.cache_key) | |
|
333 | self.proc_key = f'proc:{self.proc_id}' | |
|
318 | self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}' | |
|
334 | 319 | self.compute_time = 0 |
|
335 | 320 | |
|
336 |
def get_or_create_cache_obj(self |
|
|
337 | from rhodecode.model.db import CacheKey | |
|
321 | def get_or_create_cache_obj(self): | |
|
322 | from rhodecode.model.db import CacheKey, Session, IntegrityError | |
|
338 | 323 | |
|
339 | invalidation_namespace = invalidation_namespace or self.invalidation_namespace | |
|
340 | # fetch all cache keys for this namespace and convert them to a map to find if we | |
|
341 | # have specific cache_key object registered. We do this because we want to have | |
|
342 | # all consistent cache_state_uid for newly registered objects | |
|
343 | cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace) | |
|
344 | cache_obj = cache_obj_map.get(self.cache_key) | |
|
324 | cache_obj = CacheKey.get_active_cache(self.cache_key) | |
|
345 | 325 | log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key) |
|
346 | 326 | |
|
347 | 327 | if not cache_obj: |
|
348 | new_cache_args = invalidation_namespace | |
|
349 | first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None | |
|
350 | cache_state_uid = None | |
|
351 | if first_cache_obj: | |
|
352 | cache_state_uid = first_cache_obj.cache_state_uid | |
|
353 | cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args, | |
|
354 | cache_state_uid=cache_state_uid) | |
|
355 | ||
|
328 | # generate new UID for non-existing cache object | |
|
329 | cache_state_uid = CacheKey.generate_new_state_uid() | |
|
330 | cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}', | |
|
331 | cache_state_uid=cache_state_uid, cache_active=True) | |
|
332 | try: | |
|
333 | Session().add(cache_obj) | |
|
334 | Session().commit() | |
|
335 | except IntegrityError: | |
|
336 | # if we catch integrity error, it means we inserted this object | |
|
337 | # assumption is that's really an edge race-condition case and | |
|
338 | # it's safe is to skip it | |
|
339 | Session().rollback() | |
|
340 | except Exception: | |
|
341 | log.exception('Failed to commit on cache key update') | |
|
342 | Session().rollback() | |
|
343 | if self.raise_exception: | |
|
344 | raise | |
|
356 | 345 | return cache_obj |
|
357 | 346 | |
|
358 | 347 | def __enter__(self): |
|
359 | """ | |
|
360 | Test if current object is valid, and return CacheRegion function | |
|
361 | that does invalidation and calculation | |
|
362 | """ | |
|
363 | log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace) | |
|
364 | # register or get a new key based on uid | |
|
365 | self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid) | |
|
366 | cache_data = self.cache_obj.get_dict() | |
|
348 | log.debug('Entering cache invalidation check context: %s', self) | |
|
367 | 349 | self._start_time = time.time() |
|
368 | if self.cache_obj.cache_active: | |
|
369 | # means our cache obj is existing and marked as it's | |
|
370 | # cache is not outdated, we return ActiveRegionCache | |
|
371 | self.skip_cache_active_change = True | |
|
372 | 350 | |
|
373 | return ActiveRegionCache(context=self, cache_data=cache_data) | |
|
351 | self.cache_obj = self.get_or_create_cache_obj() | |
|
352 | cache_data = self.cache_obj.get_dict() | |
|
374 | 353 | |
|
375 | # the key is either not existing or set to False, we return | |
|
376 | # the real invalidator which re-computes value. We additionally set | |
|
377 | # the flag to actually update the Database objects | |
|
378 | self.skip_cache_active_change = False | |
|
379 | return FreshRegionCache(context=self, cache_data=cache_data) | |
|
354 | return ActiveRegionCache(context=self, cache_data=cache_data) | |
|
380 | 355 | |
|
381 | 356 | def __exit__(self, exc_type, exc_val, exc_tb): |
|
382 | from rhodecode.model.db import IntegrityError, Session | |
|
383 | ||
|
384 | 357 | # save compute time |
|
385 | 358 | self.compute_time = time.time() - self._start_time |
|
386 | ||
|
387 | if self.skip_cache_active_change: | |
|
388 | return | |
|
389 | ||
|
390 | try: | |
|
391 | self.cache_obj.cache_active = True | |
|
392 | Session().add(self.cache_obj) | |
|
393 | Session().commit() | |
|
394 | except IntegrityError: | |
|
395 | # if we catch integrity error, it means we inserted this object | |
|
396 | # assumption is that's really an edge race-condition case and | |
|
397 | # it's safe is to skip it | |
|
398 | Session().rollback() | |
|
399 | except Exception: | |
|
400 | log.exception('Failed to commit on cache key update') | |
|
401 | Session().rollback() | |
|
402 | if self.raise_exception: | |
|
403 | raise |
@@ -2034,10 +2034,9 b' class Repository(Base, BaseModel):' | |||
|
2034 | 2034 | """ |
|
2035 | 2035 | Returns associated cache keys for that repo |
|
2036 | 2036 | """ |
|
2037 |
|
|
|
2038 | repo_id=self.repo_id) | |
|
2037 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id) | |
|
2039 | 2038 | return CacheKey.query()\ |
|
2040 |
.filter(CacheKey.cache_ |
|
|
2039 | .filter(CacheKey.cache_key == repo_namespace_key)\ | |
|
2041 | 2040 | .order_by(CacheKey.cache_key)\ |
|
2042 | 2041 | .all() |
|
2043 | 2042 | |
@@ -2609,29 +2608,38 b' class Repository(Base, BaseModel):' | |||
|
2609 | 2608 | from rhodecode.lib import rc_cache |
|
2610 | 2609 | |
|
2611 | 2610 | cache_namespace_uid = f'repo_instance.{self.repo_id}' |
|
2612 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( | |
|
2613 | repo_id=self.repo_id) | |
|
2614 | 2611 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) |
|
2615 | 2612 | |
|
2616 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) | |
|
2617 | def get_instance_cached(repo_id, context_id, _cache_state_uid): | |
|
2618 | return self._get_instance(repo_state_uid=_cache_state_uid) | |
|
2619 | ||
|
2620 | 2613 | # we must use thread scoped cache here, |
|
2621 | 2614 | # because each thread of gevent needs it's own not shared connection and cache |
|
2622 | 2615 | # we also alter `args` so the cache key is individual for every green thread. |
|
2623 | inv_context_manager = rc_cache.InvalidationContext( | |
|
2624 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace, | |
|
2625 | thread_scoped=True) | |
|
2616 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id) | |
|
2617 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key, thread_scoped=True) | |
|
2618 | ||
|
2619 | # our wrapped caching function that takes state_uid to save the previous state in | |
|
2620 | def cache_generator(_state_uid): | |
|
2621 | ||
|
2622 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) | |
|
2623 | def get_instance_cached(_repo_id, _process_context_id): | |
|
2624 | # we save in cached func the generation state so we can detect a change and invalidate caches | |
|
2625 | return _state_uid, self._get_instance(repo_state_uid=_state_uid) | |
|
2626 | ||
|
2627 | return get_instance_cached | |
|
2628 | ||
|
2626 | 2629 | with inv_context_manager as invalidation_context: |
|
2627 |
cache_state_uid = invalidation_context. |
|
|
2628 | args = (self.repo_id, inv_context_manager.cache_key, cache_state_uid) | |
|
2629 | ||
|
2630 | # re-compute and store cache if we get invalidate signal | |
|
2631 | if invalidation_context.should_invalidate(): | |
|
2632 |
|
|
|
2633 | else: | |
|
2634 | instance = get_instance_cached(*args) | |
|
2630 | cache_state_uid = invalidation_context.state_uid | |
|
2631 | cache_func = cache_generator(cache_state_uid) | |
|
2632 | ||
|
2633 | args = self.repo_id, inv_context_manager.proc_key | |
|
2634 | ||
|
2635 | previous_state_uid, instance = cache_func(*args) | |
|
2636 | ||
|
2637 | if instance: | |
|
2638 | # now compare keys, the "cache" state vs expected state. | |
|
2639 | if previous_state_uid != cache_state_uid: | |
|
2640 | log.warning('Cached state uid %s is different than current state uid %s', | |
|
2641 | previous_state_uid, cache_state_uid) | |
|
2642 | _, instance = cache_func.refresh(*args) | |
|
2635 | 2643 | |
|
2636 | 2644 | log.debug('Repo instance fetched in %.4fs', inv_context_manager.compute_time) |
|
2637 | 2645 | return instance |
@@ -2652,6 +2660,7 b' class Repository(Base, BaseModel):' | |||
|
2652 | 2660 | _vcs_alias=self.repo_type) |
|
2653 | 2661 | if repo is not None: |
|
2654 | 2662 | repo.count() # cache rebuild |
|
2663 | ||
|
2655 | 2664 | return repo |
|
2656 | 2665 | |
|
2657 | 2666 | def get_shadow_repository_path(self, workspace_id): |
@@ -3675,10 +3684,10 b' class CacheKey(Base, BaseModel):' | |||
|
3675 | 3684 | cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None) |
|
3676 | 3685 | cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False) |
|
3677 | 3686 | |
|
3678 | def __init__(self, cache_key, cache_args='', cache_state_uid=None): | |
|
3687 | def __init__(self, cache_key, cache_args='', cache_state_uid=None, cache_active=False): | |
|
3679 | 3688 | self.cache_key = cache_key |
|
3680 | 3689 | self.cache_args = cache_args |
|
3681 |
self.cache_active = |
|
|
3690 | self.cache_active = cache_active | |
|
3682 | 3691 | # first key should be same for all entries, since all workers should share it |
|
3683 | 3692 | self.cache_state_uid = cache_state_uid or self.generate_new_state_uid() |
|
3684 | 3693 | |
@@ -3730,18 +3739,18 b' class CacheKey(Base, BaseModel):' | |||
|
3730 | 3739 | """ |
|
3731 | 3740 | Mark all caches of a repo as invalid in the database. |
|
3732 | 3741 | """ |
|
3733 | ||
|
3734 | 3742 | try: |
|
3735 |
qry = Session().query(cls).filter(cls.cache_ |
|
|
3743 | qry = Session().query(cls).filter(cls.cache_key == cache_uid) | |
|
3736 | 3744 | if delete: |
|
3737 | 3745 | qry.delete() |
|
3738 | 3746 | log.debug('cache objects deleted for cache args %s', |
|
3739 | 3747 | safe_str(cache_uid)) |
|
3740 | 3748 | else: |
|
3741 | qry.update({"cache_active": False, | |
|
3742 |
|
|
|
3743 | log.debug('cache objects marked as invalid for cache args %s', | |
|
3744 | safe_str(cache_uid)) | |
|
3749 | new_uid = cls.generate_new_state_uid() | |
|
3750 | qry.update({"cache_state_uid": new_uid, | |
|
3751 | "cache_args": f"repo_state:{time.time()}"}) | |
|
3752 | log.debug('cache object %s set new UID %s', | |
|
3753 | safe_str(cache_uid), new_uid) | |
|
3745 | 3754 | |
|
3746 | 3755 | Session().commit() |
|
3747 | 3756 | except Exception: |
@@ -299,9 +299,8 b' class ScmModel(BaseModel):' | |||
|
299 | 299 | repo = Repository.get_by_repo_name(repo_name) |
|
300 | 300 | |
|
301 | 301 | if repo: |
|
302 |
|
|
|
303 | repo_id=repo.repo_id) | |
|
304 | CacheKey.set_invalidate(invalidation_namespace, delete=delete) | |
|
302 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo.repo_id) | |
|
303 | CacheKey.set_invalidate(repo_namespace_key, delete=delete) | |
|
305 | 304 | |
|
306 | 305 | repo_id = repo.repo_id |
|
307 | 306 | config = repo._config |
@@ -1,5 +1,4 b'' | |||
|
1 | ||
|
2 | # Copyright (C) 2010-2023 RhodeCode GmbH | |
|
1 | # Copyright (C) 2010-2024 RhodeCode GmbH | |
|
3 | 2 | # |
|
4 | 3 | # This program is free software: you can redistribute it and/or modify |
|
5 | 4 | # it under the terms of the GNU Affero General Public License, version 3 |
@@ -27,6 +26,7 b' import string' | |||
|
27 | 26 | import mock |
|
28 | 27 | import pytest |
|
29 | 28 | import functools |
|
29 | import time | |
|
30 | 30 | |
|
31 | 31 | from rhodecode.tests import no_newline_id_generator |
|
32 | 32 | from rhodecode.tests.utils import run_test_concurrently |
@@ -619,126 +619,130 b' def test_get_repo_by_id(test, expected):' | |||
|
619 | 619 | |
|
620 | 620 | def test_invalidation_context(baseapp): |
|
621 | 621 | repo_id = 9999 |
|
622 | calls = [1, 2] | |
|
623 | call_args = ('some-key',) | |
|
624 | region = rc_cache.get_or_create_region('cache_repo_longterm') | |
|
622 | 625 | |
|
623 | cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( | |
|
624 | repo_id, CacheKey.CACHE_TYPE_FEED) | |
|
625 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( | |
|
626 | repo_id=repo_id) | |
|
627 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) | |
|
626 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) | |
|
627 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
|
628 | 628 | |
|
629 | calls = [1, 2] | |
|
629 | def cache_generator(_state_uid): | |
|
630 | 630 | |
|
631 |
@region.conditional_cache_on_arguments(namespace= |
|
|
632 |
def _dummy_func( |
|
|
633 | val = calls.pop(0) | |
|
634 |
return 'result:{}' |
|
|
631 | @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') | |
|
632 | def _dummy_func(*args): | |
|
633 | val = calls.pop(0) | |
|
634 | return _state_uid, f'result:{val}' | |
|
635 | 635 | |
|
636 | inv_context_manager = rc_cache.InvalidationContext( | |
|
637 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) | |
|
636 | return _dummy_func | |
|
638 | 637 | |
|
639 | 638 | # 1st call, fresh caches |
|
640 | 639 | with inv_context_manager as invalidation_context: |
|
641 |
|
|
|
640 | cache_state_uid = invalidation_context.state_uid | |
|
641 | cache_func = cache_generator(cache_state_uid) | |
|
642 | previous_state_uid, result = cache_func(*call_args) | |
|
643 | ||
|
644 | should_invalidate = previous_state_uid != cache_state_uid | |
|
642 | 645 | if should_invalidate: |
|
643 |
result = |
|
|
644 | else: | |
|
645 | result = _dummy_func('some-key') | |
|
646 | _, result = cache_func.refresh(*call_args) | |
|
646 | 647 | |
|
647 | assert isinstance(invalidation_context, rc_cache.FreshRegionCache) | |
|
648 | assert should_invalidate is True | |
|
648 | assert should_invalidate is False # 1st call, we don't need to invalidate | |
|
649 | 649 | |
|
650 | 650 | assert 'result:1' == result |
|
651 |
# should be cached so calling it twice will give the same result |
|
|
652 |
result = |
|
|
651 | # should be already cached so calling it twice will give the same result! | |
|
652 | _, result = cache_func(*call_args) | |
|
653 | 653 | assert 'result:1' == result |
|
654 | 654 | |
|
655 | 655 | # 2nd call, we create a new context manager, this should be now key aware, and |
|
656 | # return an active cache region | |
|
656 | # return an active cache region from DB based on the same uid | |
|
657 | 657 | with inv_context_manager as invalidation_context: |
|
658 |
|
|
|
659 | assert isinstance(invalidation_context, rc_cache.ActiveRegionCache) | |
|
660 | assert should_invalidate is False | |
|
658 | cache_state_uid = invalidation_context.state_uid | |
|
659 | cache_func = cache_generator(cache_state_uid) | |
|
660 | previous_state_uid, result = cache_func(*call_args) | |
|
661 | ||
|
662 | should_invalidate = previous_state_uid != cache_state_uid | |
|
663 | if should_invalidate: | |
|
664 | _, result = cache_func.refresh(*call_args) | |
|
665 | ||
|
666 | assert should_invalidate is False # 1st call, we don't need to invalidate | |
|
661 | 667 | |
|
662 | 668 | # Mark invalidation |
|
663 |
CacheKey.set_invalidate( |
|
|
669 | CacheKey.set_invalidate(repo_namespace_key) | |
|
664 | 670 | |
|
665 | 671 | # 3nd call, fresh caches |
|
666 | 672 | with inv_context_manager as invalidation_context: |
|
667 |
|
|
|
673 | cache_state_uid = invalidation_context.state_uid | |
|
674 | cache_func = cache_generator(cache_state_uid) | |
|
675 | previous_state_uid, result = cache_func(*call_args) | |
|
676 | ||
|
677 | should_invalidate = previous_state_uid != cache_state_uid | |
|
668 | 678 | if should_invalidate: |
|
669 |
result = |
|
|
670 | else: | |
|
671 | result = _dummy_func('some-key') | |
|
679 | _, result = cache_func.refresh(*call_args) | |
|
672 | 680 | |
|
673 | assert isinstance(invalidation_context, rc_cache.FreshRegionCache) | |
|
674 | 681 | assert should_invalidate is True |
|
675 | 682 | |
|
676 | 683 | assert 'result:2' == result |
|
677 | 684 | |
|
678 | 685 | # cached again, same result |
|
679 |
result = |
|
|
686 | _, result = cache_func(*call_args) | |
|
680 | 687 | assert 'result:2' == result |
|
681 | 688 | |
|
682 | 689 | |
|
683 | 690 | def test_invalidation_context_exception_in_compute(baseapp): |
|
684 | 691 | repo_id = 888 |
|
692 | region = rc_cache.get_or_create_region('cache_repo_longterm') | |
|
685 | 693 | |
|
686 | cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( | |
|
687 | repo_id, CacheKey.CACHE_TYPE_FEED) | |
|
688 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( | |
|
689 | repo_id=repo_id) | |
|
690 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) | |
|
694 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) | |
|
695 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
|
691 | 696 | |
|
692 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) | |
|
693 | def _dummy_func(cache_key): | |
|
694 | raise Exception('Error in cache func') | |
|
697 | def cache_generator(_state_uid): | |
|
698 | @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') | |
|
699 | def _dummy_func(*args): | |
|
700 | raise Exception('Error in cache func') | |
|
701 | ||
|
702 | return _dummy_func | |
|
695 | 703 | |
|
696 | 704 | with pytest.raises(Exception): |
|
697 | inv_context_manager = rc_cache.InvalidationContext( | |
|
698 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) | |
|
699 | 705 | |
|
700 | 706 | # 1st call, fresh caches |
|
701 | 707 | with inv_context_manager as invalidation_context: |
|
702 |
|
|
|
703 | if should_invalidate: | |
|
704 | _dummy_func.refresh('some-key-2') | |
|
705 | else: | |
|
706 | _dummy_func('some-key-2') | |
|
708 | cache_state_uid = invalidation_context.state_uid | |
|
709 | cache_func = cache_generator(cache_state_uid) | |
|
710 | cache_func(1, 2, 3) | |
|
707 | 711 | |
|
708 | 712 | |
|
709 | 713 | @pytest.mark.parametrize('execution_number', range(5)) |
|
710 | 714 | def test_cache_invalidation_race_condition(execution_number, baseapp): |
|
711 | import time | |
|
712 | 715 | |
|
713 | 716 | repo_id = 777 |
|
714 | 717 | |
|
715 | cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( | |
|
716 | repo_id, CacheKey.CACHE_TYPE_FEED) | |
|
717 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( | |
|
718 | repo_id=repo_id) | |
|
719 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) | |
|
718 | region = rc_cache.get_or_create_region('cache_repo_longterm') | |
|
719 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) | |
|
720 | 720 | |
|
721 | 721 | @run_test_concurrently(25) |
|
722 | 722 | def test_create_and_delete_cache_keys(): |
|
723 | 723 | time.sleep(0.2) |
|
724 | 724 | |
|
725 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) | |
|
726 | def _dummy_func(cache_key): | |
|
727 | val = 'async' | |
|
728 | return 'result:{}'.format(val) | |
|
725 | def cache_generator(_state_uid): | |
|
729 | 726 | |
|
730 | inv_context_manager = rc_cache.InvalidationContext( | |
|
731 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) | |
|
727 | @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') | |
|
728 | def _dummy_func(*args): | |
|
729 | return _state_uid, 'result:async' | |
|
730 | ||
|
731 | return _dummy_func | |
|
732 | ||
|
733 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
|
732 | 734 | |
|
733 | 735 | # 1st call, fresh caches |
|
734 | 736 | with inv_context_manager as invalidation_context: |
|
735 |
|
|
|
737 | cache_state_uid = invalidation_context.state_uid | |
|
738 | cache_func = cache_generator(cache_state_uid) | |
|
739 | previous_state_uid, result = cache_func('doo') | |
|
740 | ||
|
741 | should_invalidate = previous_state_uid != cache_state_uid | |
|
736 | 742 | if should_invalidate: |
|
737 |
_ |
|
|
738 | else: | |
|
739 | _dummy_func('some-key-3') | |
|
743 | _, result = cache_func.refresh('doo') | |
|
740 | 744 | |
|
741 | 745 | # Mark invalidation |
|
742 |
CacheKey.set_invalidate( |
|
|
746 | CacheKey.set_invalidate(repo_namespace_key) | |
|
743 | 747 | |
|
744 | 748 | test_create_and_delete_cache_keys() |
@@ -249,11 +249,9 b' class TestVCSOperations(object):' | |||
|
249 | 249 | # init cache objects |
|
250 | 250 | CacheKey.delete_all_cache() |
|
251 | 251 | cache_namespace_uid = 'cache_push_test.{}'.format(hg_repo.repo_id) |
|
252 |
|
|
|
253 | repo_id=hg_repo.repo_id) | |
|
252 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=hg_repo.repo_id) | |
|
254 | 253 | |
|
255 | inv_context_manager = rc_cache.InvalidationContext( | |
|
256 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) | |
|
254 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
|
257 | 255 | |
|
258 | 256 | with inv_context_manager as invalidation_context: |
|
259 | 257 | # __enter__ will create and register cache objects |
General Comments 0
You need to be logged in to leave comments.
Login now