##// END OF EJS Templates
chore(caches): cleanup cache lib and added cleanup backend
super-admin -
r5497:f4c81595 default
parent child Browse files
Show More
@@ -1,335 +1,342 b''
1 1 # Copyright (C) 2015-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import errno
20 20 import fcntl
21 21 import functools
22 22 import logging
23 23 import os
24 24 import pickle
25 25 import time
26 26
27 27 import gevent
28 28 import msgpack
29 29 import redis
30 30
31 31 flock_org = fcntl.flock
32 32 from typing import Union
33 33
34 34 from dogpile.cache.api import Deserializer, Serializer
35 35 from dogpile.cache.backends import file as file_backend
36 36 from dogpile.cache.backends import memory as memory_backend
37 37 from dogpile.cache.backends import redis as redis_backend
38 38 from dogpile.cache.backends.file import FileLock
39 39 from dogpile.cache.util import memoized_property
40 40
41 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
42 from rhodecode.lib.str_utils import safe_bytes, safe_str
43 from rhodecode.lib.type_utils import str2bool
41 from ...lib.memory_lru_dict import LRUDict, LRUDictDebug
42 from ...lib.str_utils import safe_bytes, safe_str
43 from ...lib.type_utils import str2bool
44 44
45 45 _default_max_size = 1024
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49
50 50 class LRUMemoryBackend(memory_backend.MemoryBackend):
51 51 key_prefix = 'lru_mem_backend'
52 52 pickle_values = False
53 53
54 54 def __init__(self, arguments):
55 55 self.max_size = arguments.pop('max_size', _default_max_size)
56 56
57 57 LRUDictClass = LRUDict
58 58 if arguments.pop('log_key_count', None):
59 59 LRUDictClass = LRUDictDebug
60 60
61 61 arguments['cache_dict'] = LRUDictClass(self.max_size)
62 62 super().__init__(arguments)
63 63
64 64 def __repr__(self):
65 65 return f'{self.__class__}(maxsize=`{self.max_size}`)'
66 66
67 67 def __str__(self):
68 68 return self.__repr__()
69 69
70 70 def delete(self, key):
71 71 try:
72 72 del self._cache[key]
73 73 except KeyError:
74 74 # we don't care if key isn't there at deletion
75 75 pass
76 76
77 77 def list_keys(self, prefix):
78 78 return list(self._cache.keys())
79 79
80 80 def delete_multi(self, keys):
81 81 for key in keys:
82 82 self.delete(key)
83 83
84 84 def delete_multi_by_prefix(self, prefix):
85 85 cache_keys = self.list_keys(prefix=prefix)
86 86 num_affected_keys = len(cache_keys)
87 87 if num_affected_keys:
88 88 self.delete_multi(cache_keys)
89 89 return num_affected_keys
90 90
91 91
92 92 class PickleSerializer:
93 93 serializer: None | Serializer = staticmethod( # type: ignore
94 94 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
95 95 )
96 96 deserializer: None | Deserializer = staticmethod( # type: ignore
97 97 functools.partial(pickle.loads)
98 98 )
99 99
100 100
101 101 class MsgPackSerializer:
102 102 serializer: None | Serializer = staticmethod( # type: ignore
103 103 msgpack.packb
104 104 )
105 105 deserializer: None | Deserializer = staticmethod( # type: ignore
106 106 functools.partial(msgpack.unpackb, use_list=False)
107 107 )
108 108
109 109
110 110 class CustomLockFactory(FileLock):
111 111
112 112 @memoized_property
113 113 def _module(self):
114 114
115 115 def gevent_flock(fd, operation):
116 116 """
117 117 Gevent compatible flock
118 118 """
119 119 # set non-blocking, this will cause an exception if we cannot acquire a lock
120 120 operation |= fcntl.LOCK_NB
121 121 start_lock_time = time.time()
122 122 timeout = 60 * 15 # 15min
123 123 while True:
124 124 try:
125 125 flock_org(fd, operation)
126 126 # lock has been acquired
127 127 break
128 128 except (OSError, IOError) as e:
129 129 # raise on other errors than Resource temporarily unavailable
130 130 if e.errno != errno.EAGAIN:
131 131 raise
132 132 elif (time.time() - start_lock_time) > timeout:
133 133 # waited to much time on a lock, better fail than loop for ever
134 134 log.error('Failed to acquire lock on `%s` after waiting %ss',
135 135 self.filename, timeout)
136 136 raise
137 137 wait_timeout = 0.03
138 138 log.debug('Failed to acquire lock on `%s`, retry in %ss',
139 139 self.filename, wait_timeout)
140 140 gevent.sleep(wait_timeout)
141 141
142 142 fcntl.flock = gevent_flock
143 143 return fcntl
144 144
145 145
146 146 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
147 147 key_prefix = 'file_backend'
148 148
149 149 def __init__(self, arguments):
150 150 arguments['lock_factory'] = CustomLockFactory
151 151 db_file = arguments.get('filename')
152 152
153 153 log.debug('initialing cache-backend=%s db in %s', self.__class__.__name__, db_file)
154 154 db_file_dir = os.path.dirname(db_file)
155 155 if not os.path.isdir(db_file_dir):
156 156 os.makedirs(db_file_dir)
157 157
158 158 try:
159 159 super().__init__(arguments)
160 160 except Exception:
161 161 log.exception('Failed to initialize db at: %s', db_file)
162 162 raise
163 163
164 164 def __repr__(self):
165 165 return f'{self.__class__}(file=`{self.filename}`)'
166 166
167 167 def __str__(self):
168 168 return self.__repr__()
169 169
170 170 def _get_keys_pattern(self, prefix: bytes = b''):
171 171 return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
172 172
173 173 def list_keys(self, prefix: bytes = b''):
174 174 prefix = self._get_keys_pattern(prefix)
175 175
176 176 def cond(dbm_key: bytes):
177 177 if not prefix:
178 178 return True
179 179
180 180 if dbm_key.startswith(prefix):
181 181 return True
182 182 return False
183 183
184 184 with self._dbm_file(True) as dbm:
185 185 try:
186 186 return list(filter(cond, dbm.keys()))
187 187 except Exception:
188 188 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
189 189 raise
190 190
191 191 def delete_multi_by_prefix(self, prefix):
192 192 cache_keys = self.list_keys(prefix=prefix)
193 193 num_affected_keys = len(cache_keys)
194 194 if num_affected_keys:
195 195 self.delete_multi(cache_keys)
196 196 return num_affected_keys
197 197
198 198 def get_store(self):
199 199 return self.filename
200 200
201 def cleanup_store(self):
202 for ext in ("db", "dat", "pag", "dir"):
203 final_filename = self.filename + os.extsep + ext
204 if os.path.exists(final_filename):
205 os.remove(final_filename)
206 log.warning('Removed dbm file %s', final_filename)
207
201 208
202 209 class BaseRedisBackend(redis_backend.RedisBackend):
203 210 key_prefix = ''
204 211
205 212 def __init__(self, arguments):
206 213 self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
207 214 super().__init__(arguments)
208 215
209 216 self._lock_timeout = self.lock_timeout
210 217 self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
211 218
212 219 if self._lock_auto_renewal and not self._lock_timeout:
213 220 # set default timeout for auto_renewal
214 221 self._lock_timeout = 30
215 222
216 223 def __repr__(self):
217 224 return f'{self.__class__}(conn=`{self.db_conn}`)'
218 225
219 226 def __str__(self):
220 227 return self.__repr__()
221 228
222 229 def _create_client(self):
223 230 args = {}
224 231
225 232 if self.url is not None:
226 233 args.update(url=self.url)
227 234
228 235 else:
229 236 args.update(
230 237 host=self.host, password=self.password,
231 238 port=self.port, db=self.db
232 239 )
233 240
234 241 connection_pool = redis.ConnectionPool(**args)
235 242 self.writer_client = redis.StrictRedis(
236 243 connection_pool=connection_pool
237 244 )
238 245 self.reader_client = self.writer_client
239 246
240 247 def _get_keys_pattern(self, prefix: bytes = b''):
241 248 return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
242 249
243 250 def list_keys(self, prefix: bytes = b''):
244 251 prefix = self._get_keys_pattern(prefix)
245 252 return self.reader_client.keys(prefix)
246 253
247 254 def delete_multi_by_prefix(self, prefix, use_lua=False):
248 255 if use_lua:
249 256 # high efficient LUA script to delete ALL keys by prefix...
250 257 lua = """local keys = redis.call('keys', ARGV[1])
251 258 for i=1,#keys,5000 do
252 259 redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
253 260 end
254 261 return #keys"""
255 262 num_affected_keys = self.writer_client.eval(
256 263 lua,
257 264 0,
258 265 f"{prefix}*")
259 266 else:
260 267 cache_keys = self.list_keys(prefix=prefix)
261 268 num_affected_keys = len(cache_keys)
262 269 if num_affected_keys:
263 270 self.delete_multi(cache_keys)
264 271 return num_affected_keys
265 272
266 273 def get_store(self):
267 274 return self.reader_client.connection_pool
268 275
269 276 def get_mutex(self, key):
270 277 if self.distributed_lock:
271 278 lock_key = f'_lock_{safe_str(key)}'
272 279 return get_mutex_lock(
273 280 self.writer_client, lock_key,
274 281 self._lock_timeout,
275 282 auto_renewal=self._lock_auto_renewal
276 283 )
277 284 else:
278 285 return None
279 286
280 287
281 288 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
282 289 key_prefix = 'redis_pickle_backend'
283 290 pass
284 291
285 292
286 293 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
287 294 key_prefix = 'redis_msgpack_backend'
288 295 pass
289 296
290 297
291 298 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
292 from rhodecode.lib._vendor import redis_lock
299 from ...lib._vendor import redis_lock
293 300
294 301 class _RedisLockWrapper:
295 302 """LockWrapper for redis_lock"""
296 303
297 304 @classmethod
298 305 def get_lock(cls):
299 306 return redis_lock.Lock(
300 307 redis_client=client,
301 308 name=lock_key,
302 309 expire=lock_timeout,
303 310 auto_renewal=auto_renewal,
304 311 strict=True,
305 312 )
306 313
307 314 def __repr__(self):
308 315 return f"{self.__class__.__name__}:{lock_key}"
309 316
310 317 def __str__(self):
311 318 return f"{self.__class__.__name__}:{lock_key}"
312 319
313 320 def __init__(self):
314 321 self.lock = self.get_lock()
315 322 self.lock_key = lock_key
316 323
317 324 def acquire(self, wait=True):
318 325 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
319 326 try:
320 327 acquired = self.lock.acquire(wait)
321 328 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
322 329 return acquired
323 330 except redis_lock.AlreadyAcquired:
324 331 return False
325 332 except redis_lock.AlreadyStarted:
326 333 # refresh thread exists, but it also means we acquired the lock
327 334 return True
328 335
329 336 def release(self):
330 337 try:
331 338 self.lock.release()
332 339 except redis_lock.NotAcquired:
333 340 pass
334 341
335 342 return _RedisLockWrapper()
@@ -1,357 +1,357 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import functools
20 20 import logging
21 21 import os
22 22 import threading
23 23 import time
24 24
25 25 import decorator
26 26 from dogpile.cache import CacheRegion
27 27
28 28 import rhodecode
29 from rhodecode.lib.hash_utils import sha1
30 from rhodecode.lib.str_utils import safe_bytes
31 from rhodecode.lib.type_utils import str2bool # noqa :required by imports from .utils
29 from ...lib.hash_utils import sha1
30 from ...lib.str_utils import safe_bytes
31 from ...lib.type_utils import str2bool # noqa :required by imports from .utils
32 32
33 33 from . import region_meta
34 34
35 35 log = logging.getLogger(__name__)
36 36
37 37
38 38 def isCython(func):
39 39 """
40 40 Private helper that checks if a function is a cython function.
41 41 """
42 42 return func.__class__.__name__ == 'cython_function_or_method'
43 43
44 44
45 45 class RhodeCodeCacheRegion(CacheRegion):
46 46
47 47 def __repr__(self):
48 48 return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'
49 49
50 50 def conditional_cache_on_arguments(
51 51 self, namespace=None,
52 52 expiration_time=None,
53 53 should_cache_fn=None,
54 54 to_str=str,
55 55 function_key_generator=None,
56 56 condition=True):
57 57 """
58 58 Custom conditional decorator, that will not touch any dogpile internals if
59 59 condition isn't meet. This works a bit different from should_cache_fn
60 60 And it's faster in cases we don't ever want to compute cached values
61 61 """
62 62 expiration_time_is_callable = callable(expiration_time)
63 63 if not namespace:
64 64 namespace = getattr(self, '_default_namespace', None)
65 65
66 66 if function_key_generator is None:
67 67 function_key_generator = self.function_key_generator
68 68
69 69 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
70 70
71 71 if not condition:
72 72 log.debug('Calling un-cached method:%s', user_func.__name__)
73 73 start = time.time()
74 74 result = user_func(*arg, **kw)
75 75 total = time.time() - start
76 76 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
77 77 return result
78 78
79 79 key = func_key_generator(*arg, **kw)
80 80
81 81 timeout = expiration_time() if expiration_time_is_callable \
82 82 else expiration_time
83 83
84 84 log.debug('Calling cached method:`%s`', user_func.__name__)
85 85 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
86 86
87 87 def cache_decorator(user_func):
88 88 if to_str is str:
89 89 # backwards compatible
90 90 key_generator = function_key_generator(namespace, user_func)
91 91 else:
92 92 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
93 93
94 94 def refresh(*arg, **kw):
95 95 """
96 96 Like invalidate, but regenerates the value instead
97 97 """
98 98 key = key_generator(*arg, **kw)
99 99 value = user_func(*arg, **kw)
100 100 self.set(key, value)
101 101 return value
102 102
103 103 def invalidate(*arg, **kw):
104 104 key = key_generator(*arg, **kw)
105 105 self.delete(key)
106 106
107 107 def set_(value, *arg, **kw):
108 108 key = key_generator(*arg, **kw)
109 109 self.set(key, value)
110 110
111 111 def get(*arg, **kw):
112 112 key = key_generator(*arg, **kw)
113 113 return self.get(key)
114 114
115 115 user_func.set = set_
116 116 user_func.invalidate = invalidate
117 117 user_func.get = get
118 118 user_func.refresh = refresh
119 119 user_func.key_generator = key_generator
120 120 user_func.original = user_func
121 121
122 122 # Use `decorate` to preserve the signature of :param:`user_func`.
123 123 return decorator.decorate(user_func, functools.partial(
124 124 get_or_create_for_user_func, key_generator))
125 125
126 126 return cache_decorator
127 127
128 128
129 129 def make_region(*arg, **kw):
130 130 return RhodeCodeCacheRegion(*arg, **kw)
131 131
132 132
133 133 def get_default_cache_settings(settings, prefixes=None):
134 134 prefixes = prefixes or []
135 135 cache_settings = {}
136 136 for key in settings.keys():
137 137 for prefix in prefixes:
138 138 if key.startswith(prefix):
139 139 name = key.split(prefix)[1].strip()
140 140 val = settings[key]
141 141 if isinstance(val, str):
142 142 val = val.strip()
143 143 cache_settings[name] = val
144 144 return cache_settings
145 145
146 146
147 147 def compute_key_from_params(*args):
148 148 """
149 149 Helper to compute key from given params to be used in cache manager
150 150 """
151 151 return sha1(safe_bytes("_".join(map(str, args))))
152 152
153 153
154 154 def custom_key_generator(backend, namespace, fn):
155 155 func_name = fn.__name__
156 156
157 157 def generate_key(*args):
158 158 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
159 159 namespace_pref = namespace or 'default_namespace'
160 160 arg_key = compute_key_from_params(*args)
161 161 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
162 162
163 163 return final_key
164 164
165 165 return generate_key
166 166
167 167
168 168 def backend_key_generator(backend):
169 169 """
170 170 Special wrapper that also sends over the backend to the key generator
171 171 """
172 172 def wrapper(namespace, fn):
173 173 return custom_key_generator(backend, namespace, fn)
174 174 return wrapper
175 175
176 176
177 177 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
178 178 from .backends import FileNamespaceBackend
179 179 from . import async_creation_runner
180 180
181 181 region_obj = region_meta.dogpile_cache_regions.get(region_name)
182 182 if not region_obj:
183 183 reg_keys = list(region_meta.dogpile_cache_regions.keys())
184 184 raise OSError(f'Region `{region_name}` not in configured: {reg_keys}.')
185 185
186 186 region_uid_name = f'{region_name}:{region_namespace}'
187 187
188 188 # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
189 189 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
190 190 if not region_namespace:
191 191 raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')
192 192
193 193 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
194 194 if region_exist:
195 195 log.debug('Using already configured region: %s', region_namespace)
196 196 return region_exist
197 197
198 198 expiration_time = region_obj.expiration_time
199 199
200 200 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
201 201 namespace_cache_dir = cache_dir
202 202
203 203 # we default the namespace_cache_dir to our default cache dir.
204 204 # however, if this backend is configured with filename= param, we prioritize that
205 205 # so all caches within that particular region, even those namespaced end up in the same path
206 206 if region_obj.actual_backend.filename:
207 207 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
208 208
209 209 if not os.path.isdir(namespace_cache_dir):
210 210 os.makedirs(namespace_cache_dir)
211 211 new_region = make_region(
212 212 name=region_uid_name,
213 213 function_key_generator=backend_key_generator(region_obj.actual_backend)
214 214 )
215 215
216 216 namespace_filename = os.path.join(
217 217 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
218 218 # special type that allows 1db per namespace
219 219 new_region.configure(
220 220 backend='dogpile.cache.rc.file_namespace',
221 221 expiration_time=expiration_time,
222 222 arguments={"filename": namespace_filename}
223 223 )
224 224
225 225 # create and save in region caches
226 226 log.debug('configuring new region: %s', region_uid_name)
227 227 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
228 228
229 229 region_obj._default_namespace = region_namespace
230 230 if use_async_runner:
231 231 region_obj.async_creation_runner = async_creation_runner
232 232 return region_obj
233 233
234 234
235 235 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
236 236 from . import CLEAR_DELETE, CLEAR_INVALIDATE
237 237
238 238 if not isinstance(cache_region, RhodeCodeCacheRegion):
239 239 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
240 240 log.debug('clearing cache region: %s [prefix:%s] with method=%s',
241 241 cache_region, cache_namespace_uid, method)
242 242
243 243 num_affected_keys = 0
244 244
245 245 if method == CLEAR_INVALIDATE:
246 246 # NOTE: The CacheRegion.invalidate() method’s default mode of
247 247 # operation is to set a timestamp local to this CacheRegion in this Python process only.
248 248 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
249 249 cache_region.invalidate(hard=True)
250 250
251 251 if method == CLEAR_DELETE:
252 252 num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
253 253 return num_affected_keys
254 254
255 255
256 256 class ActiveRegionCache(object):
257 257 def __init__(self, context, cache_data: dict):
258 258 self.context = context
259 259 self.cache_data = cache_data
260 260
261 261 @property
262 262 def state_uid(self) -> str:
263 263 return self.cache_data['cache_state_uid']
264 264
265 265
266 266 class InvalidationContext(object):
267 267 """
268 268 usage::
269 269
270 270 from rhodecode.lib import rc_cache
271 271
272 272 repo_namespace_key = 'some-cache-for-repo-id-100'
273 273 inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
274 274
275 275 def cache_generator(_state_uid):
276 276
277 277 @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
278 278 def _dummy_func(*args):
279 279 # compute heavy function
280 280 return _state_uid, 'result'
281 281
282 282 return _dummy_func
283 283
284 284 with inv_context_manager as invalidation_context:
285 285 cache_state_uid = invalidation_context.state_uid
286 286 cache_func = cache_generator(cache_state_uid)
287 287 previous_state_uid, result = cache_func(*call_args)
288 288
289 289 should_invalidate = previous_state_uid != cache_state_uid
290 290 if should_invalidate:
291 291 _, result = cache_func.refresh(*call_args)
292 292
293 293 # To send global invalidation signal, simply run
294 294 CacheKey.set_invalidate(repo_namespace_key)
295 295
296 296 """
297 297
298 298 def __repr__(self):
299 299 return f'<InvalidationContext:{self.cache_key}>'
300 300
301 301 def __init__(self, key, raise_exception=False, thread_scoped=None):
302 302 self.cache_key = key
303 303
304 304 self.raise_exception = raise_exception
305 305 self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
306 306 self.thread_id = 'global'
307 307
308 308 if thread_scoped is None:
309 309 # if we set "default" we can override this via .ini settings
310 310 thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')
311 311
312 312 # Append the thread id to the cache key if this invalidation context
313 313 # should be scoped to the current thread.
314 314 if thread_scoped is True:
315 315 self.thread_id = threading.current_thread().ident
316 316
317 317 self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
318 318 self.compute_time = 0
319 319
320 320 def get_or_create_cache_obj(self):
321 321 from rhodecode.model.db import CacheKey, Session, IntegrityError
322 322
323 323 cache_obj = CacheKey.get_active_cache(self.cache_key)
324 324 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
325 325
326 326 if not cache_obj:
327 327 # generate new UID for non-existing cache object
328 328 cache_state_uid = CacheKey.generate_new_state_uid()
329 329 cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
330 330 cache_state_uid=cache_state_uid, cache_active=True)
331 331 try:
332 332 Session().add(cache_obj)
333 333 Session().commit()
334 334 except IntegrityError:
335 335 # if we catch integrity error, it means we inserted this object
336 336 # assumption is that's really an edge race-condition case and
337 337 # it's safe is to skip it
338 338 Session().rollback()
339 339 except Exception:
340 340 log.exception('Failed to commit on cache key update')
341 341 Session().rollback()
342 342 if self.raise_exception:
343 343 raise
344 344 return cache_obj
345 345
346 346 def __enter__(self):
347 347 log.debug('Entering cache invalidation check context: %s', self)
348 348 self._start_time = time.time()
349 349
350 350 self.cache_obj = self.get_or_create_cache_obj()
351 351 cache_data = self.cache_obj.get_dict()
352 352
353 353 return ActiveRegionCache(context=self, cache_data=cache_data)
354 354
355 355 def __exit__(self, exc_type, exc_val, exc_tb):
356 356 # save compute time
357 357 self.compute_time = time.time() - self._start_time
General Comments 0
You need to be logged in to leave comments. Login now