caches.py
295 lines
| 10.4 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r2487 | # Copyright (C) 2015-2018 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r2814 | import functools | |||
r1 | ||||
import beaker | ||||
import logging | ||||
Martin Bornhold
|
r613 | import threading | ||
r1 | ||||
from beaker.cache import _cache_decorate, cache_regions, region_invalidate | ||||
Martin Bornhold
|
r895 | from sqlalchemy.exc import IntegrityError | ||
r1 | ||||
r2834 | from rhodecode.lib.utils import safe_str, sha1 | |||
Martin Bornhold
|
r895 | from rhodecode.model.db import Session, CacheKey | ||
r1 | ||||
log = logging.getLogger(__name__) | ||||
FILE_TREE = 'cache_file_tree' | ||||
FILE_TREE_META = 'cache_file_tree_metadata' | ||||
FILE_SEARCH_TREE_META = 'cache_file_search_metadata' | ||||
SUMMARY_STATS = 'cache_summary_stats' | ||||
# This list of caches gets purged when invalidation happens | ||||
r423 | USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META) | |||
r1 | ||||
DEFAULT_CACHE_MANAGER_CONFIG = { | ||||
'type': 'memorylru_base', | ||||
'max_items': 10240, | ||||
'key_length': 256, | ||||
'enabled': True | ||||
} | ||||
r2367 | def get_default_cache_settings(settings): | |||
cache_settings = {} | ||||
for key in settings.keys(): | ||||
for prefix in ['beaker.cache.', 'cache.']: | ||||
if key.startswith(prefix): | ||||
name = key.split(prefix)[1].strip() | ||||
cache_settings[name] = settings[key].strip() | ||||
return cache_settings | ||||
# set cache regions for beaker so celery can utilise it | ||||
def configure_caches(settings, default_region_settings=None): | ||||
cache_settings = {'regions': None} | ||||
# main cache settings used as default ... | ||||
cache_settings.update(get_default_cache_settings(settings)) | ||||
default_region_settings = default_region_settings or \ | ||||
{'type': DEFAULT_CACHE_MANAGER_CONFIG['type']} | ||||
if cache_settings['regions']: | ||||
for region in cache_settings['regions'].split(','): | ||||
region = region.strip() | ||||
region_settings = default_region_settings.copy() | ||||
for key, value in cache_settings.items(): | ||||
if key.startswith(region): | ||||
r2759 | region_settings[key.split(region + '.')[-1]] = value | |||
r2367 | log.debug('Configuring cache region `%s` with settings %s', | |||
region, region_settings) | ||||
configure_cache_region( | ||||
region, region_settings, cache_settings) | ||||
r1 | def configure_cache_region( | |||
r2367 | region_name, region_settings, default_cache_kw, default_expire=60): | |||
r1 | default_type = default_cache_kw.get('type', 'memory') | |||
default_lock_dir = default_cache_kw.get('lock_dir') | ||||
default_data_dir = default_cache_kw.get('data_dir') | ||||
r2367 | region_settings['lock_dir'] = region_settings.get('lock_dir', default_lock_dir) | |||
region_settings['data_dir'] = region_settings.get('data_dir', default_data_dir) | ||||
region_settings['type'] = region_settings.get('type', default_type) | ||||
region_settings['expire'] = int(region_settings.get('expire', default_expire)) | ||||
r1 | ||||
r2367 | beaker.cache.cache_regions[region_name] = region_settings | |||
r1 | ||||
def get_cache_manager(region_name, cache_name, custom_ttl=None): | ||||
""" | ||||
Creates a Beaker cache manager. Such instance can be used like that:: | ||||
_namespace = caches.get_repo_namespace_key(caches.XXX, repo_name) | ||||
cache_manager = caches.get_cache_manager('repo_cache_long', _namespace) | ||||
_cache_key = caches.compute_key_from_params(repo_name, commit.raw_id) | ||||
def heavy_compute(): | ||||
... | ||||
result = cache_manager.get(_cache_key, createfunc=heavy_compute) | ||||
:param region_name: region from ini file | ||||
:param cache_name: custom cache name, usually prefix+repo_name. eg | ||||
file_switcher_repo1 | ||||
:param custom_ttl: override .ini file timeout on this cache | ||||
:return: instance of cache manager | ||||
""" | ||||
cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG) | ||||
if custom_ttl: | ||||
log.debug('Updating region %s with custom ttl: %s', | ||||
region_name, custom_ttl) | ||||
cache_config.update({'expire': custom_ttl}) | ||||
return beaker.cache.Cache._get_cache(cache_name, cache_config) | ||||
def clear_cache_manager(cache_manager): | ||||
r276 | """ | |||
namespace = 'foobar' | ||||
cache_manager = get_cache_manager('repo_cache_long', namespace) | ||||
clear_cache_manager(cache_manager) | ||||
""" | ||||
r1 | log.debug('Clearing all values for cache manager %s', cache_manager) | |||
cache_manager.clear() | ||||
def clear_repo_caches(repo_name): | ||||
# invalidate cache manager for this repo | ||||
for prefix in USED_REPO_CACHES: | ||||
namespace = get_repo_namespace_key(prefix, repo_name) | ||||
cache_manager = get_cache_manager('repo_cache_long', namespace) | ||||
clear_cache_manager(cache_manager) | ||||
def compute_key_from_params(*args): | ||||
""" | ||||
Helper to compute key from given params to be used in cache manager | ||||
""" | ||||
r2834 | return sha1("_".join(map(safe_str, args))) | |||
r1 | ||||
def get_repo_namespace_key(prefix, repo_name): | ||||
return '{0}_{1}'.format(prefix, compute_key_from_params(repo_name)) | ||||
r2814 | def conditional_cache(region, cache_namespace, condition, func): | |||
r1 | """ | |||
Conditional caching function use like:: | ||||
def _c(arg): | ||||
# heavy computation function | ||||
return data | ||||
# depending on the condition the compute is wrapped in cache or not | ||||
r2815 | compute = conditional_cache('short_term', 'cache_namespace_id', | |||
r1 | condition=True, func=func) | |||
return compute(arg) | ||||
:param region: name of cache region | ||||
r2814 | :param cache_namespace: cache namespace | |||
r1 | :param condition: condition for cache to be triggered, and | |||
return data cached | ||||
:param func: wrapped heavy function to compute | ||||
""" | ||||
wrapped = func | ||||
if condition: | ||||
log.debug('conditional_cache: True, wrapping call of ' | ||||
'func: %s into %s region cache', region, func) | ||||
r2814 | ||||
def _cache_wrap(region_name, cache_namespace): | ||||
"""Return a caching wrapper""" | ||||
def decorate(func): | ||||
@functools.wraps(func) | ||||
def cached(*args, **kwargs): | ||||
if kwargs: | ||||
raise AttributeError( | ||||
'Usage of kwargs is not allowed. ' | ||||
'Use only positional arguments in wrapped function') | ||||
manager = get_cache_manager(region_name, cache_namespace) | ||||
cache_key = compute_key_from_params(*args) | ||||
def go(): | ||||
return func(*args, **kwargs) | ||||
# save org function name | ||||
go.__name__ = '_cached_%s' % (func.__name__,) | ||||
return manager.get(cache_key, createfunc=go) | ||||
return cached | ||||
return decorate | ||||
cached_region = _cache_wrap(region, cache_namespace) | ||||
r1 | wrapped = cached_region(func) | |||
r2814 | ||||
r1 | return wrapped | |||
class ActiveRegionCache(object): | ||||
def __init__(self, context): | ||||
self.context = context | ||||
def invalidate(self, *args, **kwargs): | ||||
return False | ||||
def compute(self): | ||||
log.debug('Context cache: getting obj %s from cache', self.context) | ||||
return self.context.compute_func(self.context.cache_key) | ||||
class FreshRegionCache(ActiveRegionCache): | ||||
def invalidate(self): | ||||
log.debug('Context cache: invalidating cache for %s', self.context) | ||||
region_invalidate( | ||||
self.context.compute_func, None, self.context.cache_key) | ||||
return True | ||||
class InvalidationContext(object): | ||||
def __repr__(self): | ||||
return '<InvalidationContext:{}[{}]>'.format( | ||||
r293 | safe_str(self.repo_name), safe_str(self.cache_type)) | |||
r1 | ||||
def __init__(self, compute_func, repo_name, cache_type, | ||||
Martin Bornhold
|
r613 | raise_exception=False, thread_scoped=False): | ||
r1 | self.compute_func = compute_func | |||
self.repo_name = repo_name | ||||
self.cache_type = cache_type | ||||
self.cache_key = compute_key_from_params( | ||||
repo_name, cache_type) | ||||
self.raise_exception = raise_exception | ||||
Martin Bornhold
|
r613 | # Append the thread id to the cache key if this invalidation context | ||
# should be scoped to the current thread. | ||||
if thread_scoped: | ||||
thread_id = threading.current_thread().ident | ||||
self.cache_key = '{cache_key}_{thread_id}'.format( | ||||
cache_key=self.cache_key, thread_id=thread_id) | ||||
r1 | def get_cache_obj(self): | |||
cache_key = CacheKey.get_cache_key( | ||||
self.repo_name, self.cache_type) | ||||
cache_obj = CacheKey.get_active_cache(cache_key) | ||||
if not cache_obj: | ||||
cache_obj = CacheKey(cache_key, self.repo_name) | ||||
return cache_obj | ||||
def __enter__(self): | ||||
""" | ||||
Test if current object is valid, and return CacheRegion function | ||||
that does invalidation and calculation | ||||
""" | ||||
self.cache_obj = self.get_cache_obj() | ||||
if self.cache_obj.cache_active: | ||||
# means our cache obj is existing and marked as it's | ||||
# cache is not outdated, we return BaseInvalidator | ||||
self.skip_cache_active_change = True | ||||
return ActiveRegionCache(self) | ||||
# the key is either not existing or set to False, we return | ||||
# the real invalidator which re-computes value. We additionally set | ||||
# the flag to actually update the Database objects | ||||
self.skip_cache_active_change = False | ||||
return FreshRegionCache(self) | ||||
def __exit__(self, exc_type, exc_val, exc_tb): | ||||
if self.skip_cache_active_change: | ||||
return | ||||
try: | ||||
self.cache_obj.cache_active = True | ||||
Session().add(self.cache_obj) | ||||
Session().commit() | ||||
except IntegrityError: | ||||
# if we catch integrity error, it means we inserted this object | ||||
# assumption is that's really an edge race-condition case and | ||||
# it's safe is to skip it | ||||
Session().rollback() | ||||
except Exception: | ||||
log.exception('Failed to commit on cache key update') | ||||
Session().rollback() | ||||
if self.raise_exception: | ||||
raise | ||||
r2367 | ||||
def includeme(config): | ||||
configure_caches(config.registry.settings) | ||||