Show More
@@ -24,7 +24,6 b' from dogpile.cache import register_backe' | |||||
24 | from . import region_meta |
|
24 | from . import region_meta | |
25 | from .utils import ( |
|
25 | from .utils import ( | |
26 | ActiveRegionCache, |
|
26 | ActiveRegionCache, | |
27 | FreshRegionCache, |
|
|||
28 | InvalidationContext, |
|
27 | InvalidationContext, | |
29 | backend_key_generator, |
|
28 | backend_key_generator, | |
30 | clear_cache_namespace, |
|
29 | clear_cache_namespace, |
@@ -1,4 +1,4 b'' | |||||
1 |
# Copyright (C) 2015-202 |
|
1 | # Copyright (C) 2015-2024 RhodeCode GmbH | |
2 | # |
|
2 | # | |
3 | # This program is free software: you can redistribute it and/or modify |
|
3 | # This program is free software: you can redistribute it and/or modify | |
4 | # it under the terms of the GNU Affero General Public License, version 3 |
|
4 | # it under the terms of the GNU Affero General Public License, version 3 | |
@@ -255,21 +255,13 b' def clear_cache_namespace(cache_region: ' | |||||
255 |
|
255 | |||
256 |
|
256 | |||
257 | class ActiveRegionCache(object): |
|
257 | class ActiveRegionCache(object): | |
258 | def __init__(self, context, cache_data): |
|
258 | def __init__(self, context, cache_data: dict): | |
259 | self.context = context |
|
259 | self.context = context | |
260 | self.cache_data = cache_data |
|
260 | self.cache_data = cache_data | |
261 |
|
261 | |||
262 | def should_invalidate(self): |
|
262 | @property | |
263 | return False |
|
263 | def state_uid(self) -> str: | |
264 |
|
264 | return self.cache_data['cache_state_uid'] | ||
265 |
|
||||
266 | class FreshRegionCache(object): |
|
|||
267 | def __init__(self, context, cache_data): |
|
|||
268 | self.context = context |
|
|||
269 | self.cache_data = cache_data |
|
|||
270 |
|
||||
271 | def should_invalidate(self): |
|
|||
272 | return True |
|
|||
273 |
|
265 | |||
274 |
|
266 | |||
275 | class InvalidationContext(object): |
|
267 | class InvalidationContext(object): | |
@@ -278,44 +270,40 b' class InvalidationContext(object):' | |||||
278 |
|
270 | |||
279 | from rhodecode.lib import rc_cache |
|
271 | from rhodecode.lib import rc_cache | |
280 |
|
272 | |||
281 | cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1) |
|
273 | repo_namespace_key = 'some-cache-for-repo-id-100' | |
282 | region = rc_cache.get_or_create_region('some_region', cache_namespace_uid) |
|
274 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
|
275 | ||||
|
276 | def cache_generator(_state_uid): | |||
283 |
|
277 | |||
284 |
@region.conditional_cache_on_arguments(namespace= |
|
278 | @region.conditional_cache_on_arguments(namespace='some-common-namespace-100') | |
285 | def heavy_compute(cache_name, param1, param2): |
|
279 | def _dummy_func(*args): | |
286 | print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2)) |
|
280 | # compute heavy function | |
287 |
|
281 | return _state_uid, 'result' | ||
288 | # invalidation namespace is shared namespace key for all process caches |
|
|||
289 | # we use it to send a global signal |
|
|||
290 | invalidation_namespace = 'repo_cache:1' |
|
|||
291 |
|
282 | |||
292 | inv_context_manager = rc_cache.InvalidationContext( |
|
283 | return _dummy_func | |
293 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) |
|
284 | ||
294 | with inv_context_manager as invalidation_context: |
|
285 | with inv_context_manager as invalidation_context: | |
295 | args = ('one', 'two') |
|
286 | cache_state_uid = invalidation_context.state_uid | |
296 | # re-compute and store cache if we get invalidate signal |
|
287 | cache_func = cache_generator(cache_state_uid) | |
297 | if invalidation_context.should_invalidate(): |
|
288 | previous_state_uid, result = cache_func(*call_args) | |
298 | result = heavy_compute.refresh(*args) |
|
|||
299 | else: |
|
|||
300 | result = heavy_compute(*args) |
|
|||
301 |
|
289 | |||
302 | compute_time = inv_context_manager.compute_time |
|
290 | should_invalidate = previous_state_uid != cache_state_uid | |
303 | log.debug('result computed in %.4fs', compute_time) |
|
291 | if should_invalidate: | |
|
292 | _, result = cache_func.refresh(*call_args) | |||
304 |
|
293 | |||
305 | # To send global invalidation signal, simply run |
|
294 | # To send global invalidation signal, simply run | |
306 |
CacheKey.set_invalidate( |
|
295 | CacheKey.set_invalidate(repo_namespace_key) | |
307 |
|
296 | |||
308 | """ |
|
297 | """ | |
309 |
|
298 | |||
310 | def __repr__(self): |
|
299 | def __repr__(self): | |
311 |
return f'<InvalidationContext:{self.cache_key} |
|
300 | return f'<InvalidationContext:{self.cache_key}>' | |
312 |
|
301 | |||
313 | def __init__(self, uid, invalidation_namespace='', |
|
302 | def __init__(self, key, raise_exception=False, thread_scoped=None): | |
314 | raise_exception=False, thread_scoped=None): |
|
303 | self.cache_key = key | |
315 | self.uid = uid |
|
304 | ||
316 | self.invalidation_namespace = invalidation_namespace |
|
|||
317 | self.raise_exception = raise_exception |
|
305 | self.raise_exception = raise_exception | |
318 |
self.proc_id = rhodecode.C |
|
306 | self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT' | |
319 | self.thread_id = 'global' |
|
307 | self.thread_id = 'global' | |
320 |
|
308 | |||
321 | if thread_scoped is None: |
|
309 | if thread_scoped is None: | |
@@ -327,77 +315,44 b' class InvalidationContext(object):' | |||||
327 | if thread_scoped is True: |
|
315 | if thread_scoped is True: | |
328 | self.thread_id = threading.current_thread().ident |
|
316 | self.thread_id = threading.current_thread().ident | |
329 |
|
317 | |||
330 | self.cache_key = compute_key_from_params(uid) |
|
318 | self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}' | |
331 | self.cache_key = 'proc:{}|thread:{}|params:{}'.format( |
|
|||
332 | self.proc_id, self.thread_id, self.cache_key) |
|
|||
333 | self.proc_key = f'proc:{self.proc_id}' |
|
|||
334 | self.compute_time = 0 |
|
319 | self.compute_time = 0 | |
335 |
|
320 | |||
336 |
def get_or_create_cache_obj(self |
|
321 | def get_or_create_cache_obj(self): | |
337 | from rhodecode.model.db import CacheKey |
|
322 | from rhodecode.model.db import CacheKey, Session, IntegrityError | |
338 |
|
323 | |||
339 | invalidation_namespace = invalidation_namespace or self.invalidation_namespace |
|
324 | cache_obj = CacheKey.get_active_cache(self.cache_key) | |
340 | # fetch all cache keys for this namespace and convert them to a map to find if we |
|
|||
341 | # have specific cache_key object registered. We do this because we want to have |
|
|||
342 | # all consistent cache_state_uid for newly registered objects |
|
|||
343 | cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace) |
|
|||
344 | cache_obj = cache_obj_map.get(self.cache_key) |
|
|||
345 | log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key) |
|
325 | log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key) | |
346 |
|
326 | |||
347 | if not cache_obj: |
|
327 | if not cache_obj: | |
348 | new_cache_args = invalidation_namespace |
|
328 | # generate new UID for non-existing cache object | |
349 | first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None |
|
329 | cache_state_uid = CacheKey.generate_new_state_uid() | |
350 | cache_state_uid = None |
|
330 | cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}', | |
351 | if first_cache_obj: |
|
331 | cache_state_uid=cache_state_uid, cache_active=True) | |
352 | cache_state_uid = first_cache_obj.cache_state_uid |
|
332 | try: | |
353 | cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args, |
|
333 | Session().add(cache_obj) | |
354 | cache_state_uid=cache_state_uid) |
|
334 | Session().commit() | |
355 |
|
335 | except IntegrityError: | ||
|
336 | # if we catch integrity error, it means we inserted this object | |||
|
337 | # assumption is that's really an edge race-condition case and | |||
|
338 | # it's safe is to skip it | |||
|
339 | Session().rollback() | |||
|
340 | except Exception: | |||
|
341 | log.exception('Failed to commit on cache key update') | |||
|
342 | Session().rollback() | |||
|
343 | if self.raise_exception: | |||
|
344 | raise | |||
356 | return cache_obj |
|
345 | return cache_obj | |
357 |
|
346 | |||
358 | def __enter__(self): |
|
347 | def __enter__(self): | |
359 | """ |
|
348 | log.debug('Entering cache invalidation check context: %s', self) | |
360 | Test if current object is valid, and return CacheRegion function |
|
|||
361 | that does invalidation and calculation |
|
|||
362 | """ |
|
|||
363 | log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace) |
|
|||
364 | # register or get a new key based on uid |
|
|||
365 | self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid) |
|
|||
366 | cache_data = self.cache_obj.get_dict() |
|
|||
367 | self._start_time = time.time() |
|
349 | self._start_time = time.time() | |
368 | if self.cache_obj.cache_active: |
|
|||
369 | # means our cache obj is existing and marked as it's |
|
|||
370 | # cache is not outdated, we return ActiveRegionCache |
|
|||
371 | self.skip_cache_active_change = True |
|
|||
372 |
|
350 | |||
373 | return ActiveRegionCache(context=self, cache_data=cache_data) |
|
351 | self.cache_obj = self.get_or_create_cache_obj() | |
|
352 | cache_data = self.cache_obj.get_dict() | |||
374 |
|
353 | |||
375 | # the key is either not existing or set to False, we return |
|
354 | return ActiveRegionCache(context=self, cache_data=cache_data) | |
376 | # the real invalidator which re-computes value. We additionally set |
|
|||
377 | # the flag to actually update the Database objects |
|
|||
378 | self.skip_cache_active_change = False |
|
|||
379 | return FreshRegionCache(context=self, cache_data=cache_data) |
|
|||
380 |
|
355 | |||
381 | def __exit__(self, exc_type, exc_val, exc_tb): |
|
356 | def __exit__(self, exc_type, exc_val, exc_tb): | |
382 | from rhodecode.model.db import IntegrityError, Session |
|
|||
383 |
|
||||
384 | # save compute time |
|
357 | # save compute time | |
385 | self.compute_time = time.time() - self._start_time |
|
358 | self.compute_time = time.time() - self._start_time | |
386 |
|
||||
387 | if self.skip_cache_active_change: |
|
|||
388 | return |
|
|||
389 |
|
||||
390 | try: |
|
|||
391 | self.cache_obj.cache_active = True |
|
|||
392 | Session().add(self.cache_obj) |
|
|||
393 | Session().commit() |
|
|||
394 | except IntegrityError: |
|
|||
395 | # if we catch integrity error, it means we inserted this object |
|
|||
396 | # assumption is that's really an edge race-condition case and |
|
|||
397 | # it's safe is to skip it |
|
|||
398 | Session().rollback() |
|
|||
399 | except Exception: |
|
|||
400 | log.exception('Failed to commit on cache key update') |
|
|||
401 | Session().rollback() |
|
|||
402 | if self.raise_exception: |
|
|||
403 | raise |
|
@@ -2034,10 +2034,9 b' class Repository(Base, BaseModel):' | |||||
2034 | """ |
|
2034 | """ | |
2035 | Returns associated cache keys for that repo |
|
2035 | Returns associated cache keys for that repo | |
2036 | """ |
|
2036 | """ | |
2037 |
|
|
2037 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id) | |
2038 | repo_id=self.repo_id) |
|
|||
2039 | return CacheKey.query()\ |
|
2038 | return CacheKey.query()\ | |
2040 |
.filter(CacheKey.cache_ |
|
2039 | .filter(CacheKey.cache_key == repo_namespace_key)\ | |
2041 | .order_by(CacheKey.cache_key)\ |
|
2040 | .order_by(CacheKey.cache_key)\ | |
2042 | .all() |
|
2041 | .all() | |
2043 |
|
2042 | |||
@@ -2609,29 +2608,38 b' class Repository(Base, BaseModel):' | |||||
2609 | from rhodecode.lib import rc_cache |
|
2608 | from rhodecode.lib import rc_cache | |
2610 |
|
2609 | |||
2611 | cache_namespace_uid = f'repo_instance.{self.repo_id}' |
|
2610 | cache_namespace_uid = f'repo_instance.{self.repo_id}' | |
2612 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( |
|
|||
2613 | repo_id=self.repo_id) |
|
|||
2614 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) |
|
2611 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) | |
2615 |
|
2612 | |||
2616 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) |
|
|||
2617 | def get_instance_cached(repo_id, context_id, _cache_state_uid): |
|
|||
2618 | return self._get_instance(repo_state_uid=_cache_state_uid) |
|
|||
2619 |
|
||||
2620 | # we must use thread scoped cache here, |
|
2613 | # we must use thread scoped cache here, | |
2621 | # because each thread of gevent needs it's own not shared connection and cache |
|
2614 | # because each thread of gevent needs it's own not shared connection and cache | |
2622 | # we also alter `args` so the cache key is individual for every green thread. |
|
2615 | # we also alter `args` so the cache key is individual for every green thread. | |
2623 | inv_context_manager = rc_cache.InvalidationContext( |
|
2616 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=self.repo_id) | |
2624 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace, |
|
2617 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key, thread_scoped=True) | |
2625 | thread_scoped=True) |
|
2618 | ||
|
2619 | # our wrapped caching function that takes state_uid to save the previous state in | |||
|
2620 | def cache_generator(_state_uid): | |||
|
2621 | ||||
|
2622 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) | |||
|
2623 | def get_instance_cached(_repo_id, _process_context_id): | |||
|
2624 | # we save in cached func the generation state so we can detect a change and invalidate caches | |||
|
2625 | return _state_uid, self._get_instance(repo_state_uid=_state_uid) | |||
|
2626 | ||||
|
2627 | return get_instance_cached | |||
|
2628 | ||||
2626 | with inv_context_manager as invalidation_context: |
|
2629 | with inv_context_manager as invalidation_context: | |
2627 |
cache_state_uid = invalidation_context. |
|
2630 | cache_state_uid = invalidation_context.state_uid | |
2628 | args = (self.repo_id, inv_context_manager.cache_key, cache_state_uid) |
|
2631 | cache_func = cache_generator(cache_state_uid) | |
2629 |
|
2632 | |||
2630 | # re-compute and store cache if we get invalidate signal |
|
2633 | args = self.repo_id, inv_context_manager.proc_key | |
2631 | if invalidation_context.should_invalidate(): |
|
2634 | ||
2632 |
|
|
2635 | previous_state_uid, instance = cache_func(*args) | |
2633 | else: |
|
2636 | ||
2634 | instance = get_instance_cached(*args) |
|
2637 | if instance: | |
|
2638 | # now compare keys, the "cache" state vs expected state. | |||
|
2639 | if previous_state_uid != cache_state_uid: | |||
|
2640 | log.warning('Cached state uid %s is different than current state uid %s', | |||
|
2641 | previous_state_uid, cache_state_uid) | |||
|
2642 | _, instance = cache_func.refresh(*args) | |||
2635 |
|
2643 | |||
2636 | log.debug('Repo instance fetched in %.4fs', inv_context_manager.compute_time) |
|
2644 | log.debug('Repo instance fetched in %.4fs', inv_context_manager.compute_time) | |
2637 | return instance |
|
2645 | return instance | |
@@ -2652,6 +2660,7 b' class Repository(Base, BaseModel):' | |||||
2652 | _vcs_alias=self.repo_type) |
|
2660 | _vcs_alias=self.repo_type) | |
2653 | if repo is not None: |
|
2661 | if repo is not None: | |
2654 | repo.count() # cache rebuild |
|
2662 | repo.count() # cache rebuild | |
|
2663 | ||||
2655 | return repo |
|
2664 | return repo | |
2656 |
|
2665 | |||
2657 | def get_shadow_repository_path(self, workspace_id): |
|
2666 | def get_shadow_repository_path(self, workspace_id): | |
@@ -3675,10 +3684,10 b' class CacheKey(Base, BaseModel):' | |||||
3675 | cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None) |
|
3684 | cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None) | |
3676 | cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False) |
|
3685 | cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False) | |
3677 |
|
3686 | |||
3678 | def __init__(self, cache_key, cache_args='', cache_state_uid=None): |
|
3687 | def __init__(self, cache_key, cache_args='', cache_state_uid=None, cache_active=False): | |
3679 | self.cache_key = cache_key |
|
3688 | self.cache_key = cache_key | |
3680 | self.cache_args = cache_args |
|
3689 | self.cache_args = cache_args | |
3681 |
self.cache_active = |
|
3690 | self.cache_active = cache_active | |
3682 | # first key should be same for all entries, since all workers should share it |
|
3691 | # first key should be same for all entries, since all workers should share it | |
3683 | self.cache_state_uid = cache_state_uid or self.generate_new_state_uid() |
|
3692 | self.cache_state_uid = cache_state_uid or self.generate_new_state_uid() | |
3684 |
|
3693 | |||
@@ -3730,18 +3739,18 b' class CacheKey(Base, BaseModel):' | |||||
3730 | """ |
|
3739 | """ | |
3731 | Mark all caches of a repo as invalid in the database. |
|
3740 | Mark all caches of a repo as invalid in the database. | |
3732 | """ |
|
3741 | """ | |
3733 |
|
||||
3734 | try: |
|
3742 | try: | |
3735 |
qry = Session().query(cls).filter(cls.cache_ |
|
3743 | qry = Session().query(cls).filter(cls.cache_key == cache_uid) | |
3736 | if delete: |
|
3744 | if delete: | |
3737 | qry.delete() |
|
3745 | qry.delete() | |
3738 | log.debug('cache objects deleted for cache args %s', |
|
3746 | log.debug('cache objects deleted for cache args %s', | |
3739 | safe_str(cache_uid)) |
|
3747 | safe_str(cache_uid)) | |
3740 | else: |
|
3748 | else: | |
3741 | qry.update({"cache_active": False, |
|
3749 | new_uid = cls.generate_new_state_uid() | |
3742 |
|
|
3750 | qry.update({"cache_state_uid": new_uid, | |
3743 | log.debug('cache objects marked as invalid for cache args %s', |
|
3751 | "cache_args": f"repo_state:{time.time()}"}) | |
3744 | safe_str(cache_uid)) |
|
3752 | log.debug('cache object %s set new UID %s', | |
|
3753 | safe_str(cache_uid), new_uid) | |||
3745 |
|
3754 | |||
3746 | Session().commit() |
|
3755 | Session().commit() | |
3747 | except Exception: |
|
3756 | except Exception: |
@@ -299,9 +299,8 b' class ScmModel(BaseModel):' | |||||
299 | repo = Repository.get_by_repo_name(repo_name) |
|
299 | repo = Repository.get_by_repo_name(repo_name) | |
300 |
|
300 | |||
301 | if repo: |
|
301 | if repo: | |
302 |
|
|
302 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo.repo_id) | |
303 | repo_id=repo.repo_id) |
|
303 | CacheKey.set_invalidate(repo_namespace_key, delete=delete) | |
304 | CacheKey.set_invalidate(invalidation_namespace, delete=delete) |
|
|||
305 |
|
304 | |||
306 | repo_id = repo.repo_id |
|
305 | repo_id = repo.repo_id | |
307 | config = repo._config |
|
306 | config = repo._config |
@@ -1,5 +1,4 b'' | |||||
1 |
|
1 | # Copyright (C) 2010-2024 RhodeCode GmbH | ||
2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
|||
3 | # |
|
2 | # | |
4 | # This program is free software: you can redistribute it and/or modify |
|
3 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
4 | # it under the terms of the GNU Affero General Public License, version 3 | |
@@ -27,6 +26,7 b' import string' | |||||
27 | import mock |
|
26 | import mock | |
28 | import pytest |
|
27 | import pytest | |
29 | import functools |
|
28 | import functools | |
|
29 | import time | |||
30 |
|
30 | |||
31 | from rhodecode.tests import no_newline_id_generator |
|
31 | from rhodecode.tests import no_newline_id_generator | |
32 | from rhodecode.tests.utils import run_test_concurrently |
|
32 | from rhodecode.tests.utils import run_test_concurrently | |
@@ -619,126 +619,130 b' def test_get_repo_by_id(test, expected):' | |||||
619 |
|
619 | |||
620 | def test_invalidation_context(baseapp): |
|
620 | def test_invalidation_context(baseapp): | |
621 | repo_id = 9999 |
|
621 | repo_id = 9999 | |
|
622 | calls = [1, 2] | |||
|
623 | call_args = ('some-key',) | |||
|
624 | region = rc_cache.get_or_create_region('cache_repo_longterm') | |||
622 |
|
625 | |||
623 | cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( |
|
626 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) | |
624 | repo_id, CacheKey.CACHE_TYPE_FEED) |
|
627 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
625 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( |
|
|||
626 | repo_id=repo_id) |
|
|||
627 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) |
|
|||
628 |
|
628 | |||
629 | calls = [1, 2] |
|
629 | def cache_generator(_state_uid): | |
630 |
|
630 | |||
631 |
@region.conditional_cache_on_arguments(namespace= |
|
631 | @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') | |
632 |
def _dummy_func( |
|
632 | def _dummy_func(*args): | |
633 | val = calls.pop(0) |
|
633 | val = calls.pop(0) | |
634 |
return 'result:{}' |
|
634 | return _state_uid, f'result:{val}' | |
635 |
|
635 | |||
636 | inv_context_manager = rc_cache.InvalidationContext( |
|
636 | return _dummy_func | |
637 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) |
|
|||
638 |
|
637 | |||
639 | # 1st call, fresh caches |
|
638 | # 1st call, fresh caches | |
640 | with inv_context_manager as invalidation_context: |
|
639 | with inv_context_manager as invalidation_context: | |
641 |
|
|
640 | cache_state_uid = invalidation_context.state_uid | |
|
641 | cache_func = cache_generator(cache_state_uid) | |||
|
642 | previous_state_uid, result = cache_func(*call_args) | |||
|
643 | ||||
|
644 | should_invalidate = previous_state_uid != cache_state_uid | |||
642 | if should_invalidate: |
|
645 | if should_invalidate: | |
643 |
result = |
|
646 | _, result = cache_func.refresh(*call_args) | |
644 | else: |
|
|||
645 | result = _dummy_func('some-key') |
|
|||
646 |
|
647 | |||
647 | assert isinstance(invalidation_context, rc_cache.FreshRegionCache) |
|
648 | assert should_invalidate is False # 1st call, we don't need to invalidate | |
648 | assert should_invalidate is True |
|
|||
649 |
|
649 | |||
650 | assert 'result:1' == result |
|
650 | assert 'result:1' == result | |
651 |
# should be cached so calling it twice will give the same result |
|
651 | # should be already cached so calling it twice will give the same result! | |
652 |
result = |
|
652 | _, result = cache_func(*call_args) | |
653 | assert 'result:1' == result |
|
653 | assert 'result:1' == result | |
654 |
|
654 | |||
655 | # 2nd call, we create a new context manager, this should be now key aware, and |
|
655 | # 2nd call, we create a new context manager, this should be now key aware, and | |
656 | # return an active cache region |
|
656 | # return an active cache region from DB based on the same uid | |
657 | with inv_context_manager as invalidation_context: |
|
657 | with inv_context_manager as invalidation_context: | |
658 |
|
|
658 | cache_state_uid = invalidation_context.state_uid | |
659 | assert isinstance(invalidation_context, rc_cache.ActiveRegionCache) |
|
659 | cache_func = cache_generator(cache_state_uid) | |
660 | assert should_invalidate is False |
|
660 | previous_state_uid, result = cache_func(*call_args) | |
|
661 | ||||
|
662 | should_invalidate = previous_state_uid != cache_state_uid | |||
|
663 | if should_invalidate: | |||
|
664 | _, result = cache_func.refresh(*call_args) | |||
|
665 | ||||
|
666 | assert should_invalidate is False # 1st call, we don't need to invalidate | |||
661 |
|
667 | |||
662 | # Mark invalidation |
|
668 | # Mark invalidation | |
663 |
CacheKey.set_invalidate( |
|
669 | CacheKey.set_invalidate(repo_namespace_key) | |
664 |
|
670 | |||
665 | # 3nd call, fresh caches |
|
671 | # 3nd call, fresh caches | |
666 | with inv_context_manager as invalidation_context: |
|
672 | with inv_context_manager as invalidation_context: | |
667 |
|
|
673 | cache_state_uid = invalidation_context.state_uid | |
|
674 | cache_func = cache_generator(cache_state_uid) | |||
|
675 | previous_state_uid, result = cache_func(*call_args) | |||
|
676 | ||||
|
677 | should_invalidate = previous_state_uid != cache_state_uid | |||
668 | if should_invalidate: |
|
678 | if should_invalidate: | |
669 |
result = |
|
679 | _, result = cache_func.refresh(*call_args) | |
670 | else: |
|
|||
671 | result = _dummy_func('some-key') |
|
|||
672 |
|
680 | |||
673 | assert isinstance(invalidation_context, rc_cache.FreshRegionCache) |
|
|||
674 | assert should_invalidate is True |
|
681 | assert should_invalidate is True | |
675 |
|
682 | |||
676 | assert 'result:2' == result |
|
683 | assert 'result:2' == result | |
677 |
|
684 | |||
678 | # cached again, same result |
|
685 | # cached again, same result | |
679 |
result = |
|
686 | _, result = cache_func(*call_args) | |
680 | assert 'result:2' == result |
|
687 | assert 'result:2' == result | |
681 |
|
688 | |||
682 |
|
689 | |||
683 | def test_invalidation_context_exception_in_compute(baseapp): |
|
690 | def test_invalidation_context_exception_in_compute(baseapp): | |
684 | repo_id = 888 |
|
691 | repo_id = 888 | |
|
692 | region = rc_cache.get_or_create_region('cache_repo_longterm') | |||
685 |
|
693 | |||
686 | cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( |
|
694 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) | |
687 | repo_id, CacheKey.CACHE_TYPE_FEED) |
|
695 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
688 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( |
|
|||
689 | repo_id=repo_id) |
|
|||
690 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) |
|
|||
691 |
|
696 | |||
692 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) |
|
697 | def cache_generator(_state_uid): | |
693 | def _dummy_func(cache_key): |
|
698 | @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') | |
694 | raise Exception('Error in cache func') |
|
699 | def _dummy_func(*args): | |
|
700 | raise Exception('Error in cache func') | |||
|
701 | ||||
|
702 | return _dummy_func | |||
695 |
|
703 | |||
696 | with pytest.raises(Exception): |
|
704 | with pytest.raises(Exception): | |
697 | inv_context_manager = rc_cache.InvalidationContext( |
|
|||
698 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) |
|
|||
699 |
|
705 | |||
700 | # 1st call, fresh caches |
|
706 | # 1st call, fresh caches | |
701 | with inv_context_manager as invalidation_context: |
|
707 | with inv_context_manager as invalidation_context: | |
702 |
|
|
708 | cache_state_uid = invalidation_context.state_uid | |
703 | if should_invalidate: |
|
709 | cache_func = cache_generator(cache_state_uid) | |
704 | _dummy_func.refresh('some-key-2') |
|
710 | cache_func(1, 2, 3) | |
705 | else: |
|
|||
706 | _dummy_func('some-key-2') |
|
|||
707 |
|
711 | |||
708 |
|
712 | |||
709 | @pytest.mark.parametrize('execution_number', range(5)) |
|
713 | @pytest.mark.parametrize('execution_number', range(5)) | |
710 | def test_cache_invalidation_race_condition(execution_number, baseapp): |
|
714 | def test_cache_invalidation_race_condition(execution_number, baseapp): | |
711 | import time |
|
|||
712 |
|
715 | |||
713 | repo_id = 777 |
|
716 | repo_id = 777 | |
714 |
|
717 | |||
715 | cache_namespace_uid = 'cache_repo_instance.{}_{}'.format( |
|
718 | region = rc_cache.get_or_create_region('cache_repo_longterm') | |
716 | repo_id, CacheKey.CACHE_TYPE_FEED) |
|
719 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=repo_id) | |
717 | invalidation_namespace = CacheKey.REPO_INVALIDATION_NAMESPACE.format( |
|
|||
718 | repo_id=repo_id) |
|
|||
719 | region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid) |
|
|||
720 |
|
720 | |||
721 | @run_test_concurrently(25) |
|
721 | @run_test_concurrently(25) | |
722 | def test_create_and_delete_cache_keys(): |
|
722 | def test_create_and_delete_cache_keys(): | |
723 | time.sleep(0.2) |
|
723 | time.sleep(0.2) | |
724 |
|
724 | |||
725 | @region.conditional_cache_on_arguments(namespace=cache_namespace_uid) |
|
725 | def cache_generator(_state_uid): | |
726 | def _dummy_func(cache_key): |
|
|||
727 | val = 'async' |
|
|||
728 | return 'result:{}'.format(val) |
|
|||
729 |
|
726 | |||
730 | inv_context_manager = rc_cache.InvalidationContext( |
|
727 | @region.conditional_cache_on_arguments(namespace=f'some-common-namespace-{repo_id}') | |
731 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) |
|
728 | def _dummy_func(*args): | |
|
729 | return _state_uid, 'result:async' | |||
|
730 | ||||
|
731 | return _dummy_func | |||
|
732 | ||||
|
733 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |||
732 |
|
734 | |||
733 | # 1st call, fresh caches |
|
735 | # 1st call, fresh caches | |
734 | with inv_context_manager as invalidation_context: |
|
736 | with inv_context_manager as invalidation_context: | |
735 |
|
|
737 | cache_state_uid = invalidation_context.state_uid | |
|
738 | cache_func = cache_generator(cache_state_uid) | |||
|
739 | previous_state_uid, result = cache_func('doo') | |||
|
740 | ||||
|
741 | should_invalidate = previous_state_uid != cache_state_uid | |||
736 | if should_invalidate: |
|
742 | if should_invalidate: | |
737 |
_ |
|
743 | _, result = cache_func.refresh('doo') | |
738 | else: |
|
|||
739 | _dummy_func('some-key-3') |
|
|||
740 |
|
744 | |||
741 | # Mark invalidation |
|
745 | # Mark invalidation | |
742 |
CacheKey.set_invalidate( |
|
746 | CacheKey.set_invalidate(repo_namespace_key) | |
743 |
|
747 | |||
744 | test_create_and_delete_cache_keys() |
|
748 | test_create_and_delete_cache_keys() |
@@ -249,11 +249,9 b' class TestVCSOperations(object):' | |||||
249 | # init cache objects |
|
249 | # init cache objects | |
250 | CacheKey.delete_all_cache() |
|
250 | CacheKey.delete_all_cache() | |
251 | cache_namespace_uid = 'cache_push_test.{}'.format(hg_repo.repo_id) |
|
251 | cache_namespace_uid = 'cache_push_test.{}'.format(hg_repo.repo_id) | |
252 |
|
|
252 | repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=hg_repo.repo_id) | |
253 | repo_id=hg_repo.repo_id) |
|
|||
254 |
|
253 | |||
255 | inv_context_manager = rc_cache.InvalidationContext( |
|
254 | inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key) | |
256 | uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace) |
|
|||
257 |
|
255 | |||
258 | with inv_context_manager as invalidation_context: |
|
256 | with inv_context_manager as invalidation_context: | |
259 | # __enter__ will create and register cache objects |
|
257 | # __enter__ will create and register cache objects |
General Comments 0
You need to be logged in to leave comments.
Login now