##// END OF EJS Templates
cache: updated cache decorators based on latest code from dogpile
marcink -
r3863:a4fb81bb default
parent child Browse files
Show More
@@ -1,348 +1,352 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 from decorator import decorate
24 import threading
25 import threading
25
26
26 from dogpile.cache import CacheRegion
27 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
28 from dogpile.cache.util import compat
28
29
29 import rhodecode
30 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
34
34 from . import region_meta
35 from . import region_meta
35
36
36 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
37
38
38
39
39 class RhodeCodeCacheRegion(CacheRegion):
40 class RhodeCodeCacheRegion(CacheRegion):
40
41
41 def conditional_cache_on_arguments(
42 def conditional_cache_on_arguments(
42 self, namespace=None,
43 self, namespace=None,
43 expiration_time=None,
44 expiration_time=None,
44 should_cache_fn=None,
45 should_cache_fn=None,
45 to_str=compat.string_type,
46 to_str=compat.string_type,
46 function_key_generator=None,
47 function_key_generator=None,
47 condition=True):
48 condition=True):
48 """
49 """
49 Custom conditional decorator, that will not touch any dogpile internals if
50 Custom conditional decorator, that will not touch any dogpile internals if
50 condition isn't meet. This works a bit different than should_cache_fn
51 condition isn't meet. This works a bit different than should_cache_fn
51 And it's faster in cases we don't ever want to compute cached values
52 And it's faster in cases we don't ever want to compute cached values
52 """
53 """
53 expiration_time_is_callable = compat.callable(expiration_time)
54 expiration_time_is_callable = compat.callable(expiration_time)
54
55
55 if function_key_generator is None:
56 if function_key_generator is None:
56 function_key_generator = self.function_key_generator
57 function_key_generator = self.function_key_generator
57
58
58 def decorator(fn):
59 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
60
61 if not condition:
62 log.debug('Calling un-cached func:%s', user_func)
63 return user_func(*arg, **kw)
64
65 key = key_generator(*arg, **kw)
66
67 timeout = expiration_time() if expiration_time_is_callable \
68 else expiration_time
69
70 log.debug('Calling cached fn:%s', user_func)
71 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
72
73 def cache_decorator(user_func):
59 if to_str is compat.string_type:
74 if to_str is compat.string_type:
60 # backwards compatible
75 # backwards compatible
61 key_generator = function_key_generator(namespace, fn)
76 key_generator = function_key_generator(namespace, user_func)
62 else:
77 else:
63 key_generator = function_key_generator(namespace, fn, to_str=to_str)
78 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
64
65 @functools.wraps(fn)
66 def decorate(*arg, **kw):
67 key = key_generator(*arg, **kw)
68
79
69 @functools.wraps(fn)
80 def refresh(*arg, **kw):
70 def creator():
81 """
71 return fn(*arg, **kw)
82 Like invalidate, but regenerates the value instead
72
83 """
73 if not condition:
84 key = key_generator(*arg, **kw)
74 return creator()
85 value = user_func(*arg, **kw)
75
86 self.set(key, value)
76 timeout = expiration_time() if expiration_time_is_callable \
87 return value
77 else expiration_time
78
79 return self.get_or_create(key, creator, timeout, should_cache_fn)
80
88
81 def invalidate(*arg, **kw):
89 def invalidate(*arg, **kw):
82 key = key_generator(*arg, **kw)
90 key = key_generator(*arg, **kw)
83 self.delete(key)
91 self.delete(key)
84
92
85 def set_(value, *arg, **kw):
93 def set_(value, *arg, **kw):
86 key = key_generator(*arg, **kw)
94 key = key_generator(*arg, **kw)
87 self.set(key, value)
95 self.set(key, value)
88
96
89 def get(*arg, **kw):
97 def get(*arg, **kw):
90 key = key_generator(*arg, **kw)
98 key = key_generator(*arg, **kw)
91 return self.get(key)
99 return self.get(key)
92
100
93 def refresh(*arg, **kw):
101 user_func.set = set_
94 key = key_generator(*arg, **kw)
102 user_func.invalidate = invalidate
95 value = fn(*arg, **kw)
103 user_func.get = get
96 self.set(key, value)
104 user_func.refresh = refresh
97 return value
105 user_func.key_generator = key_generator
106 user_func.original = user_func
98
107
99 decorate.set = set_
108 # Use `decorate` to preserve the signature of :param:`user_func`.
100 decorate.invalidate = invalidate
101 decorate.refresh = refresh
102 decorate.get = get
103 decorate.original = fn
104 decorate.key_generator = key_generator
105 decorate.__wrapped__ = fn
106
109
107 return decorate
110 return decorate(user_func, functools.partial(
111 get_or_create_for_user_func, key_generator))
108
112
109 return decorator
113 return cache_decorator
110
114
111
115
112 def make_region(*arg, **kw):
116 def make_region(*arg, **kw):
113 return RhodeCodeCacheRegion(*arg, **kw)
117 return RhodeCodeCacheRegion(*arg, **kw)
114
118
115
119
116 def get_default_cache_settings(settings, prefixes=None):
120 def get_default_cache_settings(settings, prefixes=None):
117 prefixes = prefixes or []
121 prefixes = prefixes or []
118 cache_settings = {}
122 cache_settings = {}
119 for key in settings.keys():
123 for key in settings.keys():
120 for prefix in prefixes:
124 for prefix in prefixes:
121 if key.startswith(prefix):
125 if key.startswith(prefix):
122 name = key.split(prefix)[1].strip()
126 name = key.split(prefix)[1].strip()
123 val = settings[key]
127 val = settings[key]
124 if isinstance(val, compat.string_types):
128 if isinstance(val, compat.string_types):
125 val = val.strip()
129 val = val.strip()
126 cache_settings[name] = val
130 cache_settings[name] = val
127 return cache_settings
131 return cache_settings
128
132
129
133
130 def compute_key_from_params(*args):
134 def compute_key_from_params(*args):
131 """
135 """
132 Helper to compute key from given params to be used in cache manager
136 Helper to compute key from given params to be used in cache manager
133 """
137 """
134 return sha1("_".join(map(safe_str, args)))
138 return sha1("_".join(map(safe_str, args)))
135
139
136
140
137 def backend_key_generator(backend):
141 def backend_key_generator(backend):
138 """
142 """
139 Special wrapper that also sends over the backend to the key generator
143 Special wrapper that also sends over the backend to the key generator
140 """
144 """
141 def wrapper(namespace, fn):
145 def wrapper(namespace, fn):
142 return key_generator(backend, namespace, fn)
146 return key_generator(backend, namespace, fn)
143 return wrapper
147 return wrapper
144
148
145
149
146 def key_generator(backend, namespace, fn):
150 def key_generator(backend, namespace, fn):
147 fname = fn.__name__
151 fname = fn.__name__
148
152
149 def generate_key(*args):
153 def generate_key(*args):
150 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
154 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
151 namespace_pref = namespace or 'default_namespace'
155 namespace_pref = namespace or 'default_namespace'
152 arg_key = compute_key_from_params(*args)
156 arg_key = compute_key_from_params(*args)
153 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
157 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
154
158
155 return final_key
159 return final_key
156
160
157 return generate_key
161 return generate_key
158
162
159
163
160 def get_or_create_region(region_name, region_namespace=None):
164 def get_or_create_region(region_name, region_namespace=None):
161 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
165 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
162 region_obj = region_meta.dogpile_cache_regions.get(region_name)
166 region_obj = region_meta.dogpile_cache_regions.get(region_name)
163 if not region_obj:
167 if not region_obj:
164 raise EnvironmentError(
168 raise EnvironmentError(
165 'Region `{}` not in configured: {}.'.format(
169 'Region `{}` not in configured: {}.'.format(
166 region_name, region_meta.dogpile_cache_regions.keys()))
170 region_name, region_meta.dogpile_cache_regions.keys()))
167
171
168 region_uid_name = '{}:{}'.format(region_name, region_namespace)
172 region_uid_name = '{}:{}'.format(region_name, region_namespace)
169 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
173 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
170 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
174 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
171 if region_exist:
175 if region_exist:
172 log.debug('Using already configured region: %s', region_namespace)
176 log.debug('Using already configured region: %s', region_namespace)
173 return region_exist
177 return region_exist
174 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
178 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
175 expiration_time = region_obj.expiration_time
179 expiration_time = region_obj.expiration_time
176
180
177 if not os.path.isdir(cache_dir):
181 if not os.path.isdir(cache_dir):
178 os.makedirs(cache_dir)
182 os.makedirs(cache_dir)
179 new_region = make_region(
183 new_region = make_region(
180 name=region_uid_name,
184 name=region_uid_name,
181 function_key_generator=backend_key_generator(region_obj.actual_backend)
185 function_key_generator=backend_key_generator(region_obj.actual_backend)
182 )
186 )
183 namespace_filename = os.path.join(
187 namespace_filename = os.path.join(
184 cache_dir, "{}.cache.dbm".format(region_namespace))
188 cache_dir, "{}.cache.dbm".format(region_namespace))
185 # special type that allows 1db per namespace
189 # special type that allows 1db per namespace
186 new_region.configure(
190 new_region.configure(
187 backend='dogpile.cache.rc.file_namespace',
191 backend='dogpile.cache.rc.file_namespace',
188 expiration_time=expiration_time,
192 expiration_time=expiration_time,
189 arguments={"filename": namespace_filename}
193 arguments={"filename": namespace_filename}
190 )
194 )
191
195
192 # create and save in region caches
196 # create and save in region caches
193 log.debug('configuring new region: %s', region_uid_name)
197 log.debug('configuring new region: %s', region_uid_name)
194 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
198 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
195
199
196 return region_obj
200 return region_obj
197
201
198
202
199 def clear_cache_namespace(cache_region, cache_namespace_uid):
203 def clear_cache_namespace(cache_region, cache_namespace_uid):
200 region = get_or_create_region(cache_region, cache_namespace_uid)
204 region = get_or_create_region(cache_region, cache_namespace_uid)
201 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
205 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
202 num_delete_keys = len(cache_keys)
206 num_delete_keys = len(cache_keys)
203 if num_delete_keys:
207 if num_delete_keys:
204 region.delete_multi(cache_keys)
208 region.delete_multi(cache_keys)
205 return num_delete_keys
209 return num_delete_keys
206
210
207
211
208 class ActiveRegionCache(object):
212 class ActiveRegionCache(object):
209 def __init__(self, context, cache_data):
213 def __init__(self, context, cache_data):
210 self.context = context
214 self.context = context
211 self.cache_data = cache_data
215 self.cache_data = cache_data
212
216
213 def should_invalidate(self):
217 def should_invalidate(self):
214 return False
218 return False
215
219
216
220
217 class FreshRegionCache(object):
221 class FreshRegionCache(object):
218 def __init__(self, context, cache_data):
222 def __init__(self, context, cache_data):
219 self.context = context
223 self.context = context
220 self.cache_data = cache_data
224 self.cache_data = cache_data
221
225
222 def should_invalidate(self):
226 def should_invalidate(self):
223 return True
227 return True
224
228
225
229
226 class InvalidationContext(object):
230 class InvalidationContext(object):
227 """
231 """
228 usage::
232 usage::
229
233
230 from rhodecode.lib import rc_cache
234 from rhodecode.lib import rc_cache
231
235
232 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
236 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
233 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
237 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
234
238
235 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
239 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
236 def heavy_compute(cache_name, param1, param2):
240 def heavy_compute(cache_name, param1, param2):
237 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
241 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
238
242
239 # invalidation namespace is shared namespace key for all process caches
243 # invalidation namespace is shared namespace key for all process caches
240 # we use it to send a global signal
244 # we use it to send a global signal
241 invalidation_namespace = 'repo_cache:1'
245 invalidation_namespace = 'repo_cache:1'
242
246
243 inv_context_manager = rc_cache.InvalidationContext(
247 inv_context_manager = rc_cache.InvalidationContext(
244 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
248 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
245 with inv_context_manager as invalidation_context:
249 with inv_context_manager as invalidation_context:
246 args = ('one', 'two')
250 args = ('one', 'two')
247 # re-compute and store cache if we get invalidate signal
251 # re-compute and store cache if we get invalidate signal
248 if invalidation_context.should_invalidate():
252 if invalidation_context.should_invalidate():
249 result = heavy_compute.refresh(*args)
253 result = heavy_compute.refresh(*args)
250 else:
254 else:
251 result = heavy_compute(*args)
255 result = heavy_compute(*args)
252
256
253 compute_time = inv_context_manager.compute_time
257 compute_time = inv_context_manager.compute_time
254 log.debug('result computed in %.4fs', compute_time)
258 log.debug('result computed in %.4fs', compute_time)
255
259
256 # To send global invalidation signal, simply run
260 # To send global invalidation signal, simply run
257 CacheKey.set_invalidate(invalidation_namespace)
261 CacheKey.set_invalidate(invalidation_namespace)
258
262
259 """
263 """
260
264
261 def __repr__(self):
265 def __repr__(self):
262 return '<InvalidationContext:{}[{}]>'.format(
266 return '<InvalidationContext:{}[{}]>'.format(
263 safe_str(self.cache_key), safe_str(self.uid))
267 safe_str(self.cache_key), safe_str(self.uid))
264
268
265 def __init__(self, uid, invalidation_namespace='',
269 def __init__(self, uid, invalidation_namespace='',
266 raise_exception=False, thread_scoped=None):
270 raise_exception=False, thread_scoped=None):
267 self.uid = uid
271 self.uid = uid
268 self.invalidation_namespace = invalidation_namespace
272 self.invalidation_namespace = invalidation_namespace
269 self.raise_exception = raise_exception
273 self.raise_exception = raise_exception
270 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
274 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
271 self.thread_id = 'global'
275 self.thread_id = 'global'
272
276
273 if thread_scoped is None:
277 if thread_scoped is None:
274 # if we set "default" we can override this via .ini settings
278 # if we set "default" we can override this via .ini settings
275 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
279 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
276
280
277 # Append the thread id to the cache key if this invalidation context
281 # Append the thread id to the cache key if this invalidation context
278 # should be scoped to the current thread.
282 # should be scoped to the current thread.
279 if thread_scoped is True:
283 if thread_scoped is True:
280 self.thread_id = threading.current_thread().ident
284 self.thread_id = threading.current_thread().ident
281
285
282 self.cache_key = compute_key_from_params(uid)
286 self.cache_key = compute_key_from_params(uid)
283 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
287 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
284 self.proc_id, self.thread_id, self.cache_key)
288 self.proc_id, self.thread_id, self.cache_key)
285 self.compute_time = 0
289 self.compute_time = 0
286
290
287 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
291 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
288 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
292 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
289 # fetch all cache keys for this namespace and convert them to a map to find if we
293 # fetch all cache keys for this namespace and convert them to a map to find if we
290 # have specific cache_key object registered. We do this because we want to have
294 # have specific cache_key object registered. We do this because we want to have
291 # all consistent cache_state_uid for newly registered objects
295 # all consistent cache_state_uid for newly registered objects
292 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
296 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
293 cache_obj = cache_obj_map.get(self.cache_key)
297 cache_obj = cache_obj_map.get(self.cache_key)
294 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
298 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
295 if not cache_obj:
299 if not cache_obj:
296 new_cache_args = invalidation_namespace
300 new_cache_args = invalidation_namespace
297 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
301 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
298 cache_state_uid = None
302 cache_state_uid = None
299 if first_cache_obj:
303 if first_cache_obj:
300 cache_state_uid = first_cache_obj.cache_state_uid
304 cache_state_uid = first_cache_obj.cache_state_uid
301 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
305 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
302 cache_state_uid=cache_state_uid)
306 cache_state_uid=cache_state_uid)
303 return cache_obj
307 return cache_obj
304
308
305 def __enter__(self):
309 def __enter__(self):
306 """
310 """
307 Test if current object is valid, and return CacheRegion function
311 Test if current object is valid, and return CacheRegion function
308 that does invalidation and calculation
312 that does invalidation and calculation
309 """
313 """
310 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
314 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
311 # register or get a new key based on uid
315 # register or get a new key based on uid
312 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
316 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
313 cache_data = self.cache_obj.get_dict()
317 cache_data = self.cache_obj.get_dict()
314 self._start_time = time.time()
318 self._start_time = time.time()
315 if self.cache_obj.cache_active:
319 if self.cache_obj.cache_active:
316 # means our cache obj is existing and marked as it's
320 # means our cache obj is existing and marked as it's
317 # cache is not outdated, we return ActiveRegionCache
321 # cache is not outdated, we return ActiveRegionCache
318 self.skip_cache_active_change = True
322 self.skip_cache_active_change = True
319
323
320 return ActiveRegionCache(context=self, cache_data=cache_data)
324 return ActiveRegionCache(context=self, cache_data=cache_data)
321
325
322 # the key is either not existing or set to False, we return
326 # the key is either not existing or set to False, we return
323 # the real invalidator which re-computes value. We additionally set
327 # the real invalidator which re-computes value. We additionally set
324 # the flag to actually update the Database objects
328 # the flag to actually update the Database objects
325 self.skip_cache_active_change = False
329 self.skip_cache_active_change = False
326 return FreshRegionCache(context=self, cache_data=cache_data)
330 return FreshRegionCache(context=self, cache_data=cache_data)
327
331
328 def __exit__(self, exc_type, exc_val, exc_tb):
332 def __exit__(self, exc_type, exc_val, exc_tb):
329 # save compute time
333 # save compute time
330 self.compute_time = time.time() - self._start_time
334 self.compute_time = time.time() - self._start_time
331
335
332 if self.skip_cache_active_change:
336 if self.skip_cache_active_change:
333 return
337 return
334
338
335 try:
339 try:
336 self.cache_obj.cache_active = True
340 self.cache_obj.cache_active = True
337 Session().add(self.cache_obj)
341 Session().add(self.cache_obj)
338 Session().commit()
342 Session().commit()
339 except IntegrityError:
343 except IntegrityError:
340 # if we catch integrity error, it means we inserted this object
344 # if we catch integrity error, it means we inserted this object
341 # assumption is that's really an edge race-condition case and
345 # assumption is that's really an edge race-condition case and
342 # it's safe is to skip it
346 # it's safe is to skip it
343 Session().rollback()
347 Session().rollback()
344 except Exception:
348 except Exception:
345 log.exception('Failed to commit on cache key update')
349 log.exception('Failed to commit on cache key update')
346 Session().rollback()
350 Session().rollback()
347 if self.raise_exception:
351 if self.raise_exception:
348 raise
352 raise
General Comments 0
You need to be logged in to leave comments. Login now