chore(caches): cleanup cache lib and added cleanup backend
super-admin
r5497:f4c81595 default
@@ -1,335 +1,342 b''
# Copyright (C) 2015-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import errno
import fcntl
import functools
import logging
import os
import pickle
import time

import gevent
import msgpack
import redis

flock_org = fcntl.flock
from typing import Union

from dogpile.cache.api import Deserializer, Serializer
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import FileLock
from dogpile.cache.util import memoized_property

-from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
-from rhodecode.lib.str_utils import safe_bytes, safe_str
-from rhodecode.lib.type_utils import str2bool
+from ...lib.memory_lru_dict import LRUDict, LRUDictDebug
+from ...lib.str_utils import safe_bytes, safe_str
+from ...lib.type_utils import str2bool

_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        self.max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(self.max_size)
        super().__init__(arguments)

    def __repr__(self):
        return f'{self.__class__}(maxsize=`{self.max_size}`)'

    def __str__(self):
        return self.__repr__()

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def list_keys(self, prefix):
        return list(self._cache.keys())

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)

    def delete_multi_by_prefix(self, prefix):
        cache_keys = self.list_keys(prefix=prefix)
        num_affected_keys = len(cache_keys)
        if num_affected_keys:
            self.delete_multi(cache_keys)
        return num_affected_keys


class PickleSerializer:
    serializer: None | Serializer = staticmethod(  # type: ignore
        functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
    )
    deserializer: None | Deserializer = staticmethod(  # type: ignore
        functools.partial(pickle.loads)
    )


class MsgPackSerializer:
    serializer: None | Serializer = staticmethod(  # type: ignore
        msgpack.packb
    )
    deserializer: None | Deserializer = staticmethod(  # type: ignore
        functools.partial(msgpack.unpackb, use_list=False)
    )


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on errors other than "Resource temporarily unavailable"
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long for a lock, better to fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
        db_file_dir = os.path.dirname(db_file)
        if not os.path.isdir(db_file_dir):
            os.makedirs(db_file_dir)

        try:
            super().__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return f'{self.__class__}(file=`{self.filename}`)'

    def __str__(self):
        return self.__repr__()

    def _get_keys_pattern(self, prefix: bytes = b''):
        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        prefix = self._get_keys_pattern(prefix)

        def cond(dbm_key: bytes):
            if not prefix:
                return True

            if dbm_key.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return list(filter(cond, dbm.keys()))
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def delete_multi_by_prefix(self, prefix):
        cache_keys = self.list_keys(prefix=prefix)
        num_affected_keys = len(cache_keys)
        if num_affected_keys:
            self.delete_multi(cache_keys)
        return num_affected_keys

    def get_store(self):
        return self.filename

+    def cleanup_store(self):
+        for ext in ("db", "dat", "pag", "dir"):
+            final_filename = self.filename + os.extsep + ext
+            if os.path.exists(final_filename):
+                os.remove(final_filename)
+                log.warning('Removed dbm file %s', final_filename)
+

class BaseRedisBackend(redis_backend.RedisBackend):
    key_prefix = ''

    def __init__(self, arguments):
        self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
        super().__init__(arguments)

        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))

        if self._lock_auto_renewal and not self._lock_timeout:
            # set default timeout for auto_renewal
            self._lock_timeout = 30

    def __repr__(self):
        return f'{self.__class__}(conn=`{self.db_conn}`)'

    def __str__(self):
        return self.__repr__()

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)
        self.writer_client = redis.StrictRedis(
            connection_pool=connection_pool
        )
        self.reader_client = self.writer_client

    def _get_keys_pattern(self, prefix: bytes = b''):
        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        prefix = self._get_keys_pattern(prefix)
        return self.reader_client.keys(prefix)

    def delete_multi_by_prefix(self, prefix, use_lua=False):
        if use_lua:
            # highly efficient LUA script to delete ALL keys by prefix...
            lua = """local keys = redis.call('keys', ARGV[1])
            for i=1,#keys,5000 do
                redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
            end
            return #keys"""
            num_affected_keys = self.writer_client.eval(
                lua,
                0,
                f"{prefix}*")
        else:
            cache_keys = self.list_keys(prefix=prefix)
            num_affected_keys = len(cache_keys)
            if num_affected_keys:
                self.delete_multi(cache_keys)
        return num_affected_keys

    def get_store(self):
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        if self.distributed_lock:
            lock_key = f'_lock_{safe_str(key)}'
            return get_mutex_lock(
                self.writer_client, lock_key,
                self._lock_timeout,
                auto_renewal=self._lock_auto_renewal
            )
        else:
            return None


class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'
    pass


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
    pass


def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
-    from rhodecode.lib._vendor import redis_lock
+    from ...lib._vendor import redis_lock

    class _RedisLockWrapper:
        """LockWrapper for redis_lock"""

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __repr__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def __str__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                acquired = self.lock.acquire(wait)
                log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                return acquired
            except redis_lock.AlreadyAcquired:
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                pass

    return _RedisLockWrapper()
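
The newly added cleanup_store() removes the DBM files that FileNamespaceBackend leaves on disk for a namespace. Below is a minimal, hedged sketch of how it could be driven; it is not part of this commit, the region and namespace names are invented for illustration, and get_or_create_region is assumed to be importable from the rhodecode.lib.rc_cache package (it is defined in the utils module diffed below).

# Illustrative sketch only (not part of this commit); region/namespace names are hypothetical.
from rhodecode.lib.rc_cache import get_or_create_region

region = get_or_create_region('cache_repo', region_namespace='example_namespace')
backend = region.actual_backend  # expected to be a FileNamespaceBackend for a file-backed region

backend.delete_multi_by_prefix(prefix='example_namespace')  # drop the cached keys first
backend.cleanup_store()  # then unlink the per-namespace .db/.dat/.pag/.dir files

cleanup_store only unlinks files with the extensions a dbm store typically creates (db, dat, pag, dir) and logs a warning for each file it removes.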
@@ -1,357 +1,357 b''
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import functools
import logging
import os
import threading
import time

import decorator
from dogpile.cache import CacheRegion

import rhodecode
-from rhodecode.lib.hash_utils import sha1
-from rhodecode.lib.str_utils import safe_bytes
-from rhodecode.lib.type_utils import str2bool  # noqa :required by imports from .utils
+from ...lib.hash_utils import sha1
+from ...lib.str_utils import safe_bytes
+from ...lib.type_utils import str2bool  # noqa :required by imports from .utils

from . import region_meta

log = logging.getLogger(__name__)


def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    return func.__class__.__name__ == 'cython_function_or_method'


class RhodeCodeCacheRegion(CacheRegion):

    def __repr__(self):
        return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=str,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator that will not touch any dogpile internals if
        the condition isn't met. This works a bit differently from should_cache_fn,
        and it's faster in cases where we never want to compute cached values.
        """
        expiration_time_is_callable = callable(expiration_time)
        if not namespace:
            namespace = getattr(self, '_default_namespace', None)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):

            if not condition:
                log.debug('Calling un-cached method:%s', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                return result

            key = func_key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached method:`%s`', user_func.__name__)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator


def make_region(*arg, **kw):
    return RhodeCodeCacheRegion(*arg, **kw)


def get_default_cache_settings(settings, prefixes=None):
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                name = key.split(prefix)[1].strip()
                val = settings[key]
                if isinstance(val, str):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings


def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    return sha1(safe_bytes("_".join(map(str, args))))


def custom_key_generator(backend, namespace, fn):
    func_name = fn.__name__

    def generate_key(*args):
        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"

        return final_key

    return generate_key


def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def wrapper(namespace, fn):
        return custom_key_generator(backend, namespace, fn)
    return wrapper


def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
    from .backends import FileNamespaceBackend
    from . import async_creation_runner

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        reg_keys = list(region_meta.dogpile_cache_regions.keys())
        raise OSError(f'Region `{region_name}` not found in configured regions: {reg_keys}.')

    region_uid_name = f'{region_name}:{region_namespace}'

    # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        if not region_namespace:
            raise ValueError(f'{FileNamespaceBackend} requires the region_namespace param to be specified')

        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        expiration_time = region_obj.expiration_time

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        namespace_cache_dir = cache_dir

        # we default the namespace_cache_dir to our default cache dir.
        # however, if this backend is configured with filename= param, we prioritize that
        # so all caches within that particular region, even those namespaced, end up in the same path
        if region_obj.actual_backend.filename:
            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)

        if not os.path.isdir(namespace_cache_dir):
            os.makedirs(namespace_cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )

        namespace_filename = os.path.join(
            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    region_obj._default_namespace = region_namespace
    if use_async_runner:
        region_obj.async_creation_runner = async_creation_runner
    return region_obj


def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
    from . import CLEAR_DELETE, CLEAR_INVALIDATE

    if not isinstance(cache_region, RhodeCodeCacheRegion):
        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
    log.debug('clearing cache region: %s [prefix:%s] with method=%s',
              cache_region, cache_namespace_uid, method)

    num_affected_keys = 0

    if method == CLEAR_INVALIDATE:
        # NOTE: The CacheRegion.invalidate() method’s default mode of
        # operation is to set a timestamp local to this CacheRegion in this Python process only.
        # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
        cache_region.invalidate(hard=True)

    if method == CLEAR_DELETE:
        num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
    return num_affected_keys


class ActiveRegionCache(object):
    def __init__(self, context, cache_data: dict):
        self.context = context
        self.cache_data = cache_data

    @property
    def state_uid(self) -> str:
        return self.cache_data['cache_state_uid']


class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        repo_namespace_key = 'some-cache-for-repo-id-100'
        inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)

        def cache_generator(_state_uid):

            @region.conditional_cache_on_arguments(namespace='some-common-namespace-100')
            def _dummy_func(*args):
                # compute heavy function
                return _state_uid, 'result'

            return _dummy_func

        with inv_context_manager as invalidation_context:
            cache_state_uid = invalidation_context.state_uid
            cache_func = cache_generator(cache_state_uid)
            previous_state_uid, result = cache_func(*call_args)

            should_invalidate = previous_state_uid != cache_state_uid
            if should_invalidate:
                _, result = cache_func.refresh(*call_args)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(repo_namespace_key)

    """

    def __repr__(self):
        return f'<InvalidationContext:{self.cache_key}>'

    def __init__(self, key, raise_exception=False, thread_scoped=None):
        self.cache_key = key

        self.raise_exception = raise_exception
        self.proc_id = rhodecode.ConfigGet().get_str('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.proc_key = f'proc:{self.proc_id}|thread:{self.thread_id}|key:{self.cache_key}'
        self.compute_time = 0

    def get_or_create_cache_obj(self):
        from rhodecode.model.db import CacheKey, Session, IntegrityError

        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            # generate new UID for non-existing cache object
            cache_state_uid = CacheKey.generate_new_state_uid()
            cache_obj = CacheKey(self.cache_key, cache_args=f'repo_state:{self._start_time}',
                                 cache_state_uid=cache_state_uid, cache_active=True)
            try:
                Session().add(cache_obj)
                Session().commit()
            except IntegrityError:
                # if we catch an integrity error, it means this object was already inserted;
                # the assumption is that this is really an edge race-condition case and
                # it's safe to skip it
                Session().rollback()
            except Exception:
                log.exception('Failed to commit on cache key update')
                Session().rollback()
                if self.raise_exception:
                    raise
        return cache_obj

    def __enter__(self):
        log.debug('Entering cache invalidation check context: %s', self)
        self._start_time = time.time()

        self.cache_obj = self.get_or_create_cache_obj()
        cache_data = self.cache_obj.get_dict()

        return ActiveRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time
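
For context, a brief sketch of how the utilities in this module are typically combined; it is not part of this diff, the region and namespace names are invented, and it assumes get_or_create_region, clear_cache_namespace and the CLEAR_* constants are re-exported by the rhodecode.lib.rc_cache package.

# Illustrative sketch only; names and the 'cache_repo' region are assumptions, not part of this commit.
from rhodecode.lib import rc_cache

cache_namespace_uid = 'cache_repo.example-repo-id'
region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)

@region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
def _cached_repo_stats(_namespace, repo_id):
    # expensive computation; cached until the key is invalidated or expires
    return {'repo_id': repo_id}

stats = _cached_repo_stats(cache_namespace_uid, 42)       # computed once, served from cache afterwards
_cached_repo_stats.invalidate(cache_namespace_uid, 42)    # drop this single key
rc_cache.clear_cache_namespace(region, cache_namespace_uid, method=rc_cache.CLEAR_DELETE)  # drop the whole namespace

The key layout follows custom_key_generator above: the backend key_prefix, then the namespace, then the function name plus a sha1 of the stringified arguments.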