##// END OF EJS Templates
beaker: fix cache sqlalchemy options.
marcink -
r2789:1a9efc1b stable
parent child Browse files
Show More
@@ -1,269 +1,269 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2018 RhodeCode GmbH
3 # Copyright (C) 2015-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21
21
22 import beaker
22 import beaker
23 import logging
23 import logging
24 import threading
24 import threading
25
25
26 from beaker.cache import _cache_decorate, cache_regions, region_invalidate
26 from beaker.cache import _cache_decorate, cache_regions, region_invalidate
27 from sqlalchemy.exc import IntegrityError
27 from sqlalchemy.exc import IntegrityError
28
28
29 from rhodecode.lib.utils import safe_str, md5
29 from rhodecode.lib.utils import safe_str, md5
30 from rhodecode.model.db import Session, CacheKey
30 from rhodecode.model.db import Session, CacheKey
31
31
32 log = logging.getLogger(__name__)
32 log = logging.getLogger(__name__)
33
33
34 FILE_TREE = 'cache_file_tree'
34 FILE_TREE = 'cache_file_tree'
35 FILE_TREE_META = 'cache_file_tree_metadata'
35 FILE_TREE_META = 'cache_file_tree_metadata'
36 FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
36 FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
37 SUMMARY_STATS = 'cache_summary_stats'
37 SUMMARY_STATS = 'cache_summary_stats'
38
38
39 # This list of caches gets purged when invalidation happens
39 # This list of caches gets purged when invalidation happens
40 USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)
40 USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)
41
41
42 DEFAULT_CACHE_MANAGER_CONFIG = {
42 DEFAULT_CACHE_MANAGER_CONFIG = {
43 'type': 'memorylru_base',
43 'type': 'memorylru_base',
44 'max_items': 10240,
44 'max_items': 10240,
45 'key_length': 256,
45 'key_length': 256,
46 'enabled': True
46 'enabled': True
47 }
47 }
48
48
49
49
50 def get_default_cache_settings(settings):
50 def get_default_cache_settings(settings):
51 cache_settings = {}
51 cache_settings = {}
52 for key in settings.keys():
52 for key in settings.keys():
53 for prefix in ['beaker.cache.', 'cache.']:
53 for prefix in ['beaker.cache.', 'cache.']:
54 if key.startswith(prefix):
54 if key.startswith(prefix):
55 name = key.split(prefix)[1].strip()
55 name = key.split(prefix)[1].strip()
56 cache_settings[name] = settings[key].strip()
56 cache_settings[name] = settings[key].strip()
57 return cache_settings
57 return cache_settings
58
58
59
59
60 # set cache regions for beaker so celery can utilise it
60 # set cache regions for beaker so celery can utilise it
61 def configure_caches(settings, default_region_settings=None):
61 def configure_caches(settings, default_region_settings=None):
62 cache_settings = {'regions': None}
62 cache_settings = {'regions': None}
63 # main cache settings used as default ...
63 # main cache settings used as default ...
64 cache_settings.update(get_default_cache_settings(settings))
64 cache_settings.update(get_default_cache_settings(settings))
65 default_region_settings = default_region_settings or \
65 default_region_settings = default_region_settings or \
66 {'type': DEFAULT_CACHE_MANAGER_CONFIG['type']}
66 {'type': DEFAULT_CACHE_MANAGER_CONFIG['type']}
67 if cache_settings['regions']:
67 if cache_settings['regions']:
68 for region in cache_settings['regions'].split(','):
68 for region in cache_settings['regions'].split(','):
69 region = region.strip()
69 region = region.strip()
70 region_settings = default_region_settings.copy()
70 region_settings = default_region_settings.copy()
71 for key, value in cache_settings.items():
71 for key, value in cache_settings.items():
72 if key.startswith(region):
72 if key.startswith(region):
73 region_settings[key.split('.')[1]] = value
73 region_settings[key.split(region + '.')[-1]] = value
74 log.debug('Configuring cache region `%s` with settings %s',
74 log.debug('Configuring cache region `%s` with settings %s',
75 region, region_settings)
75 region, region_settings)
76 configure_cache_region(
76 configure_cache_region(
77 region, region_settings, cache_settings)
77 region, region_settings, cache_settings)
78
78
79
79
80 def configure_cache_region(
80 def configure_cache_region(
81 region_name, region_settings, default_cache_kw, default_expire=60):
81 region_name, region_settings, default_cache_kw, default_expire=60):
82 default_type = default_cache_kw.get('type', 'memory')
82 default_type = default_cache_kw.get('type', 'memory')
83 default_lock_dir = default_cache_kw.get('lock_dir')
83 default_lock_dir = default_cache_kw.get('lock_dir')
84 default_data_dir = default_cache_kw.get('data_dir')
84 default_data_dir = default_cache_kw.get('data_dir')
85
85
86 region_settings['lock_dir'] = region_settings.get('lock_dir', default_lock_dir)
86 region_settings['lock_dir'] = region_settings.get('lock_dir', default_lock_dir)
87 region_settings['data_dir'] = region_settings.get('data_dir', default_data_dir)
87 region_settings['data_dir'] = region_settings.get('data_dir', default_data_dir)
88 region_settings['type'] = region_settings.get('type', default_type)
88 region_settings['type'] = region_settings.get('type', default_type)
89 region_settings['expire'] = int(region_settings.get('expire', default_expire))
89 region_settings['expire'] = int(region_settings.get('expire', default_expire))
90
90
91 beaker.cache.cache_regions[region_name] = region_settings
91 beaker.cache.cache_regions[region_name] = region_settings
92
92
93
93
94 def get_cache_manager(region_name, cache_name, custom_ttl=None):
94 def get_cache_manager(region_name, cache_name, custom_ttl=None):
95 """
95 """
96 Creates a Beaker cache manager. Such instance can be used like that::
96 Creates a Beaker cache manager. Such instance can be used like that::
97
97
98 _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
98 _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
99 cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
99 cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
100 _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)
100 _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)
101 def heavy_compute():
101 def heavy_compute():
102 ...
102 ...
103 result = cache_manager.get(_cache_key, createfunc=heavy_compute)
103 result = cache_manager.get(_cache_key, createfunc=heavy_compute)
104
104
105 :param region_name: region from ini file
105 :param region_name: region from ini file
106 :param cache_name: custom cache name, usually prefix+repo_name. eg
106 :param cache_name: custom cache name, usually prefix+repo_name. eg
107 file_switcher_repo1
107 file_switcher_repo1
108 :param custom_ttl: override .ini file timeout on this cache
108 :param custom_ttl: override .ini file timeout on this cache
109 :return: instance of cache manager
109 :return: instance of cache manager
110 """
110 """
111
111
112 cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG)
112 cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG)
113 if custom_ttl:
113 if custom_ttl:
114 log.debug('Updating region %s with custom ttl: %s',
114 log.debug('Updating region %s with custom ttl: %s',
115 region_name, custom_ttl)
115 region_name, custom_ttl)
116 cache_config.update({'expire': custom_ttl})
116 cache_config.update({'expire': custom_ttl})
117
117
118 return beaker.cache.Cache._get_cache(cache_name, cache_config)
118 return beaker.cache.Cache._get_cache(cache_name, cache_config)
119
119
120
120
121 def clear_cache_manager(cache_manager):
121 def clear_cache_manager(cache_manager):
122 """
122 """
123 namespace = 'foobar'
123 namespace = 'foobar'
124 cache_manager = get_cache_manager('repo_cache_long', namespace)
124 cache_manager = get_cache_manager('repo_cache_long', namespace)
125 clear_cache_manager(cache_manager)
125 clear_cache_manager(cache_manager)
126 """
126 """
127
127
128 log.debug('Clearing all values for cache manager %s', cache_manager)
128 log.debug('Clearing all values for cache manager %s', cache_manager)
129 cache_manager.clear()
129 cache_manager.clear()
130
130
131
131
132 def clear_repo_caches(repo_name):
132 def clear_repo_caches(repo_name):
133 # invalidate cache manager for this repo
133 # invalidate cache manager for this repo
134 for prefix in USED_REPO_CACHES:
134 for prefix in USED_REPO_CACHES:
135 namespace = get_repo_namespace_key(prefix, repo_name)
135 namespace = get_repo_namespace_key(prefix, repo_name)
136 cache_manager = get_cache_manager('repo_cache_long', namespace)
136 cache_manager = get_cache_manager('repo_cache_long', namespace)
137 clear_cache_manager(cache_manager)
137 clear_cache_manager(cache_manager)
138
138
139
139
140 def compute_key_from_params(*args):
140 def compute_key_from_params(*args):
141 """
141 """
142 Helper to compute key from given params to be used in cache manager
142 Helper to compute key from given params to be used in cache manager
143 """
143 """
144 return md5("_".join(map(safe_str, args)))
144 return md5("_".join(map(safe_str, args)))
145
145
146
146
147 def get_repo_namespace_key(prefix, repo_name):
147 def get_repo_namespace_key(prefix, repo_name):
148 return '{0}_{1}'.format(prefix, compute_key_from_params(repo_name))
148 return '{0}_{1}'.format(prefix, compute_key_from_params(repo_name))
149
149
150
150
151 def conditional_cache(region, prefix, condition, func):
151 def conditional_cache(region, prefix, condition, func):
152 """
152 """
153 Conditional caching function use like::
153 Conditional caching function use like::
154 def _c(arg):
154 def _c(arg):
155 # heavy computation function
155 # heavy computation function
156 return data
156 return data
157
157
158 # depending on the condition the compute is wrapped in cache or not
158 # depending on the condition the compute is wrapped in cache or not
159 compute = conditional_cache('short_term', 'cache_desc',
159 compute = conditional_cache('short_term', 'cache_desc',
160 condition=True, func=func)
160 condition=True, func=func)
161 return compute(arg)
161 return compute(arg)
162
162
163 :param region: name of cache region
163 :param region: name of cache region
164 :param prefix: cache region prefix
164 :param prefix: cache region prefix
165 :param condition: condition for cache to be triggered, and
165 :param condition: condition for cache to be triggered, and
166 return data cached
166 return data cached
167 :param func: wrapped heavy function to compute
167 :param func: wrapped heavy function to compute
168
168
169 """
169 """
170 wrapped = func
170 wrapped = func
171 if condition:
171 if condition:
172 log.debug('conditional_cache: True, wrapping call of '
172 log.debug('conditional_cache: True, wrapping call of '
173 'func: %s into %s region cache', region, func)
173 'func: %s into %s region cache', region, func)
174 cached_region = _cache_decorate((prefix,), None, None, region)
174 cached_region = _cache_decorate((prefix,), None, None, region)
175 wrapped = cached_region(func)
175 wrapped = cached_region(func)
176 return wrapped
176 return wrapped
177
177
178
178
179 class ActiveRegionCache(object):
179 class ActiveRegionCache(object):
180 def __init__(self, context):
180 def __init__(self, context):
181 self.context = context
181 self.context = context
182
182
183 def invalidate(self, *args, **kwargs):
183 def invalidate(self, *args, **kwargs):
184 return False
184 return False
185
185
186 def compute(self):
186 def compute(self):
187 log.debug('Context cache: getting obj %s from cache', self.context)
187 log.debug('Context cache: getting obj %s from cache', self.context)
188 return self.context.compute_func(self.context.cache_key)
188 return self.context.compute_func(self.context.cache_key)
189
189
190
190
191 class FreshRegionCache(ActiveRegionCache):
191 class FreshRegionCache(ActiveRegionCache):
192 def invalidate(self):
192 def invalidate(self):
193 log.debug('Context cache: invalidating cache for %s', self.context)
193 log.debug('Context cache: invalidating cache for %s', self.context)
194 region_invalidate(
194 region_invalidate(
195 self.context.compute_func, None, self.context.cache_key)
195 self.context.compute_func, None, self.context.cache_key)
196 return True
196 return True
197
197
198
198
199 class InvalidationContext(object):
199 class InvalidationContext(object):
200 def __repr__(self):
200 def __repr__(self):
201 return '<InvalidationContext:{}[{}]>'.format(
201 return '<InvalidationContext:{}[{}]>'.format(
202 safe_str(self.repo_name), safe_str(self.cache_type))
202 safe_str(self.repo_name), safe_str(self.cache_type))
203
203
204 def __init__(self, compute_func, repo_name, cache_type,
204 def __init__(self, compute_func, repo_name, cache_type,
205 raise_exception=False, thread_scoped=False):
205 raise_exception=False, thread_scoped=False):
206 self.compute_func = compute_func
206 self.compute_func = compute_func
207 self.repo_name = repo_name
207 self.repo_name = repo_name
208 self.cache_type = cache_type
208 self.cache_type = cache_type
209 self.cache_key = compute_key_from_params(
209 self.cache_key = compute_key_from_params(
210 repo_name, cache_type)
210 repo_name, cache_type)
211 self.raise_exception = raise_exception
211 self.raise_exception = raise_exception
212
212
213 # Append the thread id to the cache key if this invalidation context
213 # Append the thread id to the cache key if this invalidation context
214 # should be scoped to the current thread.
214 # should be scoped to the current thread.
215 if thread_scoped:
215 if thread_scoped:
216 thread_id = threading.current_thread().ident
216 thread_id = threading.current_thread().ident
217 self.cache_key = '{cache_key}_{thread_id}'.format(
217 self.cache_key = '{cache_key}_{thread_id}'.format(
218 cache_key=self.cache_key, thread_id=thread_id)
218 cache_key=self.cache_key, thread_id=thread_id)
219
219
220 def get_cache_obj(self):
220 def get_cache_obj(self):
221 cache_key = CacheKey.get_cache_key(
221 cache_key = CacheKey.get_cache_key(
222 self.repo_name, self.cache_type)
222 self.repo_name, self.cache_type)
223 cache_obj = CacheKey.get_active_cache(cache_key)
223 cache_obj = CacheKey.get_active_cache(cache_key)
224 if not cache_obj:
224 if not cache_obj:
225 cache_obj = CacheKey(cache_key, self.repo_name)
225 cache_obj = CacheKey(cache_key, self.repo_name)
226 return cache_obj
226 return cache_obj
227
227
228 def __enter__(self):
228 def __enter__(self):
229 """
229 """
230 Test if current object is valid, and return CacheRegion function
230 Test if current object is valid, and return CacheRegion function
231 that does invalidation and calculation
231 that does invalidation and calculation
232 """
232 """
233
233
234 self.cache_obj = self.get_cache_obj()
234 self.cache_obj = self.get_cache_obj()
235 if self.cache_obj.cache_active:
235 if self.cache_obj.cache_active:
236 # means our cache obj is existing and marked as it's
236 # means our cache obj is existing and marked as it's
237 # cache is not outdated, we return BaseInvalidator
237 # cache is not outdated, we return BaseInvalidator
238 self.skip_cache_active_change = True
238 self.skip_cache_active_change = True
239 return ActiveRegionCache(self)
239 return ActiveRegionCache(self)
240
240
241 # the key is either not existing or set to False, we return
241 # the key is either not existing or set to False, we return
242 # the real invalidator which re-computes value. We additionally set
242 # the real invalidator which re-computes value. We additionally set
243 # the flag to actually update the Database objects
243 # the flag to actually update the Database objects
244 self.skip_cache_active_change = False
244 self.skip_cache_active_change = False
245 return FreshRegionCache(self)
245 return FreshRegionCache(self)
246
246
247 def __exit__(self, exc_type, exc_val, exc_tb):
247 def __exit__(self, exc_type, exc_val, exc_tb):
248
248
249 if self.skip_cache_active_change:
249 if self.skip_cache_active_change:
250 return
250 return
251
251
252 try:
252 try:
253 self.cache_obj.cache_active = True
253 self.cache_obj.cache_active = True
254 Session().add(self.cache_obj)
254 Session().add(self.cache_obj)
255 Session().commit()
255 Session().commit()
256 except IntegrityError:
256 except IntegrityError:
257 # if we catch integrity error, it means we inserted this object
257 # if we catch integrity error, it means we inserted this object
258 # assumption is that's really an edge race-condition case and
258 # assumption is that's really an edge race-condition case and
259 # it's safe is to skip it
259 # it's safe is to skip it
260 Session().rollback()
260 Session().rollback()
261 except Exception:
261 except Exception:
262 log.exception('Failed to commit on cache key update')
262 log.exception('Failed to commit on cache key update')
263 Session().rollback()
263 Session().rollback()
264 if self.raise_exception:
264 if self.raise_exception:
265 raise
265 raise
266
266
267
267
268 def includeme(config):
268 def includeme(config):
269 configure_caches(config.registry.settings)
269 configure_caches(config.registry.settings)
General Comments 0
You need to be logged in to leave comments. Login now