##// END OF EJS Templates
caches: don't try to delete keys if there aren't any to delete....
marcink -
r2969:c92b412f default
parent child Browse files
Show More
@@ -1,320 +1,322 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2018 RhodeCode GmbH
3 # Copyright (C) 2015-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 import threading
24 import threading
25
25
26 from dogpile.cache import CacheRegion
26 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
27 from dogpile.cache.util import compat
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
33
34 from . import region_meta
34 from . import region_meta
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38
38
class RhodeCodeCacheRegion(CacheRegion):

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Conditional variant of dogpile's ``cache_on_arguments``.

        When ``condition`` evaluates falsy, the wrapped function is invoked
        directly and no dogpile machinery (key generation, backend lookup)
        is touched at all. This behaves differently from ``should_cache_fn``
        and is faster whenever we never want to compute cached values.
        """
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                cache_key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    return fn(*arg, **kw)

                # bypass dogpile completely when caching is disabled
                if not condition:
                    return creator()

                if expiration_time_is_callable:
                    timeout = expiration_time()
                else:
                    timeout = expiration_time

                return self.get_or_create(
                    cache_key, creator, timeout, should_cache_fn)

            def invalidate(*arg, **kw):
                # drop the cached entry for these arguments
                self.delete(key_generator(*arg, **kw))

            def set_(value, *arg, **kw):
                # store ``value`` under the key derived from these arguments
                self.set(key_generator(*arg, **kw), value)

            def get(*arg, **kw):
                # fetch whatever the region currently holds for these arguments
                return self.get(key_generator(*arg, **kw))

            def refresh(*arg, **kw):
                # force a recompute, overwrite the cache, return the fresh value
                value = fn(*arg, **kw)
                self.set(key_generator(*arg, **kw), value)
                return value

            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator

            return decorate

        return decorator
109
109
110
110
def make_region(*arg, **kw):
    """Factory matching :func:`dogpile.cache.make_region`, but producing
    our :class:`RhodeCodeCacheRegion` subclass instead."""
    return RhodeCodeCacheRegion(*arg, **kw)
113
113
114
114
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related entries from a settings mapping.

    For every key that starts with one of ``prefixes``, the prefix is cut
    off and the whitespace-trimmed remainder becomes the key in the
    returned dict. String values are trimmed too; any other value type is
    passed through untouched.
    """
    collected = {}
    for full_key in settings.keys():
        for prefix in (prefixes or []):
            if not full_key.startswith(prefix):
                continue
            trimmed_name = full_key.split(prefix)[1].strip()
            value = settings[full_key]
            if isinstance(value, basestring):
                value = value.strip()
            collected[trimmed_name] = value
    return collected
127
127
128
128
def compute_key_from_params(*args):
    """
    Build a stable cache key: sha1 of the ``_``-joined string form of all
    positional params.
    """
    joined_params = "_".join(map(safe_str, args))
    return sha1(joined_params)
134
134
135
135
def key_generator(namespace, fn):
    """
    dogpile ``function_key_generator`` hook: produces keys shaped like
    ``<namespace>:<funcname>_<sha1-of-args>`` (namespace falls back to
    ``'default'``).
    """
    fname = fn.__name__

    def generate_key(*args):
        prefix = namespace or 'default'
        arg_key = compute_key_from_params(*args)
        return "{}:{}_{}".format(prefix, fname, arg_key)

    return generate_key
147
147
148
148
def get_or_create_region(region_name, region_namespace=None):
    """
    Look up a configured dogpile region by name.

    For file-namespace backends a dedicated region (one dbm file per
    ``region_namespace``) is lazily created, configured and memoized in
    ``region_meta.dogpile_cache_regions``; other backends return the
    region configured at startup.

    :raises EnvironmentError: when ``region_name`` was never configured.
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend

    configured_region = region_meta.dogpile_cache_regions.get(region_name)
    if not configured_region:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(configured_region.actual_backend, FileNamespaceBackend):
        # re-use the per-namespace region if it was built before
        existing_region = region_meta.dogpile_cache_regions.get(region_namespace)
        if existing_region:
            log.debug('Using already configured region: %s', region_namespace)
            return existing_region

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)

        per_namespace_region = make_region(
            name=region_uid_name, function_key_generator=key_generator)
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        per_namespace_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=configured_region.expiration_time,
            arguments={"filename": namespace_filename})

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_meta.dogpile_cache_regions[region_namespace] = per_namespace_region
        return per_namespace_region

    return configured_region
185
185
186
186
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Remove every cached key under ``cache_namespace_uid`` in the given
    region and return how many keys were removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_drop = region.backend.list_keys(prefix=cache_namespace_uid)
    removed = len(keys_to_drop)
    # skip the backend call entirely when there is nothing to delete
    if removed:
        region.delete_multi(keys_to_drop)
    return removed
192
194
193
195
class ActiveRegionCache(object):
    """Handed out by ``InvalidationContext.__enter__`` when the cached
    entry is still marked active: callers keep using the cached value."""

    def __init__(self, context):
        # the InvalidationContext that produced this wrapper
        self.context = context

    def should_invalidate(self):
        """Cache is fresh — no recompute needed."""
        return False
200
202
201
203
class FreshRegionCache(object):
    """Handed out by ``InvalidationContext.__enter__`` when the cached
    entry is missing or marked inactive: callers must recompute."""

    def __init__(self, context):
        # the InvalidationContext that produced this wrapper
        self.context = context

    def should_invalidate(self):
        """Cache is stale or absent — recompute and store."""
        return True
208
210
209
211
class InvalidationContext(object):
    """
    Context manager coordinating cross-process cache invalidation through
    ``CacheKey`` database rows.

    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

        compute_time = inv_context_manager.compute_time
        log.debug('result computed in %.3fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # per-instance id so different RhodeCode instances don't share keys
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}_thread:{}_{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.compute_time = 0

    def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
        """Fetch the active CacheKey row for our key, or build an
        (unpersisted) one when no active row exists."""
        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        if not cache_obj:
            cache_obj = CacheKey(self.cache_key, cache_args=invalidation_namespace)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)
        self._start_time = time.time()

        if not self.cache_obj.cache_active:
            # key is missing or flagged False: return the real invalidator
            # that re-computes the value, and remember to update the DB row
            # on exit
            self.skip_cache_active_change = False
            return FreshRegionCache(context=self)

        # existing cache obj marked as not outdated — nothing to recompute
        self.skip_cache_active_change = True
        return ActiveRegionCache(context=self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # a concurrent writer inserted the same row first; that's an
            # edge race-condition case and it's safe to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now