caches: improve logging
super-admin
r5579:54bb9264 default
@@ -1,357 +1,355 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import functools
20 20 import logging
21 21 import os
22 22 import threading
23 23 import time
24 24
25 25 import decorator
26 26 from dogpile.cache import CacheRegion
27 27
28 28 import rhodecode
29 29 from ...lib.hash_utils import sha1
30 30 from ...lib.str_utils import safe_bytes
31 31 from ...lib.type_utils import str2bool # noqa :required by imports from .utils
32 32
33 33 from . import region_meta
34 34
35 35 log = logging.getLogger(__name__)
36 36
37 37
38 38 def isCython(func):
39 39 """
40 40 Private helper that checks if a function is a cython function.
41 41 """
42 42 return func.__class__.__name__ == 'cython_function_or_method'
43 43
44 44
45 45 class RhodeCodeCacheRegion(CacheRegion):
46 46
47 47 def __repr__(self):
48 48 return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'
49 49
50 50 def conditional_cache_on_arguments(
51 51 self, namespace=None,
52 52 expiration_time=None,
53 53 should_cache_fn=None,
54 54 to_str=str,
55 55 function_key_generator=None,
56 56 condition=True):
57 57 """
58 58 Custom conditional decorator that will not touch any dogpile internals if
59 59 the condition isn't met. This works a bit differently from should_cache_fn,
60 60 and it's faster in cases where we never want to compute cached values.
61 61 """
62 62 expiration_time_is_callable = callable(expiration_time)
63 63 if not namespace:
64 64 namespace = getattr(self, '_default_namespace', None)
65 65
66 66 if function_key_generator is None:
67 67 function_key_generator = self.function_key_generator
68 68
69 69 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
70 70
71 71 if not condition:
72 log.debug('Calling un-cached method:%s', user_func.__name__)
72 log.debug('Calling un-cached method:`%s`', user_func.__name__)
73 73 start = time.time()
74 74 result = user_func(*arg, **kw)
75 75 total = time.time() - start
76 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
76 log.debug('Call for un-cached method:`%s` took %.4fs', user_func.__name__, total)
77 77 return result
78 78
79 79 key = func_key_generator(*arg, **kw)
80 timeout = expiration_time() if expiration_time_is_callable else expiration_time
81 log.debug('Calling cached (timeout=%s) method:`%s`', timeout, user_func.__name__)
80 82
81 timeout = expiration_time() if expiration_time_is_callable \
82 else expiration_time
83
84 log.debug('Calling cached method:`%s`', user_func.__name__)
85 83 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
86 84
87 85 def cache_decorator(user_func):
88 86 if to_str is str:
89 87 # backwards compatible
90 88 key_generator = function_key_generator(namespace, user_func)
91 89 else:
92 90 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
93 91
94 92 def refresh(*arg, **kw):
95 93 """
96 94 Like invalidate, but regenerates the value instead
97 95 """
98 96 key = key_generator(*arg, **kw)
99 97 value = user_func(*arg, **kw)
100 98 self.set(key, value)
101 99 return value
102 100
103 101 def invalidate(*arg, **kw):
104 102 key = key_generator(*arg, **kw)
105 103 self.delete(key)
106 104
107 105 def set_(value, *arg, **kw):
108 106 key = key_generator(*arg, **kw)
109 107 self.set(key, value)
110 108
111 109 def get(*arg, **kw):
112 110 key = key_generator(*arg, **kw)
113 111 return self.get(key)
114 112
115 113 user_func.set = set_
116 114 user_func.invalidate = invalidate
117 115 user_func.get = get
118 116 user_func.refresh = refresh
119 117 user_func.key_generator = key_generator
120 118 user_func.original = user_func
121 119
122 120 # Use `decorate` to preserve the signature of :param:`user_func`.
123 121 return decorator.decorate(user_func, functools.partial(
124 122 get_or_create_for_user_func, key_generator))
125 123
126 124 return cache_decorator
127 125
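For illustration, a minimal usage sketch of `conditional_cache_on_arguments` and the helper methods it attaches; the region name, namespace, and decorated function are assumptions for this sketch, not part of this change:

```python
import time
from rhodecode.lib import rc_cache

# assumed to be an already-configured region (names are illustrative)
region = rc_cache.get_or_create_region('cache_repo', region_namespace='repo-id-1')

# condition=False would bypass dogpile entirely and call the function directly
@region.conditional_cache_on_arguments(namespace='repo-id-1', condition=True)
def heavy_compute(repo_id):
    time.sleep(1)  # stand-in for expensive work
    return f'result-for-{repo_id}'

result = heavy_compute(42)          # computed once, then served from cache
heavy_compute.invalidate(42)        # drop the cached value for these args
result = heavy_compute.refresh(42)  # force re-computation and re-store
```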
128 126
129 127 def make_region(*arg, **kw):
130 128 return RhodeCodeCacheRegion(*arg, **kw)
131 129
132 130
133 131 def get_default_cache_settings(settings, prefixes=None):
134 132 prefixes = prefixes or []
135 133 cache_settings = {}
136 134 for key in settings.keys():
137 135 for prefix in prefixes:
138 136 if key.startswith(prefix):
139 137 name = key.split(prefix)[1].strip()
140 138 val = settings[key]
141 139 if isinstance(val, str):
142 140 val = val.strip()
143 141 cache_settings[name] = val
144 142 return cache_settings
145 143
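A hedged example of how `get_default_cache_settings` strips prefixes; the keys mirror typical `rc_cache.*` .ini settings, but the values are made up:

```python
settings = {
    'rc_cache.repo_object.expiration_time': '3600',
    'rc_cache.repo_object.backend': 'dogpile.cache.rc.redis',
    'other.key': 'ignored, no prefix match',
}
# only keys starting with a given prefix survive, with the prefix stripped
cache_settings = get_default_cache_settings(
    settings, prefixes=['rc_cache.repo_object.'])
# -> {'expiration_time': '3600', 'backend': 'dogpile.cache.rc.redis'}
```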
146 144
147 145 def compute_key_from_params(*args):
148 146 """
149 147 Helper to compute key from given params to be used in cache manager
150 148 """
151 149 return sha1(safe_bytes("_".join(map(str, args))))
152 150
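Worth noting: because `compute_key_from_params` str()-joins its arguments before hashing, differently-typed but equal-looking arguments collide. A small sketch:

```python
key_a = compute_key_from_params('repo', 42, True)
key_b = compute_key_from_params('repo', '42', 'True')
# both hash the string 'repo_42_True', so the keys are identical
assert key_a == key_b
```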
153 151
154 152 def custom_key_generator(backend, namespace, fn):
155 153 func_name = fn.__name__
156 154
157 155 def generate_key(*args):
158 156 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
159 157 namespace_pref = namespace or 'default_namespace'
160 158 arg_key = compute_key_from_params(*args)
161 159 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
162 160
163 161 return final_key
164 162
165 163 return generate_key
166 164
167 165
168 166 def backend_key_generator(backend):
169 167 """
170 168 Special wrapper that also sends over the backend to the key generator
171 169 """
172 170 def wrapper(namespace, fn):
173 171 return custom_key_generator(backend, namespace, fn)
174 172 return wrapper
175 173
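A sketch of the key layout these generators produce, using a dummy backend object; the `key_prefix` value is illustrative:

```python
class _DummyBackend:
    key_prefix = 'redis'  # real backends define their own prefix

def list_files(repo_id):
    return []

generate_key = backend_key_generator(_DummyBackend())('my-namespace', list_files)
print(generate_key(42))
# -> 'redis:my-namespace:list_files_<sha1-of-args>'
```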
176 174
177 175 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False, force=False):
178 176 from .backends import FileNamespaceBackend
179 177 from . import async_creation_runner
180 178
181 179 region_obj = region_meta.dogpile_cache_regions.get(region_name)
182 180 if not region_obj:
183 181 reg_keys = list(region_meta.dogpile_cache_regions.keys())
184 182 raise OSError(f'Region `{region_name}` not found in configured regions: {reg_keys}.')
185 183
186 184 region_uid_name = f'{region_name}:{region_namespace}'
187 185
188 186 # Special case ONLY for the FileNamespaceBackend: we register one file per region
189 187 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
190 188 if not region_namespace:
191 189 raise ValueError(f'{FileNamespaceBackend} requires the region_namespace param to be specified')
192 190
193 191 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
194 192 if region_exist and not force:
195 193 log.debug('Using already configured region: %s', region_namespace)
196 194 return region_exist
197 195
198 196 expiration_time = region_obj.expiration_time
199 197
200 198 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
201 199 namespace_cache_dir = cache_dir
202 200
203 201 # we default the namespace_cache_dir to our default cache dir.
204 202 # however, if this backend is configured with the filename= param, we prioritize that,
205 203 # so all caches within that particular region, even namespaced ones, end up in the same path
206 204 if region_obj.actual_backend.filename:
207 205 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
208 206
209 207 if not os.path.isdir(namespace_cache_dir):
210 208 os.makedirs(namespace_cache_dir)
211 209 new_region = make_region(
212 210 name=region_uid_name,
213 211 function_key_generator=backend_key_generator(region_obj.actual_backend)
214 212 )
215 213
216 214 namespace_filename = os.path.join(
217 215 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
218 216 # special backend type that allows one db file per namespace
219 217 new_region.configure(
220 218 backend='dogpile.cache.rc.file_namespace',
221 219 expiration_time=expiration_time,
222 220 arguments={"filename": namespace_filename}
223 221 )
224 222
225 223 # create and save in region caches
226 224 log.debug('configuring new region: %s', region_uid_name)
227 225 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
228 226
229 227 region_obj._default_namespace = region_namespace
230 228 if use_async_runner:
231 229 region_obj.async_creation_runner = async_creation_runner
232 230 return region_obj
233 231
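Usage sketch for `get_or_create_region`; the 'cache_repo' region and the namespace are illustrative, and the region must already be configured in `region_meta` (normally from the .ini):

```python
region = get_or_create_region('cache_repo', region_namespace='repo-id-100')

# With a FileNamespaceBackend this lazily creates a per-namespace db file,
# e.g. <cache_dir>/cache_repo_repo-id-100.cache_db, and stores the region
# object so subsequent calls (without force=True) reuse it.
```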
234 232
235 233 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
236 234 from . import CLEAR_DELETE, CLEAR_INVALIDATE
237 235
238 236 if not isinstance(cache_region, RhodeCodeCacheRegion):
239 237 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
240 238 log.debug('clearing cache region: %s [prefix:%s] with method=%s',
241 239 cache_region, cache_namespace_uid, method)
242 240
243 241 num_affected_keys = 0
244 242
245 243 if method == CLEAR_INVALIDATE:
246 244 # NOTE: The CacheRegion.invalidate() method’s default mode of
247 245 # operation is to set a timestamp local to this CacheRegion in this Python process only.
248 246 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
249 247 cache_region.invalidate(hard=True)
250 248
251 249 if method == CLEAR_DELETE:
252 250 num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
253 251 return num_affected_keys
254 252
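And a hedged sketch of clearing a namespace; `CLEAR_DELETE`/`CLEAR_INVALIDATE` come from the package, as the local import at the top of the function suggests:

```python
from rhodecode.lib.rc_cache import CLEAR_DELETE  # assumed package-level export

# CLEAR_DELETE removes all keys under the namespace prefix and returns the count;
# CLEAR_INVALIDATE only soft-invalidates the region in this Python process.
removed = clear_cache_namespace('cache_repo', 'repo-id-100', method=CLEAR_DELETE)
```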
255 253
256 254 class ActiveRegionCache(object):
257 255 def __init__(self, context, cache_data: dict):
258 256 self.context = context
259 257 self.cache_data = cache_data
260 258
261 259 @property
262 260 def state_uid(self) -> str:
263 261 return self.cache_data['cache_state_uid']
264 262
265 263
266 264 class InvalidationContext(object):
267 265 """
268 266 usage::
269 267
270 268 from rhodecode.lib import rc_cache
271 269
272 270 repo_namespace_key = 'some-cache-for-repo-id-100'
273 271 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
274 272
275 273 def cache_generator(_state_uid):
276 274
277 275 @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
278 276 def _dummy_func(*args):
279 277 # compute heavy function
280 278 return _state_uid, 'result'
281 279
282 280 return _dummy_func
283 281
284 282 with inv_context_manager as invalidation_context:
285 283 cache_state_uid = invalidation_context.state_uid
286 284 cache_func = cache_generator(cache_state_uid)
287 285 previous_state_uid, result = cache_func(*call_args)
288 286
289 287 should_invalidate = previous_state_uid != cache_state_uid
290 288 if should_invalidate:
291 289 _, result = cache_func.refresh(*call_args)
292 290
293 291 # To send global invalidation signal, simply run
294 292 CacheKey.set_invalidate(repo_namespace_key)
295 293
296 294 """
297 295
298 296 def __repr__(self):
299 297 return f'<InvalidationContext:{self.cache_key}>'
300 298
301 299 def __init__(self, key, raise_exception=False, thread_scoped=None):
302 300 self.cache_key = key
303 301
304 302 self.raise_exception = raise_exception
305 303 self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
306 304 self.thread_id = 'global'
307 305
308 306 if thread_scoped is None:
309 307 # when not explicitly set, fall back to the .ini setting
310 308 thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')
311 309
312 310 # Append the thread id to the cache key if this invalidation context
313 311 # should be scoped to the current thread.
314 312 if thread_scoped is True:
315 313 self.thread_id = threading.current_thread().ident
316 314
317 315 self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
318 316 self.compute_time = 0
319 317
320 318 def get_or_create_cache_obj(self):
321 319 from rhodecode.model.db import CacheKey, Session, IntegrityError
322 320
323 321 cache_obj = CacheKey.get_active_cache(self.cache_key)
324 322 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
325 323
326 324 if not cache_obj:
327 325 # generate new UID for non-existing cache object
328 326 cache_state_uid = CacheKey.generate_new_state_uid()
329 327 cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
330 328 cache_state_uid=cache_state_uid, cache_active=True)
331 329 try:
332 330 Session().add(cache_obj)
333 331 Session().commit()
334 332 except IntegrityError:
335 333 # if we catch an integrity error, it means the object was already inserted;
336 334 # the assumption is that this is really an edge race-condition case and
337 335 # it's safe to skip it
338 336 Session().rollback()
339 337 except Exception:
340 338 log.exception('Failed to commit on cache key update')
341 339 Session().rollback()
342 340 if self.raise_exception:
343 341 raise
344 342 return cache_obj
345 343
346 344 def __enter__(self):
347 345 log.debug('Entering cache invalidation check context: %s', self)
348 346 self._start_time = time.time()
349 347
350 348 self.cache_obj = self.get_or_create_cache_obj()
351 349 cache_data = self.cache_obj.get_dict()
352 350
353 351 return ActiveRegionCache(context=self, cache_data=cache_data)
354 352
355 353 def __exit__(self, exc_type, exc_val, exc_tb):
356 354 # save compute time
357 355 self.compute_time = time.time() - self._start_time