##// END OF EJS Templates
caches: newly generated cache object should have always unique UIDs to prevent...
marcink -
r3861:58b00196 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,339 +1,348 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 import threading
24 import threading
25
25
26 from dogpile.cache import CacheRegion
26 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
27 from dogpile.cache.util import compat
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
33
34 from . import region_meta
34 from . import region_meta
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38
38
class RhodeCodeCacheRegion(CacheRegion):
    """
    dogpile ``CacheRegion`` subclass adding a cheaper, conditional variant
    of ``cache_on_arguments``.
    """

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't meet. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values

        :param namespace: optional cache namespace passed to the key generator.
        :param expiration_time: timeout in seconds, or a callable returning one
            (evaluated on every call when callable).
        :param should_cache_fn: dogpile hook deciding whether a computed value
            is stored.
        :param to_str: key-stringification function; the default selects the
            backwards-compatible two-argument key-generator call below.
        :param function_key_generator: override for the region's key generator.
        :param condition: when falsy, the decorated function is called directly
            and dogpile is bypassed entirely.
        """
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    return fn(*arg, **kw)

                # fast path: caching disabled, call through without touching
                # any dogpile machinery
                if not condition:
                    return creator()

                timeout = expiration_time() if expiration_time_is_callable \
                    else expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            # management helpers mirroring dogpile's cache_on_arguments API
            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            def refresh(*arg, **kw):
                # recompute unconditionally and store the fresh value
                key = key_generator(*arg, **kw)
                value = fn(*arg, **kw)
                self.set(key, value)
                return value

            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator
            decorate.__wrapped__ = fn

            return decorate

        return decorator
110
110
111
111
def make_region(*args, **kwargs):
    """Factory producing a :class:`RhodeCodeCacheRegion` (drop-in for dogpile's make_region)."""
    return RhodeCodeCacheRegion(*args, **kwargs)
114
114
115
115
def get_default_cache_settings(settings, prefixes=None):
    """
    Filter ``settings`` down to the entries whose key starts with one of
    ``prefixes``, stripping the matched prefix (and surrounding whitespace)
    from the resulting key. String values are also stripped.
    """
    prefixes = prefixes or []
    filtered = {}
    for full_key in settings.keys():
        for prefix in prefixes:
            if not full_key.startswith(prefix):
                continue
            short_name = full_key.split(prefix)[1].strip()
            value = settings[full_key]
            if isinstance(value, compat.string_types):
                value = value.strip()
            filtered[short_name] = value
    return filtered
128
128
129
129
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    joined = "_".join(safe_str(arg) for arg in args)
    return sha1(joined)
135
135
136
136
def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def _with_backend(namespace, fn):
        # close over the backend so dogpile's (namespace, fn) protocol still fits
        return key_generator(backend, namespace, fn)
    return _with_backend
144
144
145
145
def key_generator(backend, namespace, fn):
    """
    Build a per-function cache-key generator of the form
    ``<backend_prefix>:<namespace>:<fn_name>_<sha1-of-args>``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        ns_part = namespace or 'default_namespace'
        args_hash = compute_key_from_params(*args)
        return "{}:{}:{}_{}".format(prefix, ns_part, func_name, args_hash)

    return generate_key
158
158
159
159
def get_or_create_region(region_name, region_namespace=None):
    """
    Return the dogpile region configured under ``region_name``.

    For regions backed by ``FileNamespaceBackend`` a dedicated region (one DBM
    file per ``region_namespace``) is lazily created, registered in
    ``region_meta.dogpile_cache_regions`` and returned instead.

    :raises EnvironmentError: when ``region_name`` was never configured.
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # NOTE(review): lookup/store below key the shared registry on
        # region_namespace alone, while region_uid_name is only used as the
        # new region's display name — confirm mixing namespace keys with
        # region names in dogpile_cache_regions is intended.
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
197
197
198
198
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every cached key stored under ``cache_namespace_uid`` in the given
    region and return how many keys were removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    matching_keys = region.backend.list_keys(prefix=cache_namespace_uid)
    removed = len(matching_keys)
    if removed:
        region.delete_multi(matching_keys)
    return removed
206
206
207
207
class ActiveRegionCache(object):
    """Result of an invalidation check when the cached value is still valid."""

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        """Cache is fresh — callers must not recompute."""
        return False
215
215
216
216
class FreshRegionCache(object):
    """Result of an invalidation check when the cache must be recomputed."""

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        """Cache is stale or missing — callers should recompute."""
        return True
224
224
225
225
class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # process identity; falls back to 'DEFAULT' when instance_id is unset
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        # NOTE(review): cache_type is accepted but not read in this body —
        # the lookup keys on self.cache_key; confirm the parameter is kept
        # only for call-site compatibility.
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        if not cache_obj:
            new_cache_args = invalidation_namespace
            # inherit the namespace's existing cache_state_uid (if any) so all
            # CacheKey rows in one namespace stay consistent
            # NOTE: itervalues() is Python-2 only
            first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now