##// END OF EJS Templates
caches: newly generated cache objects should always have unique UIDs to prevent...
marcink -
r3861:58b00196 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,339 +1,348 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 import threading
25 25
26 26 from dogpile.cache import CacheRegion
27 27 from dogpile.cache.util import compat
28 28
29 29 import rhodecode
30 30 from rhodecode.lib.utils import safe_str, sha1
31 31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 33
34 34 from . import region_meta
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38
class RhodeCodeCacheRegion(CacheRegion):
    """
    dogpile :class:`CacheRegion` subclass that adds a cheap, conditional
    variant of ``cache_on_arguments``.
    """

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't met. This works a bit differently than should_cache_fn,
        and it's faster in cases we don't ever want to compute cached values.

        :param namespace: optional key namespace passed to the key generator
        :param expiration_time: TTL in seconds, or a callable returning one
        :param should_cache_fn: dogpile hook deciding whether a computed
            value gets stored
        :param to_str: stringifier for arguments when building keys
        :param function_key_generator: override for the region's key generator
        :param condition: when falsy, the wrapped function is called directly
            and dogpile is bypassed entirely
        """
        # resolve once at decoration time whether the TTL must be re-evaluated
        # on every call
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    # the actual value computation dogpile invokes on a miss
                    return fn(*arg, **kw)

                if not condition:
                    # caching disabled: compute directly, no dogpile involved
                    return creator()

                timeout = expiration_time() if expiration_time_is_callable \
                    else expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            # helpers exposed as attributes on the decorated function,
            # mirroring dogpile's cache_on_arguments API
            def invalidate(*arg, **kw):
                # drop the cached value for these arguments
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store an explicit value without calling fn
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # fetch the cached value (or dogpile's NO_VALUE) without computing
                key = key_generator(*arg, **kw)
                return self.get(key)

            def refresh(*arg, **kw):
                # force re-computation and overwrite the cached value
                key = key_generator(*arg, **kw)
                value = fn(*arg, **kw)
                self.set(key, value)
                return value

            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator
            decorate.__wrapped__ = fn

            return decorate

        return decorator
110 110
111 111
def make_region(*arg, **kw):
    """Factory for :class:`RhodeCodeCacheRegion`, drop-in for dogpile's ``make_region``."""
    region = RhodeCodeCacheRegion(*arg, **kw)
    return region
115 115
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related entries from *settings*.

    Every key starting with one of *prefixes* is copied into the result
    under its un-prefixed name; string values have surrounding whitespace
    stripped.
    """
    matched = {}
    for full_key, value in settings.items():
        for prefix in (prefixes or []):
            if not full_key.startswith(prefix):
                continue
            # keep split-based stripping for exact parity with historic keys
            short_name = full_key.split(prefix)[1].strip()
            if isinstance(value, compat.string_types):
                value = value.strip()
            matched[short_name] = value
    return matched
128 128
129 129
def compute_key_from_params(*args):
    """
    Derive a deterministic cache key: sha1 of the ``_``-joined stringified params.
    """
    joined = "_".join(safe_str(arg) for arg in args)
    return sha1(joined)
135 135
136 136
def backend_key_generator(backend):
    """
    Factory that curries the cache *backend* into :func:`key_generator`, so
    every generated key carries the backend's prefix.
    """
    def curried(namespace, fn):
        return key_generator(backend, namespace, fn)
    return curried
144 144
145 145
def key_generator(backend, namespace, fn):
    """
    Build the key function for *fn*, producing keys of the shape
    ``<backend_prefix>:<namespace>:<fname>_<sha1(args)>``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        ns = namespace or 'default_namespace'
        digest = compute_key_from_params(*args)
        return "{}:{}:{}_{}".format(prefix, ns, func_name, digest)

    return generate_key
158 158
159 159
def get_or_create_region(region_name, region_namespace=None):
    """
    Return the configured dogpile region for *region_name*.

    For the file-namespace backend, a dedicated region (one DBM file per
    *region_namespace*) is lazily created, cached in
    ``region_meta.dogpile_cache_regions`` and returned instead.

    :param region_name: name of a region configured at startup
    :param region_namespace: sub-namespace, used only for file backends
    :raises EnvironmentError: if *region_name* was never configured
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # NOTE(review): per-namespace regions are stored/looked up under the
        # bare region_namespace key, not region_uid_name — confirm namespaces
        # cannot collide with configured region names.
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        # inherit the TTL from the originally configured region
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
197 197
198 198
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every cached key under *cache_namespace_uid* in *cache_region*;
    return the number of keys removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_drop = region.backend.list_keys(prefix=cache_namespace_uid)
    deleted = len(keys_to_drop)
    if deleted:
        region.delete_multi(keys_to_drop)
    return deleted
206 206
207 207
class ActiveRegionCache(object):
    """Marker returned when the cached value is still valid — no recompute needed."""

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        # cache is active: callers may use the stored value as-is
        return False
215 215
216 216
class FreshRegionCache(object):
    """Marker returned when the cached value is stale and must be recomputed."""

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        # cache is missing or flagged inactive: callers must recompute
        return True
224 224
225 225
class InvalidationContext(object):
    """
    Context manager coordinating cross-process cache invalidation via
    :class:`CacheKey` database rows.

    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        """
        :param uid: unique identifier for the cached computation
        :param invalidation_namespace: shared namespace used as the global
            invalidation signal channel
        :param raise_exception: re-raise unexpected DB errors from ``__exit__``
        :param thread_scoped: scope the cache key to the current thread;
            ``None`` defers to the ``cache_thread_scoped`` .ini setting
        """
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # instance_id distinguishes processes sharing one database
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        # final key shape: proc:<id>|thread:<id>|params:<sha1(uid)>
        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        """
        Fetch the :class:`CacheKey` row for ``self.cache_key``, creating it
        if missing while reusing the namespace's existing ``cache_state_uid``
        so all keys in one namespace stay consistent.

        NOTE: ``cache_type`` is currently not used in the body; lookups go
        by ``self.cache_key``.
        """
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        if not cache_obj:
            new_cache_args = invalidation_namespace
            # inherit the namespace-wide state uid from any sibling, if one exists
            first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            # value was served from an active cache; nothing to persist
            return

        try:
            # mark the key active so other processes serve the cached value
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now