##// END OF EJS Templates
beaker: fix cache sqlalchemy options.
marcink -
r2789:1a9efc1b stable
parent child Browse files
Show More
@@ -1,269 +1,269 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import beaker
23 23 import logging
24 24 import threading
25 25
26 26 from beaker.cache import _cache_decorate, cache_regions, region_invalidate
27 27 from sqlalchemy.exc import IntegrityError
28 28
29 29 from rhodecode.lib.utils import safe_str, md5
30 30 from rhodecode.model.db import Session, CacheKey
31 31
32 32 log = logging.getLogger(__name__)
33 33
# Namespace prefixes for the per-repo caches managed by this module.
FILE_TREE = 'cache_file_tree'
FILE_TREE_META = 'cache_file_tree_metadata'
FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
SUMMARY_STATS = 'cache_summary_stats'

# This list of caches gets purged when invalidation happens
USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)

# Fallback manager configuration used by get_cache_manager() when the
# requested region is not present in beaker's cache_regions.
DEFAULT_CACHE_MANAGER_CONFIG = {
    'type': 'memorylru_base',
    'max_items': 10240,
    'key_length': 256,
    'enabled': True
}
48 48
49 49
def get_default_cache_settings(settings):
    """
    Extract cache-related options from a settings dict, dropping the
    ``beaker.cache.`` / ``cache.`` prefixes from the keys and stripping
    whitespace from both key remainder and value.
    """
    known_prefixes = ('beaker.cache.', 'cache.')
    extracted = {}
    for full_key, raw_value in settings.items():
        for prefix in known_prefixes:
            if full_key.startswith(prefix):
                short_name = full_key.split(prefix)[1].strip()
                extracted[short_name] = raw_value.strip()
    return extracted
58 58
59 59
60 60 # set cache regions for beaker so celery can utilise it
def configure_caches(settings, default_region_settings=None):
    """
    Register all beaker cache regions listed in the ``regions`` setting.

    For each region the per-region options are collected by stripping the
    ``<region>.`` key prefix, merged over *default_region_settings*, and
    passed to :func:`configure_cache_region`.

    :param settings: flat settings dict (ini-style keys)
    :param default_region_settings: base options applied to every region;
        defaults to the type from DEFAULT_CACHE_MANAGER_CONFIG
    """
    cache_settings = {'regions': None}
    # main cache settings used as default ...
    cache_settings.update(get_default_cache_settings(settings))
    default_region_settings = default_region_settings or \
        {'type': DEFAULT_CACHE_MANAGER_CONFIG['type']}
    if cache_settings['regions']:
        for region in cache_settings['regions'].split(','):
            region = region.strip()
            region_settings = default_region_settings.copy()
            for key, value in cache_settings.items():
                if key.startswith(region):
                    # strip the full `<region>.` prefix so dotted options such
                    # as `sqlalchemy.url` keep their remaining dotted name
                    # (splitting on '.' alone kept only the first segment)
                    region_settings[key.split(region + '.')[-1]] = value
            log.debug('Configuring cache region `%s` with settings %s',
                      region, region_settings)
            configure_cache_region(
                region, region_settings, cache_settings)
78 78
79 79
def configure_cache_region(
        region_name, region_settings, default_cache_kw, default_expire=60):
    """
    Finalize a single region's options and register it with beaker.

    Missing ``lock_dir``/``data_dir``/``type`` options are filled from
    *default_cache_kw*; ``expire`` falls back to *default_expire* and is
    coerced to int.
    """
    fallbacks = {
        'type': default_cache_kw.get('type', 'memory'),
        'lock_dir': default_cache_kw.get('lock_dir'),
        'data_dir': default_cache_kw.get('data_dir'),
    }

    for option in ('lock_dir', 'data_dir', 'type'):
        region_settings[option] = region_settings.get(
            option, fallbacks[option])
    region_settings['expire'] = int(
        region_settings.get('expire', default_expire))

    beaker.cache.cache_regions[region_name] = region_settings
92 92
93 93
def get_cache_manager(region_name, cache_name, custom_ttl=None):
    """
    Creates a Beaker cache manager. Such instance can be used like that::

        _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
        cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
        _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)
        def heavy_compute():
            ...
        result = cache_manager.get(_cache_key, createfunc=heavy_compute)

    :param region_name: region from ini file
    :param cache_name: custom cache name, usually prefix+repo_name. eg
        file_switcher_repo1
    :param custom_ttl: override .ini file timeout on this cache
    :return: instance of cache manager
    """

    # NOTE: copy the region config before applying a custom ttl; updating the
    # dict in place mutated the shared entry in `cache_regions` (or the
    # module-level DEFAULT_CACHE_MANAGER_CONFIG fallback), leaking the custom
    # expire into every subsequent cache built from the same region.
    cache_config = dict(
        cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG))
    if custom_ttl:
        log.debug('Updating region %s with custom ttl: %s',
                  region_name, custom_ttl)
        cache_config.update({'expire': custom_ttl})

    return beaker.cache.Cache._get_cache(cache_name, cache_config)
