caches: use a better logic to clear cache keys
super-admin
r4845:58a1f157 default
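This commit reworks how per-process cache keys are cleaned up. Previously every generated cache key was appended to a list and deleted one row at a time by a single atexit hook. Now the module keeps a de-duplicated set of per-process key prefixes, clears everything matching a prefix in one query, and runs the cleanup on SIGTERM and SIGINT as well as at interpreter exit. A minimal, self-contained sketch of that pattern follows; the dict stands in for the database-backed CacheKey table, and names such as fake_cache_db are illustrative only:

import atexit
import signal

# de-duplicated per-process prefixes, e.g. {'proc:DEFAULT'}
cache_keys_by_pid = set()

# stand-in for the CacheKey table (illustrative only)
fake_cache_db = {
    'proc:DEFAULT|thread:global|params:aaa': 1,
    'proc:DEFAULT|thread:global|params:bbb': 2,
}

def free_cache_keys(*args):
    # *args absorbs the (signum, frame) pair passed to signal handlers,
    # so one function serves both atexit and signal.signal
    for prefix in cache_keys_by_pid:
        for key in [k for k in fake_cache_db if k.startswith(prefix)]:
            del fake_cache_db[key]
    cache_keys_by_pid.clear()  # avoid re-deleting if invoked twice

atexit.register(free_cache_keys)
signal.signal(signal.SIGTERM, free_cache_keys)
signal.signal(signal.SIGINT, free_cache_keys)

One caveat worth knowing: signal.signal replaces Python's default SIGINT handler, so after registration Ctrl+C no longer raises KeyboardInterrupt; the patch assumes the process is terminating anyway when these signals arrive.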
@@ -1,45 +1,51 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import atexit
23 23 import logging
24 import signal
24 25
25 26 log = logging.getLogger(__name__)
26 27
27 cache_keys_by_pid = []
28 cache_keys_by_pid = set()
28 29
29 30
30 @atexit.register
31 def free_cache_keys():
31 def free_cache_keys(*args):
32 32 ssh_cmd = os.environ.get('RC_CMD_SSH_WRAPPER')
33 33 if ssh_cmd:
34 34 return
35 35
36 36 from rhodecode.model.db import Session, CacheKey
37 37 log.info('Clearing %s cache keys', len(cache_keys_by_pid))
38 38
39 39 if cache_keys_by_pid:
40 40 try:
41 for cache_key in cache_keys_by_pid:
42 CacheKey.query().filter(CacheKey.cache_key == cache_key).delete()
41 for cache_proc in cache_keys_by_pid:
42 CacheKey.query().filter(CacheKey.cache_key.startswith(cache_proc)).delete()
43 43 Session().commit()
44 cache_keys_by_pid.clear()
44 45 except Exception:
45 46 log.warn('Failed to clear keys, exiting gracefully')
47
48
49 atexit.register(free_cache_keys)
50 signal.signal(signal.SIGTERM, free_cache_keys)
51 signal.signal(signal.SIGINT, free_cache_keys)
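The second hunk below shows where those prefixes come from. Each InvalidationContext builds its cache keys as proc:<instance_id>|thread:<thread_id>|params:<sha1> and now registers only the proc:<instance_id> prefix in cache_keys_by_pid, so the set holds one entry per process instead of one entry per key. A quick sketch of why the startswith() match in free_cache_keys() then covers all of a process's keys (values illustrative):

proc_id = 'DEFAULT'
proc_key = 'proc:{}'.format(proc_id)

cache_key = 'proc:{}|thread:{}|params:{}'.format(
    proc_id, 'global', 'e0c9035898dd52fc65c41454cec9c4d2611bfb37')

# a single prefix covers every key the process ever registered
assert cache_key.startswith(proc_key)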
@@ -1,422 +1,423 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 import threading
25 25
26 26 from dogpile.cache import CacheRegion
27 27 from dogpile.cache.util import compat
28 28
29 29 import rhodecode
30 30 from rhodecode.lib.utils import safe_str, sha1
31 31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 33
34 34 from rhodecode.lib.rc_cache import cache_key_meta
35 35 from rhodecode.lib.rc_cache import region_meta
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def isCython(func):
41 41 """
42 42 Private helper that checks if a function is a cython function.
43 43 """
44 44 return func.__class__.__name__ == 'cython_function_or_method'
45 45
46 46
47 47 class RhodeCodeCacheRegion(CacheRegion):
48 48
49 49 def conditional_cache_on_arguments(
50 50 self, namespace=None,
51 51 expiration_time=None,
52 52 should_cache_fn=None,
53 53 to_str=compat.string_type,
54 54 function_key_generator=None,
55 55 condition=True):
56 56 """
57 57 Custom conditional decorator, that will not touch any dogpile internals if
58 58 condition isn't meet. This works a bit different than should_cache_fn
59 59 And it's faster in cases we don't ever want to compute cached values
60 60 """
61 61 expiration_time_is_callable = compat.callable(expiration_time)
62 62
63 63 if function_key_generator is None:
64 64 function_key_generator = self.function_key_generator
65 65
66 66 # workaround for py2 and cython problems, this block should be removed
67 67 # once we've migrated to py3
68 68 if 'cython' == 'cython':
69 69 def decorator(fn):
70 70 if to_str is compat.string_type:
71 71 # backwards compatible
72 72 key_generator = function_key_generator(namespace, fn)
73 73 else:
74 74 key_generator = function_key_generator(namespace, fn, to_str=to_str)
75 75
76 76 @functools.wraps(fn)
77 77 def decorate(*arg, **kw):
78 78 key = key_generator(*arg, **kw)
79 79
80 80 @functools.wraps(fn)
81 81 def creator():
82 82 return fn(*arg, **kw)
83 83
84 84 if not condition:
85 85 return creator()
86 86
87 87 timeout = expiration_time() if expiration_time_is_callable \
88 88 else expiration_time
89 89
90 90 return self.get_or_create(key, creator, timeout, should_cache_fn)
91 91
92 92 def invalidate(*arg, **kw):
93 93 key = key_generator(*arg, **kw)
94 94 self.delete(key)
95 95
96 96 def set_(value, *arg, **kw):
97 97 key = key_generator(*arg, **kw)
98 98 self.set(key, value)
99 99
100 100 def get(*arg, **kw):
101 101 key = key_generator(*arg, **kw)
102 102 return self.get(key)
103 103
104 104 def refresh(*arg, **kw):
105 105 key = key_generator(*arg, **kw)
106 106 value = fn(*arg, **kw)
107 107 self.set(key, value)
108 108 return value
109 109
110 110 decorate.set = set_
111 111 decorate.invalidate = invalidate
112 112 decorate.refresh = refresh
113 113 decorate.get = get
114 114 decorate.original = fn
115 115 decorate.key_generator = key_generator
116 116 decorate.__wrapped__ = fn
117 117
118 118 return decorate
119 119 return decorator
120 120
121 121 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
122 122
123 123 if not condition:
124 124 log.debug('Calling un-cached method:%s', user_func.func_name)
125 125 start = time.time()
126 126 result = user_func(*arg, **kw)
127 127 total = time.time() - start
128 128 log.debug('un-cached method:%s took %.4fs', user_func.func_name, total)
129 129 return result
130 130
131 131 key = key_generator(*arg, **kw)
132 132
133 133 timeout = expiration_time() if expiration_time_is_callable \
134 134 else expiration_time
135 135
136 136 log.debug('Calling cached method:`%s`', user_func.func_name)
137 137 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
138 138
139 139 def cache_decorator(user_func):
140 140 if to_str is compat.string_type:
141 141 # backwards compatible
142 142 key_generator = function_key_generator(namespace, user_func)
143 143 else:
144 144 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
145 145
146 146 def refresh(*arg, **kw):
147 147 """
148 148 Like invalidate, but regenerates the value instead
149 149 """
150 150 key = key_generator(*arg, **kw)
151 151 value = user_func(*arg, **kw)
152 152 self.set(key, value)
153 153 return value
154 154
155 155 def invalidate(*arg, **kw):
156 156 key = key_generator(*arg, **kw)
157 157 self.delete(key)
158 158
159 159 def set_(value, *arg, **kw):
160 160 key = key_generator(*arg, **kw)
161 161 self.set(key, value)
162 162
163 163 def get(*arg, **kw):
164 164 key = key_generator(*arg, **kw)
165 165 return self.get(key)
166 166
167 167 user_func.set = set_
168 168 user_func.invalidate = invalidate
169 169 user_func.get = get
170 170 user_func.refresh = refresh
171 171 user_func.key_generator = key_generator
172 172 user_func.original = user_func
173 173
174 174 # Use `decorate` to preserve the signature of :param:`user_func`.
175 175 return decorator.decorate(user_func, functools.partial(
176 176 get_or_create_for_user_func, key_generator))
177 177
178 178 return cache_decorator
179 179
180 180
181 181 def make_region(*arg, **kw):
182 182 return RhodeCodeCacheRegion(*arg, **kw)
183 183
184 184
185 185 def get_default_cache_settings(settings, prefixes=None):
186 186 prefixes = prefixes or []
187 187 cache_settings = {}
188 188 for key in settings.keys():
189 189 for prefix in prefixes:
190 190 if key.startswith(prefix):
191 191 name = key.split(prefix)[1].strip()
192 192 val = settings[key]
193 193 if isinstance(val, compat.string_types):
194 194 val = val.strip()
195 195 cache_settings[name] = val
196 196 return cache_settings
197 197
198 198
199 199 def compute_key_from_params(*args):
200 200 """
201 201 Helper to compute key from given params to be used in cache manager
202 202 """
203 203 return sha1("_".join(map(safe_str, args)))
204 204
205 205
206 206 def backend_key_generator(backend):
207 207 """
208 208 Special wrapper that also sends over the backend to the key generator
209 209 """
210 210 def wrapper(namespace, fn):
211 211 return key_generator(backend, namespace, fn)
212 212 return wrapper
213 213
214 214
215 215 def key_generator(backend, namespace, fn):
216 216 fname = fn.__name__
217 217
218 218 def generate_key(*args):
219 219 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
220 220 namespace_pref = namespace or 'default_namespace'
221 221 arg_key = compute_key_from_params(*args)
222 222 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
223 223
224 224 return final_key
225 225
226 226 return generate_key
227 227
228 228
229 229 def get_or_create_region(region_name, region_namespace=None):
230 230 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
231 231 region_obj = region_meta.dogpile_cache_regions.get(region_name)
232 232 if not region_obj:
233 233 raise EnvironmentError(
234 234 'Region `{}` not in configured regions: {}.'.format(
235 235 region_name, region_meta.dogpile_cache_regions.keys()))
236 236
237 237 region_uid_name = '{}:{}'.format(region_name, region_namespace)
238 238 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
239 239 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
240 240 if region_exist:
241 241 log.debug('Using already configured region: %s', region_namespace)
242 242 return region_exist
243 243 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
244 244 expiration_time = region_obj.expiration_time
245 245
246 246 if not os.path.isdir(cache_dir):
247 247 os.makedirs(cache_dir)
248 248 new_region = make_region(
249 249 name=region_uid_name,
250 250 function_key_generator=backend_key_generator(region_obj.actual_backend)
251 251 )
252 252 namespace_filename = os.path.join(
253 253 cache_dir, "{}.cache.dbm".format(region_namespace))
254 254 # special type that allows one db file per namespace
255 255 new_region.configure(
256 256 backend='dogpile.cache.rc.file_namespace',
257 257 expiration_time=expiration_time,
258 258 arguments={"filename": namespace_filename}
259 259 )
260 260
261 261 # create and save in region caches
262 262 log.debug('configuring new region: %s', region_uid_name)
263 263 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
264 264
265 265 return region_obj
266 266
267 267
268 268 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
269 269 region = get_or_create_region(cache_region, cache_namespace_uid)
270 270 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
271 271 num_delete_keys = len(cache_keys)
272 272 if invalidate:
273 273 region.invalidate(hard=False)
274 274 else:
275 275 if num_delete_keys:
276 276 region.delete_multi(cache_keys)
277 277 return num_delete_keys
278 278
279 279
280 280 class ActiveRegionCache(object):
281 281 def __init__(self, context, cache_data):
282 282 self.context = context
283 283 self.cache_data = cache_data
284 284
285 285 def should_invalidate(self):
286 286 return False
287 287
288 288
289 289 class FreshRegionCache(object):
290 290 def __init__(self, context, cache_data):
291 291 self.context = context
292 292 self.cache_data = cache_data
293 293
294 294 def should_invalidate(self):
295 295 return True
296 296
297 297
298 298 class InvalidationContext(object):
299 299 """
300 300 usage::
301 301
302 302 from rhodecode.lib import rc_cache
303 303
304 304 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
305 305 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
306 306
307 307 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
308 308 def heavy_compute(cache_name, param1, param2):
309 309 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
310 310
311 311 # invalidation namespace is shared namespace key for all process caches
312 312 # we use it to send a global signal
313 313 invalidation_namespace = 'repo_cache:1'
314 314
315 315 inv_context_manager = rc_cache.InvalidationContext(
316 316 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
317 317 with inv_context_manager as invalidation_context:
318 318 args = ('one', 'two')
319 319 # re-compute and store cache if we get invalidate signal
320 320 if invalidation_context.should_invalidate():
321 321 result = heavy_compute.refresh(*args)
322 322 else:
323 323 result = heavy_compute(*args)
324 324
325 325 compute_time = inv_context_manager.compute_time
326 326 log.debug('result computed in %.4fs', compute_time)
327 327
328 328 # To send global invalidation signal, simply run
329 329 CacheKey.set_invalidate(invalidation_namespace)
330 330
331 331 """
332 332
333 333 def __repr__(self):
334 334 return '<InvalidationContext:{}[{}]>'.format(
335 335 safe_str(self.cache_key), safe_str(self.uid))
336 336
337 337 def __init__(self, uid, invalidation_namespace='',
338 338 raise_exception=False, thread_scoped=None):
339 339 self.uid = uid
340 340 self.invalidation_namespace = invalidation_namespace
341 341 self.raise_exception = raise_exception
342 342 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
343 343 self.thread_id = 'global'
344 344
345 345 if thread_scoped is None:
346 346 # if we set "default" we can override this via .ini settings
347 347 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
348 348
349 349 # Append the thread id to the cache key if this invalidation context
350 350 # should be scoped to the current thread.
351 351 if thread_scoped is True:
352 352 self.thread_id = threading.current_thread().ident
353 353
354 354 self.cache_key = compute_key_from_params(uid)
355 355 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
356 356 self.proc_id, self.thread_id, self.cache_key)
357 self.proc_key = 'proc:{}'.format(self.proc_id)
357 358 self.compute_time = 0
358 359
359 360 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
360 361 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
361 362 # fetch all cache keys for this namespace and convert them to a map to check
362 363 # if we have a specific cache_key object registered. We do this because we
363 364 # want a consistent cache_state_uid for newly registered objects
364 365 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
365 366 cache_obj = cache_obj_map.get(self.cache_key)
366 367 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
367 368 if not cache_obj:
368 369 new_cache_args = invalidation_namespace
369 370 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
370 371 cache_state_uid = None
371 372 if first_cache_obj:
372 373 cache_state_uid = first_cache_obj.cache_state_uid
373 374 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
374 375 cache_state_uid=cache_state_uid)
375 cache_key_meta.cache_keys_by_pid.append(self.cache_key)
376 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
376 377
377 378 return cache_obj
378 379
379 380 def __enter__(self):
380 381 """
381 382 Test if current object is valid, and return CacheRegion function
382 383 that does invalidation and calculation
383 384 """
384 385 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
385 386 # register or get a new key based on uid
386 387 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
387 388 cache_data = self.cache_obj.get_dict()
388 389 self._start_time = time.time()
389 390 if self.cache_obj.cache_active:
390 391 # means our cache obj exists and is marked active, i.e. its
391 392 # cache is not outdated; we return ActiveRegionCache
392 393 self.skip_cache_active_change = True
393 394
394 395 return ActiveRegionCache(context=self, cache_data=cache_data)
395 396
396 397 # the key either doesn't exist or is set to False; we return
397 398 # the real invalidator which re-computes the value. We additionally set
398 399 # the flag to actually update the database objects
399 400 self.skip_cache_active_change = False
400 401 return FreshRegionCache(context=self, cache_data=cache_data)
401 402
402 403 def __exit__(self, exc_type, exc_val, exc_tb):
403 404 # save compute time
404 405 self.compute_time = time.time() - self._start_time
405 406
406 407 if self.skip_cache_active_change:
407 408 return
408 409
409 410 try:
410 411 self.cache_obj.cache_active = True
411 412 Session().add(self.cache_obj)
412 413 Session().commit()
413 414 except IntegrityError:
414 415 # if we catch an integrity error, it means this object was inserted
415 416 # concurrently; assumption is that's really an edge race-condition case
416 417 # and it's safe to skip it
417 418 Session().rollback()
418 419 except Exception:
419 420 log.exception('Failed to commit on cache key update')
420 421 Session().rollback()
421 422 if self.raise_exception:
422 423 raise
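On the SQLAlchemy side, CacheKey.cache_key.startswith(prefix) compiles to a LIKE 'prefix%' criterion and Query.delete() issues one bulk DELETE, so each process's keys are removed in a single statement rather than one per key as before. A hedged, standalone sketch of that behaviour; the model below is illustrative, not the real rhodecode.model.db definition:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class CacheKey(Base):
    __tablename__ = 'cache_keys'  # illustrative table name
    cache_key_id = Column(Integer, primary_key=True)
    cache_key = Column(String(255))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([CacheKey(cache_key=k) for k in (
    'proc:DEFAULT|thread:global|params:aaa',
    'proc:DEFAULT|thread:global|params:bbb',
    'proc:OTHER|thread:global|params:ccc')])
session.commit()

# startswith() renders as: cache_key LIKE 'proc:DEFAULT%';
# synchronize_session=False skips in-Python session sync for the bulk DELETE
session.query(CacheKey).filter(
    CacheKey.cache_key.startswith('proc:DEFAULT')).delete(
        synchronize_session=False)
session.commit()

assert [c.cache_key for c in session.query(CacheKey).all()] == [
    'proc:OTHER|thread:global|params:ccc']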