# -*- coding: utf-8 -*-

# Copyright (C) 2015-2018 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import functools
import logging
import threading

import beaker
from beaker.cache import _cache_decorate, cache_regions, region_invalidate
from sqlalchemy.exc import IntegrityError

from rhodecode.lib.utils import safe_str, sha1
from rhodecode.model.db import Session, CacheKey

log = logging.getLogger(__name__)

FILE_TREE = 'cache_file_tree'
FILE_TREE_META = 'cache_file_tree_metadata'
FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
SUMMARY_STATS = 'cache_summary_stats'

# This list of caches gets purged when invalidation happens
USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)

DEFAULT_CACHE_MANAGER_CONFIG = {
    'type': 'memorylru_base',
    'max_items': 10240,
    'key_length': 256,
    'enabled': True
}


def get_default_cache_settings(settings):
    cache_settings = {}
    for key in settings.keys():
        for prefix in ['beaker.cache.', 'cache.']:
            if key.startswith(prefix):
                name = key.split(prefix)[1].strip()
                cache_settings[name] = settings[key].strip()
    return cache_settings


# set cache regions for beaker so celery can utilise it
def configure_caches(settings, default_region_settings=None):
    cache_settings = {'regions': None}
    # main cache settings used as default ...
    cache_settings.update(get_default_cache_settings(settings))
    default_region_settings = default_region_settings or \
        {'type': DEFAULT_CACHE_MANAGER_CONFIG['type']}
    if cache_settings['regions']:
        for region in cache_settings['regions'].split(','):
            region = region.strip()
            region_settings = default_region_settings.copy()
            for key, value in cache_settings.items():
                if key.startswith(region):
                    region_settings[key.split(region + '.')[-1]] = value
            log.debug('Configuring cache region `%s` with settings %s',
                      region, region_settings)
            configure_cache_region(
                region, region_settings, cache_settings)


def configure_cache_region(
        region_name, region_settings, default_cache_kw, default_expire=60):
    default_type = default_cache_kw.get('type', 'memory')
    default_lock_dir = default_cache_kw.get('lock_dir')
    default_data_dir = default_cache_kw.get('data_dir')

    region_settings['lock_dir'] = region_settings.get('lock_dir', default_lock_dir)
    region_settings['data_dir'] = region_settings.get('data_dir', default_data_dir)
    region_settings['type'] = region_settings.get('type', default_type)
    region_settings['expire'] = int(region_settings.get('expire', default_expire))

    beaker.cache.cache_regions[region_name] = region_settings
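

# A minimal, hedged sketch of how `configure_caches` consumes ini-style
# settings. The region names and values below are illustrative assumptions,
# not defaults shipped with RhodeCode; only the `beaker.cache.` key layout
# is taken from the parsing code above.
def _example_configure_caches():
    settings = {
        'beaker.cache.regions': 'short_term, repo_cache_long',
        'beaker.cache.short_term.type': 'memory',
        'beaker.cache.short_term.expire': '60',
        'beaker.cache.repo_cache_long.type': 'memorylru_base',
        'beaker.cache.repo_cache_long.expire': '2592000',
    }
    configure_caches(settings)
    # each listed region is now registered in beaker.cache.cache_regions
    # with type/expire/lock_dir/data_dir resolved against the defaults
    return beaker.cache.cache_regions['short_term']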


def get_cache_manager(region_name, cache_name, custom_ttl=None):
    """
    Creates a Beaker cache manager. Such an instance can be used like this::

        _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
        cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
        _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)

        def heavy_compute():
            ...

        result = cache_manager.get(_cache_key, createfunc=heavy_compute)

    :param region_name: region from ini file
    :param cache_name: custom cache name, usually prefix+repo_name.
        eg file_switcher_repo1
    :param custom_ttl: override .ini file timeout on this cache
    :return: instance of cache manager
    """

    cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG)
    if custom_ttl:
        log.debug('Updating region %s with custom ttl: %s',
                  region_name, custom_ttl)
        cache_config.update({'expire': custom_ttl})

    return beaker.cache.Cache._get_cache(cache_name, cache_config)


def clear_cache_manager(cache_manager):
    """
    Clears all values stored under the given cache manager, e.g.::

        namespace = 'foobar'
        cache_manager = get_cache_manager('repo_cache_long', namespace)
        clear_cache_manager(cache_manager)
    """
    log.debug('Clearing all values for cache manager %s', cache_manager)
    cache_manager.clear()


def clear_repo_caches(repo_name):
    # invalidate cache manager for this repo
    for prefix in USED_REPO_CACHES:
        namespace = get_repo_namespace_key(prefix, repo_name)
        cache_manager = get_cache_manager('repo_cache_long', namespace)
        clear_cache_manager(cache_manager)


def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    return sha1("_".join(map(safe_str, args)))


def get_repo_namespace_key(prefix, repo_name):
    return '{0}_{1}'.format(prefix, compute_key_from_params(repo_name))


def conditional_cache(region, cache_namespace, condition, func):
    """
    Conditional caching function. Use it like this::

        def _c(arg):
            # heavy computation function
            return data

        # depending on the condition the compute is wrapped in cache or not
        compute = conditional_cache('short_term', 'cache_namespace_id',
                                    condition=True, func=_c)
        return compute(arg)

    :param region: name of cache region
    :param cache_namespace: cache namespace
    :param condition: condition for cache to be triggered, and
        return data cached
    :param func: wrapped heavy function to compute
    """
    wrapped = func
    if condition:
        log.debug('conditional_cache: True, wrapping call of '
                  'func: %s into %s region cache', func, region)

        def _cache_wrap(region_name, cache_namespace):
            """Return a caching wrapper"""

            def decorate(func):
                @functools.wraps(func)
                def cached(*args, **kwargs):
                    if kwargs:
                        raise AttributeError(
                            'Usage of kwargs is not allowed. '
                            'Use only positional arguments in wrapped function')
                    manager = get_cache_manager(region_name, cache_namespace)
                    cache_key = compute_key_from_params(*args)

                    def go():
                        return func(*args, **kwargs)

                    # save org function name
                    go.__name__ = '_cached_%s' % (func.__name__,)
                    return manager.get(cache_key, createfunc=go)
                return cached

            return decorate

        cached_region = _cache_wrap(region, cache_namespace)
        wrapped = cached_region(func)

    return wrapped
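

# A hedged usage sketch tying the helpers above together: cache an expensive
# per-repository computation, then purge every namespace registered for the
# repo. `_load_file_tree`, 'my-repo' and 'abcdef' are illustrative
# assumptions; FILE_TREE, 'repo_cache_long' and the helper functions come
# from this module.
def _example_repo_cache(repo_name='my-repo', commit_id='abcdef'):
    _namespace = get_repo_namespace_key(FILE_TREE, repo_name)
    cache_manager = get_cache_manager('repo_cache_long', _namespace)
    _cache_key = compute_key_from_params(repo_name, commit_id)

    def _load_file_tree():
        # stand-in for a heavy computation, e.g. walking the commit tree
        return ['README.rst', 'setup.py']

    # computed once, then served from cache until it expires or is cleared
    result = cache_manager.get(_cache_key, createfunc=_load_file_tree)

    # drops all caches listed in USED_REPO_CACHES for this repo
    clear_repo_caches(repo_name)
    return result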


class ActiveRegionCache(object):
    def __init__(self, context):
        self.context = context

    def invalidate(self, *args, **kwargs):
        return False

    def compute(self):
        log.debug('Context cache: getting obj %s from cache', self.context)
        return self.context.compute_func(self.context.cache_key)


class FreshRegionCache(ActiveRegionCache):
    def invalidate(self):
        log.debug('Context cache: invalidating cache for %s', self.context)
        region_invalidate(
            self.context.compute_func, None, self.context.cache_key)
        return True


class InvalidationContext(object):
    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.repo_name), safe_str(self.cache_type))

    def __init__(self, compute_func, repo_name, cache_type,
                 raise_exception=False, thread_scoped=False):
        self.compute_func = compute_func
        self.repo_name = repo_name
        self.cache_type = cache_type
        self.cache_key = compute_key_from_params(
            repo_name, cache_type)
        self.raise_exception = raise_exception

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped:
            thread_id = threading.current_thread().ident
            self.cache_key = '{cache_key}_{thread_id}'.format(
                cache_key=self.cache_key, thread_id=thread_id)

    def get_cache_obj(self):
        cache_key = CacheKey.get_cache_key(
            self.repo_name, self.cache_type)
        cache_obj = CacheKey.get_active_cache(cache_key)
        if not cache_obj:
            cache_obj = CacheKey(cache_key, self.repo_name)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        self.cache_obj = self.get_cache_obj()
        if self.cache_obj.cache_active:
            # our cache obj exists and is marked as not outdated,
            # so we return ActiveRegionCache which serves cached values
            self.skip_cache_active_change = True
            return ActiveRegionCache(self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # an integrity error means this object was inserted concurrently
            # by another request; assumption is that's really an edge
            # race-condition case and it's safe to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise


def includeme(config):
    configure_caches(config.registry.settings)
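

# A hedged sketch of the InvalidationContext flow, mirroring the docstring
# examples above. It assumes a configured 'long_term' beaker region and a
# working database Session; `_cached_stats` and its return value are
# illustrative. `compute_func` must be decorated with beaker's `cache_region`
# so that FreshRegionCache.invalidate() can run region_invalidate on it.
def _example_invalidation_context(repo_name='my-repo'):
    from beaker.cache import cache_region

    @cache_region('long_term')
    def _cached_stats(cache_key):
        # stand-in for an expensive summary computation
        return {'commits': 42}

    context = InvalidationContext(
        compute_func=_cached_stats, repo_name=repo_name,
        cache_type=SUMMARY_STATS)
    with context as context_cache:
        # FreshRegionCache purges the stale beaker entry and returns True;
        # ActiveRegionCache is a no-op returning False
        context_cache.invalidate()
        return context_cache.compute()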