##// END OF EJS Templates
caches: added debug and timings
super-admin -
r4733:3c68f6b7 stable
parent child Browse files
Show More
@@ -1,354 +1,363 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from pyramid.settings import asbool
37 37
38 38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 39
40 40
41 41 _default_max_size = 1024
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 class LRUMemoryBackend(memory_backend.MemoryBackend):
47 47 key_prefix = 'lru_mem_backend'
48 48 pickle_values = False
49 49
50 50 def __init__(self, arguments):
51 51 max_size = arguments.pop('max_size', _default_max_size)
52 52
53 53 LRUDictClass = LRUDict
54 54 if arguments.pop('log_key_count', None):
55 55 LRUDictClass = LRUDictDebug
56 56
57 57 arguments['cache_dict'] = LRUDictClass(max_size)
58 58 super(LRUMemoryBackend, self).__init__(arguments)
59 59
60 60 def delete(self, key):
61 61 try:
62 62 del self._cache[key]
63 63 except KeyError:
64 64 # we don't care if key isn't there at deletion
65 65 pass
66 66
67 67 def delete_multi(self, keys):
68 68 for key in keys:
69 69 self.delete(key)
70 70
71 71
72 72 class PickleSerializer(object):
73 73
74 74 def _dumps(self, value, safe=False):
75 75 try:
76 76 return compat.pickle.dumps(value)
77 77 except Exception:
78 78 if safe:
79 79 return NO_VALUE
80 80 else:
81 81 raise
82 82
83 83 def _loads(self, value, safe=True):
84 84 try:
85 85 return compat.pickle.loads(value)
86 86 except Exception:
87 87 if safe:
88 88 return NO_VALUE
89 89 else:
90 90 raise
91 91
92 92
93 93 class MsgPackSerializer(object):
94 94
95 95 def _dumps(self, value, safe=False):
96 96 try:
97 97 return msgpack.packb(value)
98 98 except Exception:
99 99 if safe:
100 100 return NO_VALUE
101 101 else:
102 102 raise
103 103
104 104 def _loads(self, value, safe=True):
105 105 """
106 106 pickle maintained the `CachedValue` wrapper of the tuple
107 107 msgpack does not, so it must be added back in.
108 108 """
109 109 try:
110 110 value = msgpack.unpackb(value, use_list=False)
111 111 return CachedValue(*value)
112 112 except Exception:
113 113 if safe:
114 114 return NO_VALUE
115 115 else:
116 116 raise
117 117
118 118
119 119 import fcntl
120 120 flock_org = fcntl.flock
121 121
122 122
123 123 class CustomLockFactory(FileLock):
124 124
125 125 @memoized_property
126 126 def _module(self):
127 127
128 128 def gevent_flock(fd, operation):
129 129 """
130 130 Gevent compatible flock
131 131 """
132 132 # set non-blocking, this will cause an exception if we cannot acquire a lock
133 133 operation |= fcntl.LOCK_NB
134 134 start_lock_time = time.time()
135 135 timeout = 60 * 15 # 15min
136 136 while True:
137 137 try:
138 138 flock_org(fd, operation)
139 139 # lock has been acquired
140 140 break
141 141 except (OSError, IOError) as e:
142 142 # raise on other errors than Resource temporarily unavailable
143 143 if e.errno != errno.EAGAIN:
144 144 raise
145 145 elif (time.time() - start_lock_time) > timeout:
146 146 # waited to much time on a lock, better fail than loop for ever
147 147 log.error('Failed to acquire lock on `%s` after waiting %ss',
148 148 self.filename, timeout)
149 149 raise
150 150 wait_timeout = 0.03
151 151 log.debug('Failed to acquire lock on `%s`, retry in %ss',
152 152 self.filename, wait_timeout)
153 153 gevent.sleep(wait_timeout)
154 154
155 155 fcntl.flock = gevent_flock
156 156 return fcntl
157 157
158 158
159 159 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
160 160 key_prefix = 'file_backend'
161 161
162 162 def __init__(self, arguments):
163 163 arguments['lock_factory'] = CustomLockFactory
164 164 db_file = arguments.get('filename')
165 165
166 166 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
167 167 try:
168 168 super(FileNamespaceBackend, self).__init__(arguments)
169 169 except Exception:
170 170 log.error('Failed to initialize db at: %s', db_file)
171 171 raise
172 172
173 173 def __repr__(self):
174 174 return '{} `{}`'.format(self.__class__, self.filename)
175 175
176 176 def list_keys(self, prefix=''):
177 177 prefix = '{}:{}'.format(self.key_prefix, prefix)
178 178
179 179 def cond(v):
180 180 if not prefix:
181 181 return True
182 182
183 183 if v.startswith(prefix):
184 184 return True
185 185 return False
186 186
187 187 with self._dbm_file(True) as dbm:
188 188 try:
189 189 return filter(cond, dbm.keys())
190 190 except Exception:
191 191 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
192 192 raise
193 193
194 194 def get_store(self):
195 195 return self.filename
196 196
197 197 def _dbm_get(self, key):
198 198 with self._dbm_file(False) as dbm:
199 199 if hasattr(dbm, 'get'):
200 200 value = dbm.get(key, NO_VALUE)
201 201 else:
202 202 # gdbm objects lack a .get method
203 203 try:
204 204 value = dbm[key]
205 205 except KeyError:
206 206 value = NO_VALUE
207 207 if value is not NO_VALUE:
208 208 value = self._loads(value)
209 209 return value
210 210
211 211 def get(self, key):
212 212 try:
213 213 return self._dbm_get(key)
214 214 except Exception:
215 215 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
216 216 raise
217 217
218 218 def set(self, key, value):
219 219 with self._dbm_file(True) as dbm:
220 220 dbm[key] = self._dumps(value)
221 221
222 222 def set_multi(self, mapping):
223 223 with self._dbm_file(True) as dbm:
224 224 for key, value in mapping.items():
225 225 dbm[key] = self._dumps(value)
226 226
227 227
228 228 class BaseRedisBackend(redis_backend.RedisBackend):
229 229 key_prefix = ''
230 230
231 231 def __init__(self, arguments):
232 232 super(BaseRedisBackend, self).__init__(arguments)
233 233 self._lock_timeout = self.lock_timeout
234 234 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
235 235
236 236 if self._lock_auto_renewal and not self._lock_timeout:
237 237 # set default timeout for auto_renewal
238 238 self._lock_timeout = 30
239 239
240 240 def _create_client(self):
241 241 args = {}
242 242
243 243 if self.url is not None:
244 244 args.update(url=self.url)
245 245
246 246 else:
247 247 args.update(
248 248 host=self.host, password=self.password,
249 249 port=self.port, db=self.db
250 250 )
251 251
252 252 connection_pool = redis.ConnectionPool(**args)
253 253
254 254 return redis.StrictRedis(connection_pool=connection_pool)
255 255
256 256 def list_keys(self, prefix=''):
257 257 prefix = '{}:{}*'.format(self.key_prefix, prefix)
258 258 return self.client.keys(prefix)
259 259
260 260 def get_store(self):
261 261 return self.client.connection_pool
262 262
263 263 def get(self, key):
264 264 value = self.client.get(key)
265 265 if value is None:
266 266 return NO_VALUE
267 267 return self._loads(value)
268 268
269 269 def get_multi(self, keys):
270 270 if not keys:
271 271 return []
272 272 values = self.client.mget(keys)
273 273 loads = self._loads
274 274 return [
275 275 loads(v) if v is not None else NO_VALUE
276 276 for v in values]
277 277
278 278 def set(self, key, value):
279 279 if self.redis_expiration_time:
280 280 self.client.setex(key, self.redis_expiration_time,
281 281 self._dumps(value))
282 282 else:
283 283 self.client.set(key, self._dumps(value))
284 284
285 285 def set_multi(self, mapping):
286 286 dumps = self._dumps
287 287 mapping = dict(
288 288 (k, dumps(v))
289 289 for k, v in mapping.items()
290 290 )
291 291
292 292 if not self.redis_expiration_time:
293 293 self.client.mset(mapping)
294 294 else:
295 295 pipe = self.client.pipeline()
296 296 for key, value in mapping.items():
297 297 pipe.setex(key, self.redis_expiration_time, value)
298 298 pipe.execute()
299 299
300 300 def get_mutex(self, key):
301 301 if self.distributed_lock:
302 302 lock_key = redis_backend.u('_lock_{0}').format(key)
303 log.debug('Trying to acquire Redis lock for key %s', lock_key)
304 303 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 304 auto_renewal=self._lock_auto_renewal)
306 305 else:
307 306 return None
308 307
309 308
310 309 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 310 key_prefix = 'redis_pickle_backend'
312 311 pass
313 312
314 313
315 314 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 315 key_prefix = 'redis_msgpack_backend'
317 316 pass
318 317
319 318
320 319 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 320 import redis_lock
322 321
323 322 class _RedisLockWrapper(object):
324 323 """LockWrapper for redis_lock"""
325 324
326 325 @classmethod
327 326 def get_lock(cls):
328 327 return redis_lock.Lock(
329 328 redis_client=client,
330 329 name=lock_key,
331 330 expire=lock_timeout,
332 331 auto_renewal=auto_renewal,
333 332 strict=True,
334 333 )
335 334
335 def __repr__(self):
336 return "{}:{}".format(self.__class__.__name__, lock_key)
337
338 def __str__(self):
339 return "{}:{}".format(self.__class__.__name__, lock_key)
340
336 341 def __init__(self):
337 342 self.lock = self.get_lock()
343 self.lock_key = lock_key
338 344
339 345 def acquire(self, wait=True):
346 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
340 347 try:
341 return self.lock.acquire(wait)
348 acquired = self.lock.acquire(wait)
349 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
350 return acquired
342 351 except redis_lock.AlreadyAcquired:
343 352 return False
344 353 except redis_lock.AlreadyStarted:
345 354 # refresh thread exists, but it also means we acquired the lock
346 355 return True
347 356
348 357 def release(self):
349 358 try:
350 359 self.lock.release()
351 360 except redis_lock.NotAcquired:
352 361 pass
353 362
354 363 return _RedisLockWrapper()
@@ -1,418 +1,422 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 import threading
25 25
26 26 from dogpile.cache import CacheRegion
27 27 from dogpile.cache.util import compat
28 28
29 29 import rhodecode
30 30 from rhodecode.lib.utils import safe_str, sha1
31 31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 33
34 34 from rhodecode.lib.rc_cache import cache_key_meta
35 35 from rhodecode.lib.rc_cache import region_meta
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def isCython(func):
41 41 """
42 42 Private helper that checks if a function is a cython function.
43 43 """
44 44 return func.__class__.__name__ == 'cython_function_or_method'
45 45
46 46
47 47 class RhodeCodeCacheRegion(CacheRegion):
48 48
49 49 def conditional_cache_on_arguments(
50 50 self, namespace=None,
51 51 expiration_time=None,
52 52 should_cache_fn=None,
53 53 to_str=compat.string_type,
54 54 function_key_generator=None,
55 55 condition=True):
56 56 """
57 57 Custom conditional decorator, that will not touch any dogpile internals if
58 58 condition isn't meet. This works a bit different than should_cache_fn
59 59 And it's faster in cases we don't ever want to compute cached values
60 60 """
61 61 expiration_time_is_callable = compat.callable(expiration_time)
62 62
63 63 if function_key_generator is None:
64 64 function_key_generator = self.function_key_generator
65 65
66 66 # workaround for py2 and cython problems, this block should be removed
67 67 # once we've migrated to py3
68 68 if 'cython' == 'cython':
69 69 def decorator(fn):
70 70 if to_str is compat.string_type:
71 71 # backwards compatible
72 72 key_generator = function_key_generator(namespace, fn)
73 73 else:
74 74 key_generator = function_key_generator(namespace, fn, to_str=to_str)
75 75
76 76 @functools.wraps(fn)
77 77 def decorate(*arg, **kw):
78 78 key = key_generator(*arg, **kw)
79 79
80 80 @functools.wraps(fn)
81 81 def creator():
82 82 return fn(*arg, **kw)
83 83
84 84 if not condition:
85 85 return creator()
86 86
87 87 timeout = expiration_time() if expiration_time_is_callable \
88 88 else expiration_time
89 89
90 90 return self.get_or_create(key, creator, timeout, should_cache_fn)
91 91
92 92 def invalidate(*arg, **kw):
93 93 key = key_generator(*arg, **kw)
94 94 self.delete(key)
95 95
96 96 def set_(value, *arg, **kw):
97 97 key = key_generator(*arg, **kw)
98 98 self.set(key, value)
99 99
100 100 def get(*arg, **kw):
101 101 key = key_generator(*arg, **kw)
102 102 return self.get(key)
103 103
104 104 def refresh(*arg, **kw):
105 105 key = key_generator(*arg, **kw)
106 106 value = fn(*arg, **kw)
107 107 self.set(key, value)
108 108 return value
109 109
110 110 decorate.set = set_
111 111 decorate.invalidate = invalidate
112 112 decorate.refresh = refresh
113 113 decorate.get = get
114 114 decorate.original = fn
115 115 decorate.key_generator = key_generator
116 116 decorate.__wrapped__ = fn
117 117
118 118 return decorate
119 119 return decorator
120 120
121 121 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
122 122
123 123 if not condition:
124 124 log.debug('Calling un-cached func:%s', user_func.func_name)
125 return user_func(*arg, **kw)
125 start = time.time()
126 result = user_func(*arg, **kw)
127 total = time.time() - start
128 log.debug('un-cached func:%s took %.4fs', user_func.func_name, total)
129 return result
126 130
127 131 key = key_generator(*arg, **kw)
128 132
129 133 timeout = expiration_time() if expiration_time_is_callable \
130 134 else expiration_time
131 135
132 136 log.debug('Calling cached fn:%s', user_func.func_name)
133 137 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
134 138
135 139 def cache_decorator(user_func):
136 140 if to_str is compat.string_type:
137 141 # backwards compatible
138 142 key_generator = function_key_generator(namespace, user_func)
139 143 else:
140 144 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
141 145
142 146 def refresh(*arg, **kw):
143 147 """
144 148 Like invalidate, but regenerates the value instead
145 149 """
146 150 key = key_generator(*arg, **kw)
147 151 value = user_func(*arg, **kw)
148 152 self.set(key, value)
149 153 return value
150 154
151 155 def invalidate(*arg, **kw):
152 156 key = key_generator(*arg, **kw)
153 157 self.delete(key)
154 158
155 159 def set_(value, *arg, **kw):
156 160 key = key_generator(*arg, **kw)
157 161 self.set(key, value)
158 162
159 163 def get(*arg, **kw):
160 164 key = key_generator(*arg, **kw)
161 165 return self.get(key)
162 166
163 167 user_func.set = set_
164 168 user_func.invalidate = invalidate
165 169 user_func.get = get
166 170 user_func.refresh = refresh
167 171 user_func.key_generator = key_generator
168 172 user_func.original = user_func
169 173
170 174 # Use `decorate` to preserve the signature of :param:`user_func`.
171 175 return decorator.decorate(user_func, functools.partial(
172 176 get_or_create_for_user_func, key_generator))
173 177
174 178 return cache_decorator
175 179
176 180
177 181 def make_region(*arg, **kw):
178 182 return RhodeCodeCacheRegion(*arg, **kw)
179 183
180 184
181 185 def get_default_cache_settings(settings, prefixes=None):
182 186 prefixes = prefixes or []
183 187 cache_settings = {}
184 188 for key in settings.keys():
185 189 for prefix in prefixes:
186 190 if key.startswith(prefix):
187 191 name = key.split(prefix)[1].strip()
188 192 val = settings[key]
189 193 if isinstance(val, compat.string_types):
190 194 val = val.strip()
191 195 cache_settings[name] = val
192 196 return cache_settings
193 197
194 198
195 199 def compute_key_from_params(*args):
196 200 """
197 201 Helper to compute key from given params to be used in cache manager
198 202 """
199 203 return sha1("_".join(map(safe_str, args)))
200 204
201 205
202 206 def backend_key_generator(backend):
203 207 """
204 208 Special wrapper that also sends over the backend to the key generator
205 209 """
206 210 def wrapper(namespace, fn):
207 211 return key_generator(backend, namespace, fn)
208 212 return wrapper
209 213
210 214
211 215 def key_generator(backend, namespace, fn):
212 216 fname = fn.__name__
213 217
214 218 def generate_key(*args):
215 219 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
216 220 namespace_pref = namespace or 'default_namespace'
217 221 arg_key = compute_key_from_params(*args)
218 222 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
219 223
220 224 return final_key
221 225
222 226 return generate_key
223 227
224 228
225 229 def get_or_create_region(region_name, region_namespace=None):
226 230 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
227 231 region_obj = region_meta.dogpile_cache_regions.get(region_name)
228 232 if not region_obj:
229 233 raise EnvironmentError(
230 234 'Region `{}` not in configured: {}.'.format(
231 235 region_name, region_meta.dogpile_cache_regions.keys()))
232 236
233 237 region_uid_name = '{}:{}'.format(region_name, region_namespace)
234 238 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
235 239 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
236 240 if region_exist:
237 241 log.debug('Using already configured region: %s', region_namespace)
238 242 return region_exist
239 243 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
240 244 expiration_time = region_obj.expiration_time
241 245
242 246 if not os.path.isdir(cache_dir):
243 247 os.makedirs(cache_dir)
244 248 new_region = make_region(
245 249 name=region_uid_name,
246 250 function_key_generator=backend_key_generator(region_obj.actual_backend)
247 251 )
248 252 namespace_filename = os.path.join(
249 253 cache_dir, "{}.cache.dbm".format(region_namespace))
250 254 # special type that allows 1db per namespace
251 255 new_region.configure(
252 256 backend='dogpile.cache.rc.file_namespace',
253 257 expiration_time=expiration_time,
254 258 arguments={"filename": namespace_filename}
255 259 )
256 260
257 261 # create and save in region caches
258 262 log.debug('configuring new region: %s', region_uid_name)
259 263 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
260 264
261 265 return region_obj
262 266
263 267
264 268 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
265 269 region = get_or_create_region(cache_region, cache_namespace_uid)
266 270 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
267 271 num_delete_keys = len(cache_keys)
268 272 if invalidate:
269 273 region.invalidate(hard=False)
270 274 else:
271 275 if num_delete_keys:
272 276 region.delete_multi(cache_keys)
273 277 return num_delete_keys
274 278
275 279
276 280 class ActiveRegionCache(object):
277 281 def __init__(self, context, cache_data):
278 282 self.context = context
279 283 self.cache_data = cache_data
280 284
281 285 def should_invalidate(self):
282 286 return False
283 287
284 288
285 289 class FreshRegionCache(object):
286 290 def __init__(self, context, cache_data):
287 291 self.context = context
288 292 self.cache_data = cache_data
289 293
290 294 def should_invalidate(self):
291 295 return True
292 296
293 297
294 298 class InvalidationContext(object):
295 299 """
296 300 usage::
297 301
298 302 from rhodecode.lib import rc_cache
299 303
300 304 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
301 305 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
302 306
303 307 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
304 308 def heavy_compute(cache_name, param1, param2):
305 309 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
306 310
307 311 # invalidation namespace is shared namespace key for all process caches
308 312 # we use it to send a global signal
309 313 invalidation_namespace = 'repo_cache:1'
310 314
311 315 inv_context_manager = rc_cache.InvalidationContext(
312 316 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
313 317 with inv_context_manager as invalidation_context:
314 318 args = ('one', 'two')
315 319 # re-compute and store cache if we get invalidate signal
316 320 if invalidation_context.should_invalidate():
317 321 result = heavy_compute.refresh(*args)
318 322 else:
319 323 result = heavy_compute(*args)
320 324
321 325 compute_time = inv_context_manager.compute_time
322 326 log.debug('result computed in %.4fs', compute_time)
323 327
324 328 # To send global invalidation signal, simply run
325 329 CacheKey.set_invalidate(invalidation_namespace)
326 330
327 331 """
328 332
329 333 def __repr__(self):
330 334 return '<InvalidationContext:{}[{}]>'.format(
331 335 safe_str(self.cache_key), safe_str(self.uid))
332 336
333 337 def __init__(self, uid, invalidation_namespace='',
334 338 raise_exception=False, thread_scoped=None):
335 339 self.uid = uid
336 340 self.invalidation_namespace = invalidation_namespace
337 341 self.raise_exception = raise_exception
338 342 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
339 343 self.thread_id = 'global'
340 344
341 345 if thread_scoped is None:
342 346 # if we set "default" we can override this via .ini settings
343 347 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
344 348
345 349 # Append the thread id to the cache key if this invalidation context
346 350 # should be scoped to the current thread.
347 351 if thread_scoped is True:
348 352 self.thread_id = threading.current_thread().ident
349 353
350 354 self.cache_key = compute_key_from_params(uid)
351 355 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
352 356 self.proc_id, self.thread_id, self.cache_key)
353 357 self.compute_time = 0
354 358
355 359 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
356 360 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
357 361 # fetch all cache keys for this namespace and convert them to a map to find if we
358 362 # have specific cache_key object registered. We do this because we want to have
359 363 # all consistent cache_state_uid for newly registered objects
360 364 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
361 365 cache_obj = cache_obj_map.get(self.cache_key)
362 366 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
363 367 if not cache_obj:
364 368 new_cache_args = invalidation_namespace
365 369 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
366 370 cache_state_uid = None
367 371 if first_cache_obj:
368 372 cache_state_uid = first_cache_obj.cache_state_uid
369 373 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
370 374 cache_state_uid=cache_state_uid)
371 375 cache_key_meta.cache_keys_by_pid.append(self.cache_key)
372 376
373 377 return cache_obj
374 378
375 379 def __enter__(self):
376 380 """
377 381 Test if current object is valid, and return CacheRegion function
378 382 that does invalidation and calculation
379 383 """
380 384 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
381 385 # register or get a new key based on uid
382 386 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
383 387 cache_data = self.cache_obj.get_dict()
384 388 self._start_time = time.time()
385 389 if self.cache_obj.cache_active:
386 390 # means our cache obj is existing and marked as it's
387 391 # cache is not outdated, we return ActiveRegionCache
388 392 self.skip_cache_active_change = True
389 393
390 394 return ActiveRegionCache(context=self, cache_data=cache_data)
391 395
392 396 # the key is either not existing or set to False, we return
393 397 # the real invalidator which re-computes value. We additionally set
394 398 # the flag to actually update the Database objects
395 399 self.skip_cache_active_change = False
396 400 return FreshRegionCache(context=self, cache_data=cache_data)
397 401
398 402 def __exit__(self, exc_type, exc_val, exc_tb):
399 403 # save compute time
400 404 self.compute_time = time.time() - self._start_time
401 405
402 406 if self.skip_cache_active_change:
403 407 return
404 408
405 409 try:
406 410 self.cache_obj.cache_active = True
407 411 Session().add(self.cache_obj)
408 412 Session().commit()
409 413 except IntegrityError:
410 414 # if we catch integrity error, it means we inserted this object
411 415 # assumption is that's really an edge race-condition case and
412 416 # it's safe is to skip it
413 417 Session().rollback()
414 418 except Exception:
415 419 log.exception('Failed to commit on cache key update')
416 420 Session().rollback()
417 421 if self.raise_exception:
418 422 raise
General Comments 0
You need to be logged in to leave comments. Login now