cache: updated cache decorators based on latest code from dogpile
marcink
r3863:a4fb81bb default
@@ -1,348 +1,352 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 from decorator import decorate
24 25 import threading
25 26
26 27 from dogpile.cache import CacheRegion
27 28 from dogpile.cache.util import compat
28 29
29 30 import rhodecode
30 31 from rhodecode.lib.utils import safe_str, sha1
31 32 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 33 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 34
34 35 from . import region_meta
35 36
36 37 log = logging.getLogger(__name__)
37 38
38 39
39 40 class RhodeCodeCacheRegion(CacheRegion):
40 41
41 42 def conditional_cache_on_arguments(
42 43 self, namespace=None,
43 44 expiration_time=None,
44 45 should_cache_fn=None,
45 46 to_str=compat.string_type,
46 47 function_key_generator=None,
47 48 condition=True):
48 49 """
49 50 Custom conditional decorator that will not touch any dogpile internals if
50 51 the condition isn't met. This works a bit differently than should_cache_fn,
51 52 and it's faster in cases where we never want to compute cached values.
52 53 """
53 54 expiration_time_is_callable = compat.callable(expiration_time)
54 55
55 56 if function_key_generator is None:
56 57 function_key_generator = self.function_key_generator
57 58
58 def decorator(fn):
59 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
60
61 if not condition:
62 log.debug('Calling un-cached func:%s', user_func)
63 return user_func(*arg, **kw)
64
65 key = key_generator(*arg, **kw)
66
67 timeout = expiration_time() if expiration_time_is_callable \
68 else expiration_time
69
70 log.debug('Calling cached fn:%s', user_func)
71 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
72
73 def cache_decorator(user_func):
59 74 if to_str is compat.string_type:
60 75 # backwards compatible
61 key_generator = function_key_generator(namespace, fn)
76 key_generator = function_key_generator(namespace, user_func)
62 77 else:
63 key_generator = function_key_generator(namespace, fn, to_str=to_str)
64
65 @functools.wraps(fn)
66 def decorate(*arg, **kw):
67 key = key_generator(*arg, **kw)
78 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
68 79
69 @functools.wraps(fn)
70 def creator():
71 return fn(*arg, **kw)
72
73 if not condition:
74 return creator()
75
76 timeout = expiration_time() if expiration_time_is_callable \
77 else expiration_time
78
79 return self.get_or_create(key, creator, timeout, should_cache_fn)
80 def refresh(*arg, **kw):
81 """
82 Like invalidate, but regenerates the value instead
83 """
84 key = key_generator(*arg, **kw)
85 value = user_func(*arg, **kw)
86 self.set(key, value)
87 return value
80 88
81 89 def invalidate(*arg, **kw):
82 90 key = key_generator(*arg, **kw)
83 91 self.delete(key)
84 92
85 93 def set_(value, *arg, **kw):
86 94 key = key_generator(*arg, **kw)
87 95 self.set(key, value)
88 96
89 97 def get(*arg, **kw):
90 98 key = key_generator(*arg, **kw)
91 99 return self.get(key)
92 100
93 def refresh(*arg, **kw):
94 key = key_generator(*arg, **kw)
95 value = fn(*arg, **kw)
96 self.set(key, value)
97 return value
101 user_func.set = set_
102 user_func.invalidate = invalidate
103 user_func.get = get
104 user_func.refresh = refresh
105 user_func.key_generator = key_generator
106 user_func.original = user_func
98 107
99 decorate.set = set_
100 decorate.invalidate = invalidate
101 decorate.refresh = refresh
102 decorate.get = get
103 decorate.original = fn
104 decorate.key_generator = key_generator
105 decorate.__wrapped__ = fn
108 # Use `decorate` to preserve the signature of :param:`user_func`.
106 109
107 return decorate
110 return decorate(user_func, functools.partial(
111 get_or_create_for_user_func, key_generator))
108 112
109 return decorator
113 return cache_decorator
110 114
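# Editorial note: a minimal usage sketch of conditional_cache_on_arguments,
# adapted from the InvalidationContext docstring further below; the region
# name 'cache_perms' and the namespace uid are illustrative assumptions.
#
#     region = get_or_create_region('cache_perms', 'some_namespace_uid')
#
#     @region.conditional_cache_on_arguments(
#         namespace='some_namespace_uid', condition=True)
#     def heavy_compute(cache_name, param1, param2):
#         return 'COMPUTE {}, {}, {}'.format(cache_name, param1, param2)
#
#     result = heavy_compute('name', 1, 2)          # cached via get_or_create
#     result = heavy_compute.refresh('name', 1, 2)  # re-compute and store
#     heavy_compute.invalidate('name', 1, 2)        # drop the cached value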
111 115
112 116 def make_region(*arg, **kw):
113 117 return RhodeCodeCacheRegion(*arg, **kw)
114 118
115 119
116 120 def get_default_cache_settings(settings, prefixes=None):
117 121 prefixes = prefixes or []
118 122 cache_settings = {}
119 123 for key in settings.keys():
120 124 for prefix in prefixes:
121 125 if key.startswith(prefix):
122 126 name = key.split(prefix)[1].strip()
123 127 val = settings[key]
124 128 if isinstance(val, compat.string_types):
125 129 val = val.strip()
126 130 cache_settings[name] = val
127 131 return cache_settings
128 132
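# Editorial note: a worked example of the prefix stripping above; the setting
# names are hypothetical. Given
#     settings = {'rc_cache.cache_perms.expiration_time': '300',
#                 'rc_cache.cache_perms.backend': 'dogpile.cache.rc.file_namespace'}
# and prefixes=['rc_cache.cache_perms.'], key.split(prefix)[1] keeps only the
# part after the prefix, so the function returns
#     {'expiration_time': '300', 'backend': 'dogpile.cache.rc.file_namespace'}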
129 133
130 134 def compute_key_from_params(*args):
131 135 """
132 136 Helper to compute key from given params to be used in cache manager
133 137 """
134 138 return sha1("_".join(map(safe_str, args)))
135 139
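# Editorial note: a worked example of the helper above; params are converted
# with safe_str, joined with '_', then sha1-hashed:
#     compute_key_from_params('repo_name', 1)  ->  sha1('repo_name_1')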
136 140
137 141 def backend_key_generator(backend):
138 142 """
139 143 Special wrapper that also sends over the backend to the key generator
140 144 """
141 145 def wrapper(namespace, fn):
142 146 return key_generator(backend, namespace, fn)
143 147 return wrapper
144 148
145 149
146 150 def key_generator(backend, namespace, fn):
147 151 fname = fn.__name__
148 152
149 153 def generate_key(*args):
150 154 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
151 155 namespace_pref = namespace or 'default_namespace'
152 156 arg_key = compute_key_from_params(*args)
153 157 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
154 158
155 159 return final_key
156 160
157 161 return generate_key
158 162
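# Editorial note: a sketch of the key shape generate_key produces, using
# hypothetical values. For a backend with key_prefix 'file', namespace
# 'repo_cache:1' and a decorated function named 'heavy_compute':
#     'file:repo_cache:1:heavy_compute_<sha1-of-args>'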
159 163
160 164 def get_or_create_region(region_name, region_namespace=None):
161 165 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
162 166 region_obj = region_meta.dogpile_cache_regions.get(region_name)
163 167 if not region_obj:
164 168 raise EnvironmentError(
165 169 'Region `{}` not in configured regions: {}.'.format(
166 170 region_name, region_meta.dogpile_cache_regions.keys()))
167 171
168 172 region_uid_name = '{}:{}'.format(region_name, region_namespace)
169 173 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
170 174 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
171 175 if region_exist:
172 176 log.debug('Using already configured region: %s', region_namespace)
173 177 return region_exist
174 178 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
175 179 expiration_time = region_obj.expiration_time
176 180
177 181 if not os.path.isdir(cache_dir):
178 182 os.makedirs(cache_dir)
179 183 new_region = make_region(
180 184 name=region_uid_name,
181 185 function_key_generator=backend_key_generator(region_obj.actual_backend)
182 186 )
183 187 namespace_filename = os.path.join(
184 188 cache_dir, "{}.cache.dbm".format(region_namespace))
185 189 # special backend type that allows one db file per namespace
186 190 new_region.configure(
187 191 backend='dogpile.cache.rc.file_namespace',
188 192 expiration_time=expiration_time,
189 193 arguments={"filename": namespace_filename}
190 194 )
191 195
192 196 # create and save in region caches
193 197 log.debug('configuring new region: %s', region_uid_name)
194 198 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
195 199
196 200 return region_obj
197 201
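# Editorial note: a minimal usage sketch for get_or_create_region; the region
# and namespace names are assumptions taken from the docstring further below.
# For a FileNamespaceBackend region this lazily configures one
# '<namespace>.cache.dbm' file per namespace under the configured cache_dir.
#
#     region = get_or_create_region('cache_perms', 'repo_cache:1')
#     region.set('some_key', 'some_value')
#     assert region.get('some_key') == 'some_value'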
198 202
199 203 def clear_cache_namespace(cache_region, cache_namespace_uid):
200 204 region = get_or_create_region(cache_region, cache_namespace_uid)
201 205 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
202 206 num_delete_keys = len(cache_keys)
203 207 if num_delete_keys:
204 208 region.delete_multi(cache_keys)
205 209 return num_delete_keys
206 210
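# Editorial note: a minimal usage sketch; the argument values are assumptions.
# This lists all backend keys under the namespace prefix, deletes them with a
# single delete_multi call, and returns how many keys were dropped.
#
#     removed = clear_cache_namespace('cache_perms', 'repo_cache:1')
#     log.debug('invalidated %s cache keys', removed)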
207 211
208 212 class ActiveRegionCache(object):
209 213 def __init__(self, context, cache_data):
210 214 self.context = context
211 215 self.cache_data = cache_data
212 216
213 217 def should_invalidate(self):
214 218 return False
215 219
216 220
217 221 class FreshRegionCache(object):
218 222 def __init__(self, context, cache_data):
219 223 self.context = context
220 224 self.cache_data = cache_data
221 225
222 226 def should_invalidate(self):
223 227 return True
224 228
225 229
226 230 class InvalidationContext(object):
227 231 """
228 232 usage::
229 233
230 234 from rhodecode.lib import rc_cache
231 235
232 236 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
233 237 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
234 238
235 239 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
236 240 def heavy_compute(cache_name, param1, param2):
237 241 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
238 242
239 243 # the invalidation namespace is a shared namespace key for all process
240 244 # caches; we use it to send a global invalidation signal
241 245 invalidation_namespace = 'repo_cache:1'
242 246
243 247 inv_context_manager = rc_cache.InvalidationContext(
244 248 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
245 249 with inv_context_manager as invalidation_context:
246 250 args = ('one', 'two')
247 251 # re-compute and store cache if we get invalidate signal
248 252 if invalidation_context.should_invalidate():
249 253 result = heavy_compute.refresh(*args)
250 254 else:
251 255 result = heavy_compute(*args)
252 256
253 257 compute_time = inv_context_manager.compute_time
254 258 log.debug('result computed in %.4fs', compute_time)
255 259
256 260 # To send global invalidation signal, simply run
257 261 CacheKey.set_invalidate(invalidation_namespace)
258 262
259 263 """
260 264
261 265 def __repr__(self):
262 266 return '<InvalidationContext:{}[{}]>'.format(
263 267 safe_str(self.cache_key), safe_str(self.uid))
264 268
265 269 def __init__(self, uid, invalidation_namespace='',
266 270 raise_exception=False, thread_scoped=None):
267 271 self.uid = uid
268 272 self.invalidation_namespace = invalidation_namespace
269 273 self.raise_exception = raise_exception
270 274 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
271 275 self.thread_id = 'global'
272 276
273 277 if thread_scoped is None:
274 278 # when left at the default (None), this can be overridden via .ini settings
275 279 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
276 280
277 281 # Append the thread id to the cache key if this invalidation context
278 282 # should be scoped to the current thread.
279 283 if thread_scoped is True:
280 284 self.thread_id = threading.current_thread().ident
281 285
282 286 self.cache_key = compute_key_from_params(uid)
283 287 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
284 288 self.proc_id, self.thread_id, self.cache_key)
285 289 self.compute_time = 0
286 290
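# Editorial note: an illustration of the composed cache key above, with
# hypothetical values. For instance_id 'rc-1', a non-thread-scoped context
# (thread_id stays 'global') and uid '1', the key takes the shape:
#     'proc:rc-1|thread:global|params:<sha1-of-uid>'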
287 291 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
288 292 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
289 293 # fetch all cache keys for this namespace and convert them to a map, to
290 294 # find if we have a specific cache_key object registered. We do this
291 295 # because we want a consistent cache_state_uid for newly registered objects
292 296 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
293 297 cache_obj = cache_obj_map.get(self.cache_key)
294 298 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
295 299 if not cache_obj:
296 300 new_cache_args = invalidation_namespace
297 301 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
298 302 cache_state_uid = None
299 303 if first_cache_obj:
300 304 cache_state_uid = first_cache_obj.cache_state_uid
301 305 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
302 306 cache_state_uid=cache_state_uid)
303 307 return cache_obj
304 308
305 309 def __enter__(self):
306 310 """
307 311 Test if the current object is valid, and return a cache region helper
308 312 that handles invalidation and computation
309 313 """
310 314 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
311 315 # register or get a new key based on uid
312 316 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
313 317 cache_data = self.cache_obj.get_dict()
314 318 self._start_time = time.time()
315 319 if self.cache_obj.cache_active:
316 320 # means our cache obj exists and its cache is marked as
317 321 # not outdated; we return ActiveRegionCache
318 322 self.skip_cache_active_change = True
319 323
320 324 return ActiveRegionCache(context=self, cache_data=cache_data)
321 325
322 326 # the key either does not exist or is set to False; we return
323 327 # the real invalidator, which re-computes the value. We additionally
324 328 # set the flag to actually update the database objects
325 329 self.skip_cache_active_change = False
326 330 return FreshRegionCache(context=self, cache_data=cache_data)
327 331
328 332 def __exit__(self, exc_type, exc_val, exc_tb):
329 333 # save compute time
330 334 self.compute_time = time.time() - self._start_time
331 335
332 336 if self.skip_cache_active_change:
333 337 return
334 338
335 339 try:
336 340 self.cache_obj.cache_active = True
337 341 Session().add(self.cache_obj)
338 342 Session().commit()
339 343 except IntegrityError:
340 344 # if we catch an integrity error, it means this object was already
341 345 # inserted, most likely by a concurrent process; the assumption is that
342 346 # this is really an edge race-condition case and it's safe to skip it
343 347 Session().rollback()
344 348 except Exception:
345 349 log.exception('Failed to commit on cache key update')
346 350 Session().rollback()
347 351 if self.raise_exception:
348 352 raise