##// END OF EJS Templates
dependencies: bumped psutils to 5.4.6
dependencies: bumped psutils to 5.4.6

File last commit:

r2815:506cc0a8 default
r2826:422e7c9b default
Show More
caches.py
295 lines | 10.4 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
release: update copyright year to 2018
r2487 # Copyright (C) 2015-2018 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
caches: use different namespaces for conditional_cache wrapper....
r2814 import functools
project: added all source files and assets
r1
import beaker
import logging
Martin Bornhold
caches: Add an argument to make the cache context thread scoped.
r613 import threading
project: added all source files and assets
r1
from beaker.cache import _cache_decorate, cache_regions, region_invalidate
Martin Bornhold
models: Remove unused imports.
r895 from sqlalchemy.exc import IntegrityError
project: added all source files and assets
r1
from rhodecode.lib.utils import safe_str, md5
Martin Bornhold
models: Remove unused imports.
r895 from rhodecode.model.db import Session, CacheKey
project: added all source files and assets
r1
log = logging.getLogger(__name__)

# Names of the individual caches. The repo-level helpers in this module use
# them as the prefix of a cache namespace.
FILE_TREE = 'cache_file_tree'
FILE_TREE_META = 'cache_file_tree_metadata'
FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
SUMMARY_STATS = 'cache_summary_stats'

# This list of caches gets purged when invalidation happens
USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)

# Fallback cache configuration: used by get_cache_manager() when the requested
# region is not configured, and its 'type' is the default region type in
# configure_caches().
DEFAULT_CACHE_MANAGER_CONFIG = {
    'type': 'memorylru_base',
    'max_items': 10240,
    'key_length': 256,
    'enabled': True,
}
caches: configure defaults for all defined regions....
def get_default_cache_settings(settings):
    """
    Extract the cache related options from a flat settings mapping.

    Every key that starts with ``beaker.cache.`` or ``cache.`` is returned
    with that prefix removed. Both the resulting option name and its value
    are stripped of surrounding whitespace. All other keys are ignored.

    :param settings: mapping of option name to string value
    :return: dict mapping the un-prefixed option name to its stripped value
    """
    prefixes = ('beaker.cache.', 'cache.')
    cache_settings = {}
    for key in settings:
        for prefix in prefixes:
            if key.startswith(prefix):
                # Slice the prefix off instead of splitting on it: splitting
                # truncates keys that contain the prefix text a second time
                # (e.g. `cache.long.cache.dir` would become `long.`).
                name = key[len(prefix):].strip()
                cache_settings[name] = settings[key].strip()
    return cache_settings
# set cache regions for beaker so celery can utilise it
def configure_caches(settings, default_region_settings=None):
    """
    Configure every cache region listed in the ``regions`` cache option.

    The cache options are read from `settings` via
    `get_default_cache_settings`. For each comma separated region name, the
    options named ``<region>.<option>`` are collected on top of a copy of
    `default_region_settings` and passed to `configure_cache_region`.

    :param settings: flat mapping of option name to string value
    :param default_region_settings: options applied to every region before
        its own options; when empty or not given, only ``type`` is preset,
        taken from DEFAULT_CACHE_MANAGER_CONFIG
    """
    cache_settings = {'regions': None}
    # main cache settings used as default ...
    cache_settings.update(get_default_cache_settings(settings))
    default_region_settings = default_region_settings or \
        {'type': DEFAULT_CACHE_MANAGER_CONFIG['type']}

    if not cache_settings['regions']:
        return

    for region in cache_settings['regions'].split(','):
        region = region.strip()
        if not region:
            # a stray comma (e.g. `a,b,`) must not create a nameless region
            continue

        # Match on `<region>.` rather than on the bare region name, so that
        # a region called `short` does not pick up the options of a sibling
        # region called `short_term`.
        region_prefix = region + '.'
        region_settings = default_region_settings.copy()
        for key, value in cache_settings.items():
            if key.startswith(region_prefix):
                region_settings[key[len(region_prefix):]] = value

        log.debug('Configuring cache region `%s` with settings %s',
                  region, region_settings)
        configure_cache_region(
            region, region_settings, cache_settings)
