##// END OF EJS Templates
cache-keys: register and self-clean-up cache keys used for invalidation, to prevent leaking a lot of them into the DB on worker recycle
marcink -
r3890:2f6d1dda default
parent child Browse files
Show More
@@ -0,0 +1,35 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import atexit
22 import logging
23
24 log = logging.getLogger(__name__)
25
# cache keys registered by this process; cleaned up from the DB at exit so
# recycled workers don't leak stale CacheKey rows
cache_keys_by_pid = []


@atexit.register
def free_cache_keys():
    """
    Delete every cache key this process registered from the database.

    Registered via :mod:`atexit` so worker recycling removes the rows that
    would otherwise accumulate. Best-effort: an atexit handler must never
    raise, so any failure is logged and swallowed.
    """
    from rhodecode.model.db import Session, CacheKey

    if not cache_keys_by_pid:
        return

    log.info('Clearing %s cache keys', len(cache_keys_by_pid))
    try:
        # single bulk DELETE instead of one statement per key;
        # synchronize_session='fetch' is required for an IN(...) delete
        CacheKey.query().filter(
            CacheKey.cache_key.in_(cache_keys_by_pid)).delete(
                synchronize_session='fetch')
        Session().commit()
    except Exception:
        log.exception('Failed to clear cache keys on process exit')
@@ -1,352 +1,355 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 from decorator import decorate
25 25 import threading
26 26
27 27 from dogpile.cache import CacheRegion
28 28 from dogpile.cache.util import compat
29 29
30 30 import rhodecode
31 31 from rhodecode.lib.utils import safe_str, sha1
32 32 from rhodecode.lib.utils2 import safe_unicode, str2bool
33 33 from rhodecode.model.db import Session, CacheKey, IntegrityError
34 34
35 from . import region_meta
35 from rhodecode.lib.rc_cache import cache_key_meta
36 from rhodecode.lib.rc_cache import region_meta
36 37
37 38 log = logging.getLogger(__name__)
38 39
39 40
class RhodeCodeCacheRegion(CacheRegion):
    # dogpile CacheRegion subclass adding a cheap way to bypass caching
    # entirely via a `condition` flag.

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't met. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values

        :param namespace: optional cache namespace passed to the key generator
        :param expiration_time: TTL in seconds, or a callable returning it
        :param should_cache_fn: dogpile hook deciding whether to store a value
        :param to_str: stringifier for key arguments (dogpile default: str)
        :param function_key_generator: override for the region's key generator
        :param condition: when falsy, the decorated function is called
            directly with no dogpile involvement at all
        """
        # resolve once at decoration time, not per call
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
            # this is the actual per-call wrapper installed by `decorate`

            if not condition:
                # fast path: skip key generation and dogpile entirely
                log.debug('Calling un-cached func:%s', user_func)
                return user_func(*arg, **kw)

            key = key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached fn:%s', user_func)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                # drop the cached value for these arguments
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store an externally computed value under these arguments
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # fetch the cached value without computing it
                key = key_generator(*arg, **kw)
                return self.get(key)

            # expose cache-management helpers on the decorated function,
            # mirroring dogpile's cache_on_arguments API
            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.

            return decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
114 115
115 116
def make_region(*arg, **kw):
    """Build and return a :class:`RhodeCodeCacheRegion` from the given args."""
    region = RhodeCodeCacheRegion(*arg, **kw)
    return region
118 119
119 120
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related settings from a flat settings dict.

    :param settings: mapping of setting name -> value
    :param prefixes: list of key prefixes to match; matched keys are returned
        with the prefix stripped
    :return: dict of stripped-name -> value (string values are stripped)
    """
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                # split only once: the prefix string could occur again
                # later in the key, and split() without maxsplit would
                # then return the wrong segment
                name = key.split(prefix, 1)[1].strip()
                val = settings[key]
                if isinstance(val, compat.string_types):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings
132 133
133 134
def compute_key_from_params(*args):
    """
    Build a stable cache key by hashing the stringified parameters.
    """
    key_material = "_".join(safe_str(arg) for arg in args)
    return sha1(key_material)
139 140
140 141
def backend_key_generator(backend):
    """
    Adapt :func:`key_generator` to dogpile's two-argument generator
    interface by binding the backend up-front.
    """
    def _bound_generator(namespace, fn):
        return key_generator(backend, namespace, fn)

    return _bound_generator
148 149
149 150
def key_generator(backend, namespace, fn):
    """
    Return a function that builds cache keys of the form
    ``<backend_prefix>:<namespace>:<func_name>_<hashed_args>``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_part = namespace or 'default_namespace'
        params_key = compute_key_from_params(*args)
        return '%s:%s:%s_%s' % (prefix, namespace_part, func_name, params_key)

    return generate_key
162 163
163 164
def get_or_create_region(region_name, region_namespace=None):
    """
    Return a configured dogpile cache region by name.

    For regions backed by :class:`FileNamespaceBackend`, a dedicated
    per-namespace region (with its own DBM file) is created on first use and
    cached in ``region_meta.dogpile_cache_regions`` under the namespace key.

    :param region_name: name of a region configured in the .ini settings
    :param region_namespace: namespace used for file-backed per-namespace DBs
    :raises EnvironmentError: if `region_name` was never configured
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # per-namespace regions are memoized under the namespace key
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        # each namespace gets its own DBM file inside cache_dir
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
201 202
202 203
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every key stored under the given namespace in a cache region.

    :return: number of keys that were removed
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_remove = region.backend.list_keys(prefix=cache_namespace_uid)
    removed_count = len(keys_to_remove)
    if removed_count:
        region.delete_multi(keys_to_remove)
    return removed_count
210 211
211 212
class ActiveRegionCache(object):
    """
    Handed out by InvalidationContext when the cached value is still valid;
    callers seeing this should use the cached result as-is.
    """

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        """An active cache never requires recomputation."""
        return False
219 220
220 221
class FreshRegionCache(object):
    """
    Handed out by InvalidationContext when the cache is missing or stale;
    callers seeing this should recompute and refresh the cached value.
    """

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        """A fresh/stale cache always requires recomputation."""
        return True
228 229
229 230
class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        """
        :param uid: unique id of the cached computation, used to build the key
        :param invalidation_namespace: shared namespace all related process
            caches register under, used to broadcast invalidation
        :param raise_exception: re-raise commit failures from __exit__
            instead of only logging them
        :param thread_scoped: scope the cache key to the current thread;
            None means read the 'cache_thread_scoped' .ini setting
        """
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # process identity; keys from different instances must not collide
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        # filled in by __exit__ with the wall-clock duration of the block
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        """
        Fetch the CacheKey DB object for this context's key, creating an
        (unsaved) one when missing. The key is also registered in
        cache_key_meta so it gets cleaned from the DB on worker exit.
        """
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        if not cache_obj:
            new_cache_args = invalidation_namespace
            # reuse the namespace's existing cache_state_uid so all keys in
            # the namespace stay consistent
            first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
        # record the key so free_cache_keys() can delete it at process exit
        cache_key_meta.cache_keys_by_pid.append(self.cache_key)

        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            # cache was already active; nothing to persist
            return

        # mark the (re)computed cache as active and persist it
        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now