##// END OF EJS Templates
docs: update instructions for shared exception store in cluster setup
docs: update instructions for shared exception store in cluster setup

File last commit:

r3061:a44afbe5 default
r3067:9b7d4cbb default
Show More
utils.py
322 lines | 11.3 KiB | text/x-python | PythonLexer
caches: rewrite of auth/permission caches to dogpile.
r2845 # -*- coding: utf-8 -*-
# Copyright (C) 2015-2018 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
caches: store computation time inside context manager as helper. Since the with block is full...
r2936 import time
caches: rewrite of auth/permission caches to dogpile.
r2845 import logging
caches: introduce new conditional cache function.
r2891 import functools
caches: new cache context managers....
r2932 import threading
caches: introduce new conditional cache function.
r2891
from dogpile.cache import CacheRegion
from dogpile.cache.util import compat
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: new cache context managers....
r2932 import rhodecode
caches: rewrite of auth/permission caches to dogpile.
r2845 from rhodecode.lib.utils import safe_str, sha1
caches: turn off thread scoped caches, and allow .ini override. Thread scoped caches are only usefull for development on using pserve
r2935 from rhodecode.lib.utils2 import safe_unicode, str2bool
caches: new cache context managers....
r2932 from rhodecode.model.db import Session, CacheKey, IntegrityError
caches: rewrite of auth/permission caches to dogpile.
r2845 from . import region_meta
log = logging.getLogger(__name__)
caches: introduce new conditional cache function.
class RhodeCodeCacheRegion(CacheRegion):
    """
    ``CacheRegion`` subclass adding a caching decorator that can be switched
    off completely through a ``condition`` flag.
    """

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals
        if condition isn't met. This works a bit different than should_cache_fn
        and it's faster in cases we don't ever want to compute cached values.

        :param namespace: forwarded to the key generator factory.
        :param expiration_time: a value, or a zero-argument callable returning
            the value, that is forwarded to ``self.get_or_create``.
        :param should_cache_fn: forwarded to ``self.get_or_create``.
        :param to_str: only forwarded to the key generator factory when it is
            not ``compat.string_type`` (keeps older two-argument factories
            working).
        :param function_key_generator: factory called as
            ``factory(namespace, fn)`` that returns the function computing a
            cache key from call arguments; defaults to
            ``self.function_key_generator``.
        :param condition: when falsy the decorated function is executed
            directly on every call; neither a cache key is generated nor
            ``self.get_or_create`` is called.
        :returns: a decorator. The decorated function additionally exposes
            ``set``, ``invalidate``, ``refresh``, ``get``, ``original`` and
            ``key_generator`` attributes.
        """
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                if not condition:
                    # caching disabled: call straight through, without paying
                    # for key generation that would never be used
                    return fn(*arg, **kw)

                key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    return fn(*arg, **kw)

                timeout = expiration_time() if expiration_time_is_callable \
                    else expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            def invalidate(*arg, **kw):
                # drop the value stored for these call arguments
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store `value` under the key of these call arguments
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # read whatever the region holds for these call arguments
                key = key_generator(*arg, **kw)
                return self.get(key)

            def refresh(*arg, **kw):
                # always re-run `fn` and overwrite the stored value
                key = key_generator(*arg, **kw)
                value = fn(*arg, **kw)
                self.set(key, value)
                return value

            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator

            return decorate

        return decorator
def make_region(*args, **kwargs):
    """
    Instantiate a ``RhodeCodeCacheRegion``; all positional and keyword
    arguments are handed to its constructor unchanged.
    """
    new_region = RhodeCodeCacheRegion(*args, **kwargs)
    return new_region
caches: rewrite of auth/permission caches to dogpile.
def get_default_cache_settings(settings, prefixes=None):
    """
    Collect the entries of `settings` whose key starts with one of `prefixes`.

    The matched prefix is removed from the front of the key and the remainder
    is whitespace-stripped to form the resulting name. String values are
    whitespace-stripped as well, other values are stored untouched.

    :param settings: mapping of setting name -> value.
    :param prefixes: iterable of key prefixes; ``None`` or empty selects nothing.
    :returns: dict mapping the prefix-less names to their values.
    """
    prefixes = prefixes or []

    try:
        string_types = basestring  # Python 2: covers both str and unicode
    except NameError:
        string_types = str  # Python 3

    cache_settings = {}
    for key, val in settings.items():
        for prefix in prefixes:
            if not key.startswith(prefix):
                continue
            # Cut off only the leading prefix. ``key.split(prefix)[1]`` would
            # truncate the name whenever the prefix text shows up again later
            # in the key.
            name = key[len(prefix):].strip()
            if isinstance(val, string_types):
                val = val.strip()
            cache_settings[name] = val
    return cache_settings
def compute_key_from_params(*args):
    """
    Build a key for the cache manager out of the given params: each param is
    converted with ``safe_str``, the pieces are joined with ``_`` and the
    joined string is passed through ``sha1``.
    """
    joined_params = "_".join(safe_str(param) for param in args)
    return sha1(joined_params)
def key_generator(namespace, fn):
    """
    Return a function that turns positional call arguments of `fn` into a
    cache key shaped like ``<namespace>:<fn name>_<key computed from args>``.

    A falsy `namespace` is replaced with ``default``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = namespace if namespace else 'default'
        params_key = compute_key_from_params(*args)
        return "{}:{}_{}".format(prefix, func_name, params_key)

    return generate_key