project: added all source files and assets
def configure_cache_region(
        region_name, region_settings, default_cache_kw, default_expire=60):
    """
    Complete the settings of one region and register it with beaker.

    Options missing from `region_settings` are filled in place: ``lock_dir``,
    ``data_dir`` and ``type`` come from `default_cache_kw` (``type`` falls
    back to ``memory``), and ``expire`` falls back to `default_expire`.
    ``expire`` is always converted to an int. The same dict is then stored
    in ``beaker.cache.cache_regions`` under `region_name`.

    :param region_name: name the region is registered under
    :param region_settings: dict of options for this region, modified in place
    :param default_cache_kw: mapping with the global cache options
    :param default_expire: expire value used when the region defines none
    """
    fallbacks = (
        ('lock_dir', default_cache_kw.get('lock_dir')),
        ('data_dir', default_cache_kw.get('data_dir')),
        ('type', default_cache_kw.get('type', 'memory')),
    )
    for option, fallback in fallbacks:
        region_settings[option] = region_settings.get(option, fallback)

    expire = region_settings.get('expire', default_expire)
    region_settings['expire'] = int(expire)

    beaker.cache.cache_regions[region_name] = region_settings
project: added all source files and assets
r1
def get_cache_manager(region_name, cache_name, custom_ttl=None):
    """
    Creates a Beaker cache manager. Such instance can be used like that::

        _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
        cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
        _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)

        def heavy_compute():
            ...

        result = cache_manager.get(_cache_key, createfunc=heavy_compute)

    :param region_name: region from ini file; when the region is not
        configured, DEFAULT_CACHE_MANAGER_CONFIG is used instead
    :param cache_name: custom cache name, usually prefix+repo_name. eg
        file_switcher_repo1
    :param custom_ttl: override .ini file timeout on this cache
    :return: instance of cache manager
    """
    cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG)
    if custom_ttl:
        log.debug('Updating region %s with custom ttl: %s',
                  region_name, custom_ttl)
        # Work on a copy: `cache_config` is the shared region dict (or the
        # module level default). Updating it in place would leak this ttl
        # into every later call for the same region.
        cache_config = dict(cache_config, expire=custom_ttl)
    return beaker.cache.Cache._get_cache(cache_name, cache_config)
def clear_cache_manager(cache_manager):
    """
    Remove all values held by the given cache manager. Example::

        namespace = 'foobar'
        cache_manager = get_cache_manager('repo_cache_long', namespace)
        clear_cache_manager(cache_manager)

    :param cache_manager: object exposing ``clear()``, e.g. the result of
        `get_cache_manager`
    """
    log.debug('Clearing all values for cache manager %s', cache_manager)
    cache_manager.clear()
def clear_repo_caches(repo_name):
    """
    Clear the ``repo_cache_long`` caches of a single repository.

    One cache manager is cleared per prefix listed in USED_REPO_CACHES.

    :param repo_name: name of the repository whose caches get cleared
    """
    for cache_prefix in USED_REPO_CACHES:
        # invalidate cache manager for this repo
        repo_namespace = get_repo_namespace_key(cache_prefix, repo_name)
        clear_cache_manager(
            get_cache_manager('repo_cache_long', repo_namespace))
def compute_key_from_params(*args):
    """
    Build a cache key out of the given params, for use with a cache manager.

    Each param is converted with `safe_str`, the results are joined with
    ``_`` and the joined string is passed through `md5`.

    :return: result of `md5` for the joined params
    """
    joined_params = "_".join([safe_str(param) for param in args])
    return md5(joined_params)
def get_repo_namespace_key(prefix, repo_name):
    """
    Build the cache namespace of a repository: ``<prefix>_<key>``, where
    ``<key>`` is `compute_key_from_params` applied to `repo_name`.

    :param prefix: cache name, e.g. one of USED_REPO_CACHES
    :param repo_name: name of the repository
    """
    repo_key = compute_key_from_params(repo_name)
    return '{0}_{1}'.format(prefix, repo_key)
caches: use different namespaces for conditional_cache wrapper....
def conditional_cache(region, cache_namespace, condition, func):
    """
    Conditional caching function use like::

        def _c(arg):
            # heavy computation function
            return data

        # depending on the condition the compute is wrapped in cache or not
        compute = conditional_cache('short_term', 'cache_namespace_id',
                                    condition=True, func=func)
        return compute(arg)

    :param region: name of cache region
    :param cache_namespace: cache namespace
    :param condition: condition for cache to be triggered, and
        return data cached
    :param func: wrapped heavy function to compute
    :return: `func` itself when `condition` is false, otherwise a wrapper
        that serves results through the cache manager of the given region
        and namespace. The wrapper accepts positional arguments only and
        raises AttributeError when called with keyword arguments.
    """
    if not condition:
        return func

    # arguments follow the order of the placeholders: function, then region
    log.debug('conditional_cache: True, wrapping call of '
              'func: %s into %s region cache', func, region)

    @functools.wraps(func)
    def cached(*args, **kwargs):
        if kwargs:
            # the cache key is computed from positional arguments only
            raise AttributeError(
                'Usage of kwargs is not allowed. '
                'Use only positional arguments in wrapped function')
        manager = get_cache_manager(region, cache_namespace)
        cache_key = compute_key_from_params(*args)

        def go():
            return func(*args)

        # save org function name
        go.__name__ = '_cached_%s' % (func.__name__,)

        return manager.get(cache_key, createfunc=go)

    return cached
