caches: fixed utils imports
super-admin
r4922:905a9fec default
@@ -1,368 +1,369 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 import decorator
25 25 import threading
26 26
27 27 from dogpile.cache import CacheRegion
28 28
29 29 import rhodecode
30 from rhodecode.lib.utils import safe_bytes, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
30 from rhodecode.lib.hash_utils import sha1
31 from rhodecode.lib.type_utils import str2bool
32 from rhodecode.lib.str_utils import safe_bytes, safe_str  # safe_str is needed by InvalidationContext below
32 33 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 34
34 35 from rhodecode.lib.rc_cache import cache_key_meta
35 36 from rhodecode.lib.rc_cache import region_meta
36 37
37 38 log = logging.getLogger(__name__)
38 39
39 40
40 41 def isCython(func):
41 42 """
42 43 Private helper that checks if a function is a cython function.
43 44 """
44 45 return func.__class__.__name__ == 'cython_function_or_method'
45 46
46 47
47 48 class RhodeCodeCacheRegion(CacheRegion):
48 49
49 50 def conditional_cache_on_arguments(
50 51 self, namespace=None,
51 52 expiration_time=None,
52 53 should_cache_fn=None,
53 54 to_str=str,
54 55 function_key_generator=None,
55 56 condition=True):
56 57 """
57 58 Custom conditional decorator that will not touch any dogpile internals if
58 59 the condition isn't met. This works a bit differently than should_cache_fn,
59 60 and it's faster in cases where we never want to compute cached values.
60 61 """
61 62 expiration_time_is_callable = callable(expiration_time)
62 63
63 64 if function_key_generator is None:
64 65 function_key_generator = self.function_key_generator
65 66
66 67 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
67 68
68 69 if not condition:
69 70 log.debug('Calling un-cached method:%s', user_func.__name__)
70 71 start = time.time()
71 72 result = user_func(*arg, **kw)
72 73 total = time.time() - start
73 74 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
74 75 return result
75 76
76 77 key = key_generator(*arg, **kw)
77 78
78 79 timeout = expiration_time() if expiration_time_is_callable \
79 80 else expiration_time
80 81
81 82 log.debug('Calling cached method:`%s`', user_func.__name__)
82 83 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
83 84
84 85 def cache_decorator(user_func):
85 86 if to_str is str:
86 87 # backwards compatible
87 88 key_generator = function_key_generator(namespace, user_func)
88 89 else:
89 90 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
90 91
91 92 def refresh(*arg, **kw):
92 93 """
93 94 Like invalidate, but regenerates the value instead
94 95 """
95 96 key = key_generator(*arg, **kw)
96 97 value = user_func(*arg, **kw)
97 98 self.set(key, value)
98 99 return value
99 100
100 101 def invalidate(*arg, **kw):
101 102 key = key_generator(*arg, **kw)
102 103 self.delete(key)
103 104
104 105 def set_(value, *arg, **kw):
105 106 key = key_generator(*arg, **kw)
106 107 self.set(key, value)
107 108
108 109 def get(*arg, **kw):
109 110 key = key_generator(*arg, **kw)
110 111 return self.get(key)
111 112
112 113 user_func.set = set_
113 114 user_func.invalidate = invalidate
114 115 user_func.get = get
115 116 user_func.refresh = refresh
116 117 user_func.key_generator = key_generator
117 118 user_func.original = user_func
118 119
119 120 # Use `decorate` to preserve the signature of :param:`user_func`.
120 121 return decorator.decorate(user_func, functools.partial(
121 122 get_or_create_for_user_func, key_generator))
122 123
123 124 return cache_decorator
124 125
125 126
126 127 def make_region(*arg, **kw):
127 128 return RhodeCodeCacheRegion(*arg, **kw)
128 129
129 130
130 131 def get_default_cache_settings(settings, prefixes=None):
131 132 prefixes = prefixes or []
132 133 cache_settings = {}
133 134 for key in settings.keys():
134 135 for prefix in prefixes:
135 136 if key.startswith(prefix):
136 137 name = key.split(prefix)[1].strip()
137 138 val = settings[key]
138 139 if isinstance(val, str):
139 140 val = val.strip()
140 141 cache_settings[name] = val
141 142 return cache_settings
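# For example (illustrative keys, assuming the usual `rc_cache.` ini prefix convention):
# with prefixes=['rc_cache.cache_perms.'], the setting 'rc_cache.cache_perms.expiration_time'
# ends up as 'expiration_time' in the returned dict.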
142 143
143 144
144 145 def compute_key_from_params(*args):
145 146 """
146 147 Helper to compute a key from the given params, to be used in the cache manager
147 148 """
148 149 return sha1(safe_bytes("_".join(map(str, args))))
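# For example, compute_key_from_params('repo', 1) hashes the joined bytes b'repo_1' with
# sha1, so any mix of str()-able arguments produces a stable cache key.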
149 150
150 151
151 152 def backend_key_generator(backend):
152 153 """
153 154 Special wrapper that also sends over the backend to the key generator
154 155 """
155 156 def wrapper(namespace, fn):
156 157 return key_generator(backend, namespace, fn)
157 158 return wrapper
158 159
159 160
160 161 def key_generator(backend, namespace, fn):
161 162 fname = fn.__name__
162 163
163 164 def generate_key(*args):
164 165 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
165 166 namespace_pref = namespace or 'default_namespace'
166 167 arg_key = compute_key_from_params(*args)
167 168 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
168 169
169 170 return final_key
170 171
171 172 return generate_key
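# Generated keys have the shape '<backend_prefix>:<namespace>:<function_name>_<sha1-of-args>';
# the backend prefix comes from the backend's `key_prefix` attribute when it is set,
# otherwise the literal 'backend_prefix' fallback above is used.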
172 173
173 174
174 175 def get_or_create_region(region_name, region_namespace=None):
175 176 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
176 177 region_obj = region_meta.dogpile_cache_regions.get(region_name)
177 178 if not region_obj:
178 179 raise EnvironmentError(
179 180 'Region `{}` not found in configured regions: {}.'.format(
180 181 region_name, list(region_meta.dogpile_cache_regions.keys())))
181 182
182 183 region_uid_name = '{}:{}'.format(region_name, region_namespace)
183 184 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
184 185 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
185 186 if region_exist:
186 187 log.debug('Using already configured region: %s', region_namespace)
187 188 return region_exist
188 189 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
189 190 expiration_time = region_obj.expiration_time
190 191
191 192 if not os.path.isdir(cache_dir):
192 193 os.makedirs(cache_dir)
193 194 new_region = make_region(
194 195 name=region_uid_name,
195 196 function_key_generator=backend_key_generator(region_obj.actual_backend)
196 197 )
197 198 namespace_filename = os.path.join(
198 199 cache_dir, "{}.cache.dbm".format(region_namespace))
199 200 # special backend type that allows one db file per namespace
200 201 new_region.configure(
201 202 backend='dogpile.cache.rc.file_namespace',
202 203 expiration_time=expiration_time,
203 204 arguments={"filename": namespace_filename}
204 205 )
205 206
206 207 # create and save in region caches
207 208 log.debug('configuring new region: %s', region_uid_name)
208 209 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
209 210
210 211 return region_obj
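# Illustrative call (names taken from the InvalidationContext docstring below):
#
#   region = get_or_create_region('cache_perms', cache_namespace_uid)
#
# For FileNamespaceBackend-backed regions this lazily creates a dedicated
# '<namespace>.cache.dbm' file under the configured cache_dir and registers the new
# region in region_meta.dogpile_cache_regions.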
211 212
212 213
213 214 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
214 215 region = get_or_create_region(cache_region, cache_namespace_uid)
215 216 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
216 217 num_delete_keys = len(cache_keys)
217 218 if invalidate:
218 219 region.invalidate(hard=False)
219 220 else:
220 221 if num_delete_keys:
221 222 region.delete_multi(cache_keys)
222 223 return num_delete_keys
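# Illustrative calls ('cache_perms' and `uid` are placeholder names):
# clear_cache_namespace('cache_perms', uid) deletes all keys with the given prefix, while
# clear_cache_namespace('cache_perms', uid, invalidate=True) soft-invalidates the region
# via dogpile instead of deleting the stored keys.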
223 224
224 225
225 226 class ActiveRegionCache(object):
226 227 def __init__(self, context, cache_data):
227 228 self.context = context
228 229 self.cache_data = cache_data
229 230
230 231 def should_invalidate(self):
231 232 return False
232 233
233 234
234 235 class FreshRegionCache(object):
235 236 def __init__(self, context, cache_data):
236 237 self.context = context
237 238 self.cache_data = cache_data
238 239
239 240 def should_invalidate(self):
240 241 return True
241 242
242 243
243 244 class InvalidationContext(object):
244 245 """
245 246 usage::
246 247
247 248 from rhodecode.lib import rc_cache
248 249
249 250 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
250 251 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
251 252
252 253 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
253 254 def heavy_compute(cache_name, param1, param2):
254 255 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
255 256
256 257 # the invalidation namespace is a shared namespace key for all process caches
257 258 # we use it to send a global signal
258 259 invalidation_namespace = 'repo_cache:1'
259 260
260 261 inv_context_manager = rc_cache.InvalidationContext(
261 262 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
262 263 with inv_context_manager as invalidation_context:
263 264 args = ('one', 'two')
264 265 # re-compute and store cache if we get invalidate signal
265 266 if invalidation_context.should_invalidate():
266 267 result = heavy_compute.refresh(*args)
267 268 else:
268 269 result = heavy_compute(*args)
269 270
270 271 compute_time = inv_context_manager.compute_time
271 272 log.debug('result computed in %.4fs', compute_time)
272 273
273 274 # To send global invalidation signal, simply run
274 275 CacheKey.set_invalidate(invalidation_namespace)
275 276
276 277 """
277 278
278 279 def __repr__(self):
279 280 return '<InvalidationContext:{}[{}]>'.format(
280 281 safe_str(self.cache_key), safe_str(self.uid))
281 282
282 283 def __init__(self, uid, invalidation_namespace='',
283 284 raise_exception=False, thread_scoped=None):
284 285 self.uid = uid
285 286 self.invalidation_namespace = invalidation_namespace
286 287 self.raise_exception = raise_exception
287 288 self.proc_id = safe_str(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
288 289 self.thread_id = 'global'
289 290
290 291 if thread_scoped is None:
291 292 # if we set "default" we can override this via .ini settings
292 293 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
293 294
294 295 # Append the thread id to the cache key if this invalidation context
295 296 # should be scoped to the current thread.
296 297 if thread_scoped is True:
297 298 self.thread_id = threading.current_thread().ident
298 299
299 300 self.cache_key = compute_key_from_params(uid)
300 301 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
301 302 self.proc_id, self.thread_id, self.cache_key)
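# Resulting keys look like 'proc:<instance_id>|thread:<thread-id-or-global>|params:<sha1>'
# (shape derived from the format string above; concrete values are illustrative).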
302 303 self.proc_key = 'proc:{}'.format(self.proc_id)
303 304 self.compute_time = 0
304 305
305 306 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
306 307 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
307 308 # fetch all cache keys for this namespace and convert them to a map, to check if we
308 309 # have a specific cache_key object registered. We do this because we want newly
309 310 # registered objects to share a consistent cache_state_uid
310 311 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
311 312 cache_obj = cache_obj_map.get(self.cache_key)
312 313 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
313 314 if not cache_obj:
314 315 new_cache_args = invalidation_namespace
315 316 first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None
316 317 cache_state_uid = None
317 318 if first_cache_obj:
318 319 cache_state_uid = first_cache_obj.cache_state_uid
319 320 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
320 321 cache_state_uid=cache_state_uid)
321 322 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
322 323
323 324 return cache_obj
324 325
325 326 def __enter__(self):
326 327 """
327 328 Test if the current object is valid, and return a CacheRegion function
328 329 that handles invalidation and calculation
329 330 """
330 331 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
331 332 # register or get a new key based on uid
332 333 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
333 334 cache_data = self.cache_obj.get_dict()
334 335 self._start_time = time.time()
335 336 if self.cache_obj.cache_active:
336 337 # means our cache obj exists and is marked as active, i.e. its
337 338 # cache is not outdated; we return ActiveRegionCache
338 339 self.skip_cache_active_change = True
339 340
340 341 return ActiveRegionCache(context=self, cache_data=cache_data)
341 342
342 343 # the key either does not exist or is set to False; we return
343 344 # the real invalidator, which re-computes the value. We additionally set
344 345 # the flag to actually update the database objects
345 346 self.skip_cache_active_change = False
346 347 return FreshRegionCache(context=self, cache_data=cache_data)
347 348
348 349 def __exit__(self, exc_type, exc_val, exc_tb):
349 350 # save compute time
350 351 self.compute_time = time.time() - self._start_time
351 352
352 353 if self.skip_cache_active_change:
353 354 return
354 355
355 356 try:
356 357 self.cache_obj.cache_active = True
357 358 Session().add(self.cache_obj)
358 359 Session().commit()
359 360 except IntegrityError:
360 361 # if we catch an IntegrityError, it means this object was already inserted;
361 362 # the assumption is that this is really an edge race-condition case and
362 363 # it's safe to skip it
363 364 Session().rollback()
364 365 except Exception:
365 366 log.exception('Failed to commit on cache key update')
366 367 Session().rollback()
367 368 if self.raise_exception:
368 369 raise