def get_or_create_region(region_name, region_namespace=None):
    """
    Return the cache region to use for `region_name` and `region_namespace`.

    If the configured region's backend is a ``FileNamespaceBackend``, a
    separate region per namespace is used: it is backed by its own
    ``<region_namespace>.cache.dbm`` file inside the configured ``cache_dir``,
    inherits the expiration time of the configured region, and is registered
    in ``region_meta.dogpile_cache_regions`` under `region_namespace` so later
    calls reuse it. For any other backend the configured region itself is
    returned.

    :param region_name: name of a region present in
        ``region_meta.dogpile_cache_regions``.
    :param region_namespace: namespace used for per-namespace file regions.
    :raises EnvironmentError: if `region_name` is not a configured region.
    :raises OSError: if the cache directory is missing and cannot be created.
    """
    # NOTE(review): imported locally, presumably to avoid a circular import
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        # Create the directory without a check-then-create race: another
        # process may create it between an ``isdir`` test and ``makedirs``.
        try:
            os.makedirs(cache_dir)
        except OSError:
            if not os.path.isdir(cache_dir):
                raise

        new_region = make_region(
            name=region_uid_name, function_key_generator=key_generator
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
caches: don't use beaker for file caches anymore
r2846
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every key the region's backend lists for the prefix
    `cache_namespace_uid` and return how many keys were listed.

    ``delete_multi`` is only called when there is at least one key.
    """
    target_region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_delete = target_region.backend.list_keys(prefix=cache_namespace_uid)

    deleted_count = len(keys_to_delete)
    if deleted_count > 0:
        target_region.delete_multi(keys_to_delete)
    return deleted_count
caches: new cache context managers....
r2932
class ActiveRegionCache(object):
    """
    Marker object whose ``should_invalidate()`` always answers ``False``.
    """

    def __init__(self, context):
        # keep a reference to the object passed in as `context`
        self.context = context

    def should_invalidate(self):
        """Always ``False``."""
        return False
class FreshRegionCache(object):
    """
    Marker object whose ``should_invalidate()`` always answers ``True``.
    """

    def __init__(self, context):
        # keep a reference to the object passed in as `context`
        self.context = context

    def should_invalidate(self):
        """Always ``True``."""
        return True
class InvalidationContext(object):
    """
    Context manager that tells the caller whether a cached value must be
    re-computed, and that records the cache key as active on exit.

    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

        compute_time = inv_context_manager.compute_time
        log.debug('result computed in %.3fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)
    """

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')

        if thread_scoped is None:
            # no explicit choice made by the caller, the .ini setting decides
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # The thread id becomes part of the cache key only when this context
        # is scoped to the current thread, otherwise a fixed marker is used.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident
        else:
            self.thread_id = 'global'

        uid_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}_thread:{}_{}'.format(
            self.proc_id, self.thread_id, uid_key)
        self.compute_time = 0

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
        """
        Return the ``CacheKey`` fetched for ``self.cache_key``; when nothing
        is fetched, build a new ``CacheKey`` using the given invalidation
        namespace, or the one of this context if none is given.

        `uid` is accepted but not used here.
        """
        fetched = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', fetched, self.cache_key)
        if fetched:
            return fetched

        namespace = invalidation_namespace or self.invalidation_namespace
        return CacheKey(self.cache_key, cache_args=namespace)

    def __enter__(self):
        """
        Look up (or build) the cache object for this context, start the
        compute timer and return an ``ActiveRegionCache`` when the object is
        marked active, otherwise a ``FreshRegionCache``.
        """
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)
        self._start_time = time.time()

        # An active cache object needs no database update on exit; a missing
        # or inactive one gets flagged active in ``__exit__``.
        cache_is_active = bool(self.cache_obj.cache_active)
        self.skip_cache_active_change = cache_is_active

        if cache_is_active:
            return ActiveRegionCache(context=self)
        return FreshRegionCache(context=self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Store the elapsed time in ``compute_time`` and, unless the cache
        object was already active, mark it active and commit it.
        """
        self.compute_time = time.time() - self._start_time

        if not self.skip_cache_active_change:
            try:
                self.cache_obj.cache_active = True
                Session().add(self.cache_obj)
                Session().commit()
            except IntegrityError:
                # the object got inserted in the meantime; treated as a rare
                # race condition that is safe to skip
                Session().rollback()
            except Exception:
                log.exception('Failed to commit on cache key update')
                Session().rollback()
                if self.raise_exception:
                    raise