class ActiveRegionCache(object):
    """
    Cache handle whose `invalidate` is a no-op and whose `compute` delegates
    to the context it wraps.
    """

    def __init__(self, context):
        # object providing `compute_func` and `cache_key`
        self.context = context

    def invalidate(self, *args, **kwargs):
        """Do nothing and report that no invalidation took place."""
        return False

    def compute(self):
        """Return ``context.compute_func(context.cache_key)``."""
        ctx = self.context
        log.debug('Context cache: getting obj %s from cache', ctx)
        return ctx.compute_func(ctx.cache_key)
class FreshRegionCache(ActiveRegionCache):
    """
    Variant of ActiveRegionCache whose `invalidate` really drops the cached
    value of the context's compute function.
    """

    def invalidate(self, *args, **kwargs):
        """
        Call beaker's `region_invalidate` for the context's compute function
        and cache key.

        Accepts and ignores any arguments, matching the signature of
        `ActiveRegionCache.invalidate`, so callers can treat both classes
        alike.

        :return: always True
        """
        log.debug('Context cache: invalidating cache for %s', self.context)
        region_invalidate(
            self.context.compute_func, None, self.context.cache_key)
        return True
class InvalidationContext(object):
    """
    Context manager deciding whether a cached value of `compute_func` may be
    used, based on the CacheKey entry for ``repo_name``/``cache_type``.

    Entering returns an ActiveRegionCache when the entry is marked active,
    otherwise a FreshRegionCache. In the latter case the entry is marked
    active and committed when the context exits.
    """

    def __repr__(self):
        repo_name = safe_str(self.repo_name)
        cache_type = safe_str(self.cache_type)
        return '<InvalidationContext:{}[{}]>'.format(repo_name, cache_type)

    def __init__(self, compute_func, repo_name, cache_type,
                 raise_exception=False, thread_scoped=False):
        """
        :param compute_func: function whose cached value is managed
        :param repo_name: repository the cache belongs to
        :param cache_type: kind of cache, part of the cache key
        :param raise_exception: re-raise unexpected errors hit while
            committing the cache key on exit
        :param thread_scoped: scope the cache key to the current thread
        """
        self.compute_func = compute_func
        self.repo_name = repo_name
        self.cache_type = cache_type
        self.raise_exception = raise_exception

        key = compute_key_from_params(repo_name, cache_type)
        if thread_scoped:
            # Append the thread id to the cache key if this invalidation
            # context should be scoped to the current thread.
            key = '{cache_key}_{thread_id}'.format(
                cache_key=key,
                thread_id=threading.current_thread().ident)
        self.cache_key = key

    def get_cache_obj(self):
        """
        Return the active CacheKey for this repo and cache type, or a newly
        constructed CacheKey when no active one is found.
        """
        db_cache_key = CacheKey.get_cache_key(
            self.repo_name, self.cache_type)
        active_obj = CacheKey.get_active_cache(db_cache_key)
        return active_obj or CacheKey(db_cache_key, self.repo_name)

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        self.cache_obj = self.get_cache_obj()
        if not self.cache_obj.cache_active:
            # the key is either not existing or set to False, we return
            # the real invalidator which re-computes value. We additionally
            # set the flag to actually update the Database objects
            self.skip_cache_active_change = False
            return FreshRegionCache(self)

        # our cache obj exists and its cache is not marked as outdated,
        # nothing has to be written back on exit
        self.skip_cache_active_change = True
        return ActiveRegionCache(self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.skip_cache_active_change:
            self._mark_cache_active()

    def _mark_cache_active(self):
        """Flag the cache object as active and commit it."""
        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
caches: configure defaults for all defined regions....
r2367
def includeme(config):
    """
    Configure the cache regions from ``config.registry.settings``.

    :param config: configurator object exposing ``registry.settings``
        (presumably a Pyramid Configurator -- confirm against the caller)
    """
    app_settings = config.registry.settings
    configure_caches(app_settings)