fix(caches): removed cacheKey cleanup logic, as it's proven to fail and is not reliable.
super-admin - r5287:486bcf43 default
@@ -1,404 +1,403 @@
1 1 # Copyright (C) 2015-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import functools
20 20 import logging
21 21 import os
22 22 import threading
23 23 import time
24 24
25 25 import decorator
26 26 from dogpile.cache import CacheRegion
27 27
28 28 import rhodecode
29 29 from rhodecode.lib.hash_utils import sha1
30 30 from rhodecode.lib.str_utils import safe_bytes
31 31 from rhodecode.lib.type_utils import str2bool # noqa :required by imports from .utils
32 32
33 from . import region_meta, cache_key_meta
33 from . import region_meta
34 34
35 35 log = logging.getLogger(__name__)
36 36
37 37
38 38 def isCython(func):
39 39 """
40 40 Private helper that checks if a function is a cython function.
41 41 """
42 42 return func.__class__.__name__ == 'cython_function_or_method'
43 43
44 44
45 45 class RhodeCodeCacheRegion(CacheRegion):
46 46
47 47 def __repr__(self):
48 48 return f'{self.__class__}(name={self.name})'
49 49
50 50 def conditional_cache_on_arguments(
51 51 self, namespace=None,
52 52 expiration_time=None,
53 53 should_cache_fn=None,
54 54 to_str=str,
55 55 function_key_generator=None,
56 56 condition=True):
57 57 """
58 58 Custom conditional decorator that will not touch any dogpile internals if
59 59 the condition isn't met. This works a bit differently from should_cache_fn,
60 60 and it's faster in cases where we never want to compute cached values.
61 61 """
62 62 expiration_time_is_callable = callable(expiration_time)
63 63 if not namespace:
64 64 namespace = getattr(self, '_default_namespace', None)
65 65
66 66 if function_key_generator is None:
67 67 function_key_generator = self.function_key_generator
68 68
69 69 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
70 70
71 71 if not condition:
72 72 log.debug('Calling un-cached method:%s', user_func.__name__)
73 73 start = time.time()
74 74 result = user_func(*arg, **kw)
75 75 total = time.time() - start
76 76 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
77 77 return result
78 78
79 79 key = func_key_generator(*arg, **kw)
80 80
81 81 timeout = expiration_time() if expiration_time_is_callable \
82 82 else expiration_time
83 83
84 84 log.debug('Calling cached method:`%s`', user_func.__name__)
85 85 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
86 86
87 87 def cache_decorator(user_func):
88 88 if to_str is str:
89 89 # backwards compatible
90 90 key_generator = function_key_generator(namespace, user_func)
91 91 else:
92 92 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
93 93
94 94 def refresh(*arg, **kw):
95 95 """
96 96 Like invalidate, but regenerates the value instead
97 97 """
98 98 key = key_generator(*arg, **kw)
99 99 value = user_func(*arg, **kw)
100 100 self.set(key, value)
101 101 return value
102 102
103 103 def invalidate(*arg, **kw):
104 104 key = key_generator(*arg, **kw)
105 105 self.delete(key)
106 106
107 107 def set_(value, *arg, **kw):
108 108 key = key_generator(*arg, **kw)
109 109 self.set(key, value)
110 110
111 111 def get(*arg, **kw):
112 112 key = key_generator(*arg, **kw)
113 113 return self.get(key)
114 114
115 115 user_func.set = set_
116 116 user_func.invalidate = invalidate
117 117 user_func.get = get
118 118 user_func.refresh = refresh
119 119 user_func.key_generator = key_generator
120 120 user_func.original = user_func
121 121
122 122 # Use `decorate` to preserve the signature of :param:`user_func`.
123 123 return decorator.decorate(user_func, functools.partial(
124 124 get_or_create_for_user_func, key_generator))
125 125
126 126 return cache_decorator
127 127
128 128
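# A minimal usage sketch of the conditional decorator above; `demo_region`
# and `caches_enabled` are illustrative stand-ins, not names from this module.
def _demo_conditional_cache(demo_region, caches_enabled):
    @demo_region.conditional_cache_on_arguments(
        namespace='demo:1', condition=caches_enabled)
    def heavy(param):
        return param * 2  # stands in for an expensive computation

    # condition=False calls `heavy` directly and never touches dogpile;
    # condition=True caches via get_or_create(), and the decorated function
    # also gains .refresh(), .invalidate(), .set() and .get() helpers.
    return heavy(21)
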
129 129 def make_region(*arg, **kw):
130 130 return RhodeCodeCacheRegion(*arg, **kw)
131 131
132 132
133 133 def get_default_cache_settings(settings, prefixes=None):
134 134 prefixes = prefixes or []
135 135 cache_settings = {}
136 136 for key in settings.keys():
137 137 for prefix in prefixes:
138 138 if key.startswith(prefix):
139 139 name = key.split(prefix)[1].strip()
140 140 val = settings[key]
141 141 if isinstance(val, str):
142 142 val = val.strip()
143 143 cache_settings[name] = val
144 144 return cache_settings
145 145
146 146
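# A small sketch of get_default_cache_settings() above: given .ini-style
# settings and a prefix list, it returns matching keys with the prefix
# stripped and string values trimmed. The setting names are illustrative.
def _demo_default_cache_settings():
    settings = {
        'rc_cache.cache_repo.backend': 'dogpile.cache.rc.file_namespace',
        'rc_cache.cache_repo.expiration_time': '2592000 ',
        'unrelated.option': 'ignored',
    }
    # -> {'backend': 'dogpile.cache.rc.file_namespace', 'expiration_time': '2592000'}
    return get_default_cache_settings(settings, prefixes=['rc_cache.cache_repo.'])
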
147 147 def compute_key_from_params(*args):
148 148 """
149 149 Helper to compute key from given params to be used in cache manager
150 150 """
151 151 return sha1(safe_bytes("_".join(map(str, args))))
152 152
153 153
154 154 def custom_key_generator(backend, namespace, fn):
155 155 func_name = fn.__name__
156 156
157 157 def generate_key(*args):
158 158 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
159 159 namespace_pref = namespace or 'default_namespace'
160 160 arg_key = compute_key_from_params(*args)
161 161 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
162 162
163 163 return final_key
164 164
165 165 return generate_key
166 166
167 167
168 168 def backend_key_generator(backend):
169 169 """
170 170 Special wrapper that also sends over the backend to the key generator
171 171 """
172 172 def wrapper(namespace, fn):
173 173 return custom_key_generator(backend, namespace, fn)
174 174 return wrapper
175 175
176 176
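# A sketch of the key shape produced by the generators above: backend
# key_prefix, namespace, function name, and a sha1 of the stringified
# arguments. The fake backend class and names below are illustrative.
def _demo_generated_key():
    class _FakeBackend:
        key_prefix = 'file_namespace_backend'

    def get_stats(repo_id, branch):
        pass

    keygen = backend_key_generator(_FakeBackend())('repo_stats:1', get_stats)
    # e.g. 'file_namespace_backend:repo_stats:1:get_stats_<sha1 of "1_main">'
    return keygen(1, 'main')
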
177 177 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
178 178 from .backends import FileNamespaceBackend
179 179 from . import async_creation_runner
180 180
181 181 region_obj = region_meta.dogpile_cache_regions.get(region_name)
182 182 if not region_obj:
183 183 reg_keys = list(region_meta.dogpile_cache_regions.keys())
184 184 raise OSError(f'Region `{region_name}` not found in configured regions: {reg_keys}.')
185 185
186 186 region_uid_name = f'{region_name}:{region_namespace}'
187 187
188 188 # Special case ONLY for the FileNamespaceBackend. We register one file per region namespace
189 189 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
190 190 if not region_namespace:
191 191 raise ValueError(f'{FileNamespaceBackend} requires the region_namespace param to be specified')
192 192
193 193 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
194 194 if region_exist:
195 195 log.debug('Using already configured region: %s', region_namespace)
196 196 return region_exist
197 197
198 198 expiration_time = region_obj.expiration_time
199 199
200 200 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
201 201 namespace_cache_dir = cache_dir
202 202
203 203 # we default the namespace_cache_dir to our default cache dir.
204 204 # however, if this backend is configured with the filename= param, we prioritize that,
205 205 # so all caches within that particular region, even namespaced ones, end up in the same path
206 206 if region_obj.actual_backend.filename:
207 207 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
208 208
209 209 if not os.path.isdir(namespace_cache_dir):
210 210 os.makedirs(namespace_cache_dir)
211 211 new_region = make_region(
212 212 name=region_uid_name,
213 213 function_key_generator=backend_key_generator(region_obj.actual_backend)
214 214 )
215 215
216 216 namespace_filename = os.path.join(
217 217 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
218 218 # special type that allows one db file per namespace
219 219 new_region.configure(
220 220 backend='dogpile.cache.rc.file_namespace',
221 221 expiration_time=expiration_time,
222 222 arguments={"filename": namespace_filename}
223 223 )
224 224
225 225 # create and save in region caches
226 226 log.debug('configuring new region: %s', region_uid_name)
227 227 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
228 228
229 229 region_obj._default_namespace = region_namespace
230 230 if use_async_runner:
231 231 region_obj.async_creation_runner = async_creation_runner
232 232 return region_obj
233 233
234 234
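# A usage sketch for get_or_create_region() above: with a FileNamespaceBackend
# region the namespace is mandatory and maps to one .cache_db file. The
# region name and uid are illustrative and assume the region is configured.
def _demo_get_region():
    cache_namespace_uid = 'repo_cache:1'
    region = get_or_create_region('cache_repo', cache_namespace_uid)
    # repeated calls with the same namespace reuse the already-configured region
    return region
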
235 235 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
236 236 from . import CLEAR_DELETE, CLEAR_INVALIDATE
237 237
238 238 if not isinstance(cache_region, RhodeCodeCacheRegion):
239 239 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
240 240 log.debug('clearing cache region: %s [prefix:%s] with method=%s',
241 241 cache_region, cache_namespace_uid, method)
242 242
243 243 num_affected_keys = 0
244 244
245 245 if method == CLEAR_INVALIDATE:
246 246 # NOTE: The CacheRegion.invalidate() method's default mode of
247 247 # operation is to set a timestamp local to this CacheRegion in this Python process only.
248 248 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
249 249 cache_region.invalidate(hard=True)
250 250
251 251 if method == CLEAR_DELETE:
252 252 num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
253 253
254 254 return num_affected_keys
255 255
256 256
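# A sketch of clear_cache_namespace() above: CLEAR_DELETE physically removes
# all keys matching the namespace prefix and reports how many were affected,
# while CLEAR_INVALIDATE only marks this process-local region as stale.
# The region name and uid are illustrative.
def _demo_clear_namespace():
    from . import CLEAR_DELETE

    removed = clear_cache_namespace('cache_repo', 'repo_cache:1', method=CLEAR_DELETE)
    log.debug('removed %s cache keys', removed)
    return removed
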
257 257 class ActiveRegionCache(object):
258 258 def __init__(self, context, cache_data):
259 259 self.context = context
260 260 self.cache_data = cache_data
261 261
262 262 def should_invalidate(self):
263 263 return False
264 264
265 265
266 266 class FreshRegionCache(object):
267 267 def __init__(self, context, cache_data):
268 268 self.context = context
269 269 self.cache_data = cache_data
270 270
271 271 def should_invalidate(self):
272 272 return True
273 273
274 274
275 275 class InvalidationContext(object):
276 276 """
277 277 usage::
278 278
279 279 from rhodecode.lib import rc_cache
280 280
281 281 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
282 282 region = rc_cache.get_or_create_region('some_region', cache_namespace_uid)
283 283
284 284 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
285 285 def heavy_compute(cache_name, param1, param2):
286 286 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
287 287
288 288 # the invalidation namespace is a shared namespace key for all process caches
289 289 # we use it to send a global signal
290 290 invalidation_namespace = 'repo_cache:1'
291 291
292 292 inv_context_manager = rc_cache.InvalidationContext(
293 293 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
294 294 with inv_context_manager as invalidation_context:
295 295 args = ('some_cache_name', 'one', 'two')
296 296 # re-compute and store the cache if we get an invalidate signal
297 297 if invalidation_context.should_invalidate():
298 298 result = heavy_compute.refresh(*args)
299 299 else:
300 300 result = heavy_compute(*args)
301 301
302 302 compute_time = inv_context_manager.compute_time
303 303 log.debug('result computed in %.4fs', compute_time)
304 304
305 305 # To send global invalidation signal, simply run
306 306 CacheKey.set_invalidate(invalidation_namespace)
307 307
308 308 """
309 309
310 310 def __repr__(self):
311 311 return f'<InvalidationContext:{self.cache_key}[{self.uid}]>'
312 312
313 313 def __init__(self, uid, invalidation_namespace='',
314 314 raise_exception=False, thread_scoped=None):
315 315 self.uid = uid
316 316 self.invalidation_namespace = invalidation_namespace
317 317 self.raise_exception = raise_exception
318 318 self.proc_id = rhodecode.CONFIG.get('instance_id') or 'DEFAULT'
319 319 self.thread_id = 'global'
320 320
321 321 if thread_scoped is None:
322 322 # when left at the default (None), this can be overridden via .ini settings
323 323 thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')
324 324
325 325 # Append the thread id to the cache key if this invalidation context
326 326 # should be scoped to the current thread.
327 327 if thread_scoped is True:
328 328 self.thread_id = threading.current_thread().ident
329 329
330 330 self.cache_key = compute_key_from_params(uid)
331 331 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
332 332 self.proc_id, self.thread_id, self.cache_key)
333 333 self.proc_key = f'proc:{self.proc_id}'
334 334 self.compute_time = 0
335 335
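    # For illustration: with instance_id='rc1', thread_scoped=True and
    # uid='repo_cache:1', self.cache_key ends up shaped like
    # 'proc:rc1|thread:140251732858624|params:<sha1 of "repo_cache:1">',
    # so each process (and optionally each thread) owns a distinct cache key row.
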
336 336 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
337 337 from rhodecode.model.db import CacheKey
338 338
339 339 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
340 340 # fetch all cache keys for this namespace and convert them to a map to check if we
341 341 # have a specific cache_key object registered. We do this because we want a
342 342 # consistent cache_state_uid for newly registered objects
343 343 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
344 344 cache_obj = cache_obj_map.get(self.cache_key)
345 345 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
346 346
347 347 if not cache_obj:
348 348 new_cache_args = invalidation_namespace
349 349 first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None
350 350 cache_state_uid = None
351 351 if first_cache_obj:
352 352 cache_state_uid = first_cache_obj.cache_state_uid
353 353 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
354 354 cache_state_uid=cache_state_uid)
355 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
356 355
357 356 return cache_obj
358 357
359 358 def __enter__(self):
360 359 """
361 360 Test if current object is valid, and return CacheRegion function
362 361 that does invalidation and calculation
363 362 """
364 363 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
365 364 # register or get a new key based on uid
366 365 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
367 366 cache_data = self.cache_obj.get_dict()
368 367 self._start_time = time.time()
369 368 if self.cache_obj.cache_active:
370 369 # means our cache obj exists and is marked as active, i.e. its
371 370 # cache is not outdated; we return ActiveRegionCache
372 371 self.skip_cache_active_change = True
373 372
374 373 return ActiveRegionCache(context=self, cache_data=cache_data)
375 374
376 375 # the key either does not exist or is set to False; we return
377 376 # the real invalidator, which re-computes the value. We additionally set
378 377 # the flag to actually update the database objects
379 378 self.skip_cache_active_change = False
380 379 return FreshRegionCache(context=self, cache_data=cache_data)
381 380
382 381 def __exit__(self, exc_type, exc_val, exc_tb):
383 382 from rhodecode.model.db import IntegrityError, Session
384 383
385 384 # save compute time
386 385 self.compute_time = time.time() - self._start_time
387 386
388 387 if self.skip_cache_active_change:
389 388 return
390 389
391 390 try:
392 391 self.cache_obj.cache_active = True
393 392 Session().add(self.cache_obj)
394 393 Session().commit()
395 394 except IntegrityError:
396 395 # if we catch an integrity error, it means this object was already inserted;
397 396 # the assumption is that this is really an edge race-condition case and
398 397 # it's safe to skip it
399 398 Session().rollback()
400 399 except Exception:
401 400 log.exception('Failed to commit on cache key update')
402 401 Session().rollback()
403 402 if self.raise_exception:
404 403 raise
NO CONTENT: file was removed