119 119
120 120
def clear_cache_manager(cache_manager):
    """
    Drop every value held by the given cache manager, e.g.::

        namespace = 'foobar'
        cache_manager = get_cache_manager('repo_cache_long', namespace)
        clear_cache_manager(cache_manager)
    """

    log.debug('Clearing all values for cache manager %s', cache_manager)
    cache_manager.clear()
130 130
131 131
def clear_repo_caches(repo_name):
    """Purge every cache namespace listed in USED_REPO_CACHES for a repo."""
    # invalidate cache manager for this repo
    for cache_prefix in USED_REPO_CACHES:
        namespace = get_repo_namespace_key(cache_prefix, repo_name)
        clear_cache_manager(
            get_cache_manager('repo_cache_long', namespace))
138 138
139 139
def compute_key_from_params(*args):
    """
    Build an md5 cache key by safe-stringifying all params and joining
    them with ``_``.
    """
    return md5("_".join(safe_str(arg) for arg in args))
145 145
146 146
def get_repo_namespace_key(prefix, repo_name):
    """Return ``<prefix>_<hashed-repo-name>`` namespace for repo caches."""
    return '%s_%s' % (prefix, compute_key_from_params(repo_name))
149 149
150 150
def conditional_cache(region, prefix, condition, func):
    """
    Conditional caching function use like::
        def _c(arg):
            # heavy computation function
            return data

        # depending on the condition the compute is wrapped in cache or not
        compute = conditional_cache('short_term', 'cache_desc',
                                    condition=True, func=func)
        return compute(arg)

    :param region: name of cache region
    :param prefix: cache region prefix
    :param condition: condition for cache to be triggered, and
        return data cached
    :param func: wrapped heavy function to compute

    """
    wrapped = func
    if condition:
        # log args fixed: the format string is 'func: %s into %s region
        # cache', so func must come first, then region (they were swapped)
        log.debug('conditional_cache: True, wrapping call of '
                  'func: %s into %s region cache', func, region)
        cached_region = _cache_decorate((prefix,), None, None, region)
        wrapped = cached_region(func)
    return wrapped
177 177
178 178
class ActiveRegionCache(object):
    """
    Invalidator returned while the cache key is still marked active:
    it never invalidates, and `compute` serves the (cached) value.
    """

    def __init__(self, context):
        self.context = context

    def invalidate(self, *args, **kwargs):
        # an active cache needs no invalidation
        return False

    def compute(self):
        context = self.context
        log.debug('Context cache: getting obj %s from cache', context)
        return context.compute_func(context.cache_key)
189 189
190 190
class FreshRegionCache(ActiveRegionCache):
    """
    Invalidator returned when the cache key is stale: `invalidate`
    purges the beaker region entry so `compute` recalculates the value.
    """

    def invalidate(self):
        context = self.context
        log.debug('Context cache: invalidating cache for %s', context)
        region_invalidate(context.compute_func, None, context.cache_key)
        return True
197 197
198 198
class InvalidationContext(object):
    """
    Context manager deciding, from the `CacheKey` DB state, whether a cached
    value for (repo_name, cache_type) may be served or must be recomputed.

    `__enter__` returns an ActiveRegionCache (key active: serve from cache)
    or a FreshRegionCache (key missing/inactive: invalidate and recompute);
    `__exit__` persists cache_active=True only on the recompute path.
    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.repo_name), safe_str(self.cache_type))

    def __init__(self, compute_func, repo_name, cache_type,
                 raise_exception=False, thread_scoped=False):
        """
        :param compute_func: callable computing the value; called with the
            cache key (see ActiveRegionCache.compute)
        :param repo_name: repository the cache belongs to
        :param cache_type: cache namespace, e.g. one of the cache_* constants
        :param raise_exception: re-raise unexpected DB errors from __exit__
        :param thread_scoped: scope the cache key to the current thread
        """
        self.compute_func = compute_func
        self.repo_name = repo_name
        self.cache_type = cache_type
        # md5-based key derived from repo name + cache type
        self.cache_key = compute_key_from_params(
            repo_name, cache_type)
        self.raise_exception = raise_exception

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped:
            thread_id = threading.current_thread().ident
            self.cache_key = '{cache_key}_{thread_id}'.format(
                cache_key=self.cache_key, thread_id=thread_id)

    def get_cache_obj(self):
        # Fetch the active CacheKey row; when none exists, build a fresh
        # (not yet persisted) CacheKey — __exit__ will add/commit it.
        cache_key = CacheKey.get_cache_key(
            self.repo_name, self.cache_type)
        cache_obj = CacheKey.get_active_cache(cache_key)
        if not cache_obj:
            cache_obj = CacheKey(cache_key, self.repo_name)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """

        self.cache_obj = self.get_cache_obj()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return BaseInvalidator
            self.skip_cache_active_change = True
            return ActiveRegionCache(self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only persist state when the recompute path was taken in __enter__.
        if self.skip_cache_active_change:
            return

        try:
            # mark the key fresh again and persist (inserts the row when
            # get_cache_obj created a new CacheKey)
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
266 266
267 267
def includeme(config):
    # Pyramid-style `includeme` hook: configure beaker cache regions from
    # the application registry settings at startup.
    configure_caches(config.registry.settings)
General Comments 0
You need to be logged in to leave comments. Login now