chore(caches): cleanup rc_cache and implement cleanup_backend for filesystem
super-admin
r1275:ed405288 default
@@ -0,0 +1,53 @@
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 import hashlib
19 from vcsserver.lib.str_utils import safe_bytes, safe_str
20
21
22 def md5(s):
23 return hashlib.md5(s).hexdigest()
24
25
26 def md5_safe(s, return_type=''):
27
28 val = md5(safe_bytes(s))
29 if return_type == 'str':
30 val = safe_str(val)
31 return val
32
33
34 def sha1(s):
35 return hashlib.sha1(s).hexdigest()
36
37
38 def sha1_safe(s, return_type=''):
39 val = sha1(safe_bytes(s))
40 if return_type == 'str':
41 val = safe_str(val)
42 return val
43
44
45 def sha256(s):
46 return hashlib.sha256(s).hexdigest()
47
48
49 def sha256_safe(s, return_type=''):
50 val = sha256(safe_bytes(s))
51 if return_type == 'str':
52 val = safe_str(val)
53 return val
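
The new helper module above (imported further down in this changeset as `...lib.hash_utils`) wraps `hashlib` so callers can hash either `str` or `bytes` without worrying about encoding. A minimal sketch of that contract, with `safe_bytes` inlined as a UTF-8 stand-in for `vcsserver.lib.str_utils.safe_bytes`:

```python
import hashlib


def safe_bytes(s, encoding='utf-8'):
    # simplified stand-in for vcsserver.lib.str_utils.safe_bytes
    return s if isinstance(s, bytes) else str(s).encode(encoding)


def sha1_safe(s):
    # mirrors hash_utils.sha1_safe: coerce input to bytes, return hex digest
    return hashlib.sha1(safe_bytes(s)).hexdigest()


# str and bytes inputs hash identically, which is the point of the *_safe wrappers
assert sha1_safe('hello') == sha1_safe(b'hello')
print(sha1_safe('hello'))  # aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d
```

Note that `hexdigest()` already returns `str` on Python 3, so the `return_type='str'` branch through `safe_str` is effectively a defensive no-op.
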
@@ -1,303 +1,310 @@
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 #import errno
19 19 import fcntl
20 20 import functools
21 21 import logging
22 22 import os
23 23 import pickle
24 24 #import time
25 25
26 26 #import gevent
27 27 import msgpack
28 28 import redis
29 29
30 30 flock_org = fcntl.flock
31 31 from typing import Union
32 32
33 33 from dogpile.cache.api import Deserializer, Serializer
34 34 from dogpile.cache.backends import file as file_backend
35 35 from dogpile.cache.backends import memory as memory_backend
36 36 from dogpile.cache.backends import redis as redis_backend
37 37 from dogpile.cache.backends.file import FileLock
38 38 from dogpile.cache.util import memoized_property
39 39
40 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
41 from vcsserver.lib.str_utils import safe_bytes, safe_str
42 from vcsserver.lib.type_utils import str2bool
40 from ...lib.memory_lru_dict import LRUDict, LRUDictDebug
41 from ...lib.str_utils import safe_bytes, safe_str
42 from ...lib.type_utils import str2bool
43 43
44 44 _default_max_size = 1024
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 class LRUMemoryBackend(memory_backend.MemoryBackend):
50 50 key_prefix = 'lru_mem_backend'
51 51 pickle_values = False
52 52
53 53 def __init__(self, arguments):
54 54 self.max_size = arguments.pop('max_size', _default_max_size)
55 55
56 56 LRUDictClass = LRUDict
57 57 if arguments.pop('log_key_count', None):
58 58 LRUDictClass = LRUDictDebug
59 59
60 60 arguments['cache_dict'] = LRUDictClass(self.max_size)
61 61 super().__init__(arguments)
62 62
63 63 def __repr__(self):
64 64 return f'{self.__class__}(maxsize=`{self.max_size}`)'
65 65
66 66 def __str__(self):
67 67 return self.__repr__()
68 68
69 69 def delete(self, key):
70 70 try:
71 71 del self._cache[key]
72 72 except KeyError:
73 73 # we don't care if key isn't there at deletion
74 74 pass
75 75
76 76 def list_keys(self, prefix):
77 77 return list(self._cache.keys())
78 78
79 79 def delete_multi(self, keys):
80 80 for key in keys:
81 81 self.delete(key)
82 82
83 83 def delete_multi_by_prefix(self, prefix):
84 84 cache_keys = self.list_keys(prefix=prefix)
85 85 num_affected_keys = len(cache_keys)
86 86 if num_affected_keys:
87 87 self.delete_multi(cache_keys)
88 88 return num_affected_keys
89 89
90 90
91 91 class PickleSerializer:
92 92 serializer: None | Serializer = staticmethod( # type: ignore
93 93 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
94 94 )
95 95 deserializer: None | Deserializer = staticmethod( # type: ignore
96 96 functools.partial(pickle.loads)
97 97 )
98 98
99 99
100 100 class MsgPackSerializer:
101 101 serializer: None | Serializer = staticmethod( # type: ignore
102 102 msgpack.packb
103 103 )
104 104 deserializer: None | Deserializer = staticmethod( # type: ignore
105 105 functools.partial(msgpack.unpackb, use_list=False)
106 106 )
107 107
108 108
109 109 class CustomLockFactory(FileLock):
110 110
111 111 pass
112 112
113 113
114 114 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
115 115 key_prefix = 'file_backend'
116 116
117 117 def __init__(self, arguments):
118 118 arguments['lock_factory'] = CustomLockFactory
119 119 db_file = arguments.get('filename')
120 120
121 121 log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
122 122 db_file_dir = os.path.dirname(db_file)
123 123 if not os.path.isdir(db_file_dir):
124 124 os.makedirs(db_file_dir)
125 125
126 126 try:
127 127 super().__init__(arguments)
128 128 except Exception:
129 129 log.exception('Failed to initialize db at: %s', db_file)
130 130 raise
131 131
132 132 def __repr__(self):
133 133 return f'{self.__class__}(file=`{self.filename}`)'
134 134
135 135 def __str__(self):
136 136 return self.__repr__()
137 137
138 138 def _get_keys_pattern(self, prefix: bytes = b''):
139 139 return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
140 140
141 141 def list_keys(self, prefix: bytes = b''):
142 142 prefix = self._get_keys_pattern(prefix)
143 143
144 144 def cond(dbm_key: bytes):
145 145 if not prefix:
146 146 return True
147 147
148 148 if dbm_key.startswith(prefix):
149 149 return True
150 150 return False
151 151
152 152 with self._dbm_file(True) as dbm:
153 153 try:
154 154 return list(filter(cond, dbm.keys()))
155 155 except Exception:
156 156 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
157 157 raise
158 158
159 159 def delete_multi_by_prefix(self, prefix):
160 160 cache_keys = self.list_keys(prefix=prefix)
161 161 num_affected_keys = len(cache_keys)
162 162 if num_affected_keys:
163 163 self.delete_multi(cache_keys)
164 164 return num_affected_keys
165 165
166 166 def get_store(self):
167 167 return self.filename
168 168
169 def cleanup_store(self):
170 for ext in ("db", "dat", "pag", "dir"):
171 final_filename = self.filename + os.extsep + ext
172 if os.path.exists(final_filename):
173 os.remove(final_filename)
174 log.warning('Removed dbm file %s', final_filename)
175
169 176
170 177 class BaseRedisBackend(redis_backend.RedisBackend):
171 178 key_prefix = ''
172 179
173 180 def __init__(self, arguments):
174 181 self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
175 182 super().__init__(arguments)
176 183
177 184 self._lock_timeout = self.lock_timeout
178 185 self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
179 186
180 187 if self._lock_auto_renewal and not self._lock_timeout:
181 188 # set default timeout for auto_renewal
182 189 self._lock_timeout = 30
183 190
184 191 def __repr__(self):
185 192 return f'{self.__class__}(conn=`{self.db_conn}`)'
186 193
187 194 def __str__(self):
188 195 return self.__repr__()
189 196
190 197 def _create_client(self):
191 198 args = {}
192 199
193 200 if self.url is not None:
194 201 args.update(url=self.url)
195 202
196 203 else:
197 204 args.update(
198 205 host=self.host, password=self.password,
199 206 port=self.port, db=self.db
200 207 )
201 208
202 209 connection_pool = redis.ConnectionPool(**args)
203 210 self.writer_client = redis.StrictRedis(
204 211 connection_pool=connection_pool
205 212 )
206 213 self.reader_client = self.writer_client
207 214
208 215 def _get_keys_pattern(self, prefix: bytes = b''):
209 216 return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
210 217
211 218 def list_keys(self, prefix: bytes = b''):
212 219 prefix = self._get_keys_pattern(prefix)
213 220 return self.reader_client.keys(prefix)
214 221
215 222 def delete_multi_by_prefix(self, prefix, use_lua=False):
216 223 if use_lua:
217 224 # highly efficient Lua script to delete ALL keys by prefix...
218 225 lua = """local keys = redis.call('keys', ARGV[1])
219 226 for i=1,#keys,5000 do
220 227 redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
221 228 end
222 229 return #keys"""
223 230 num_affected_keys = self.writer_client.eval(
224 231 lua,
225 232 0,
226 233 f"{prefix}*")
227 234 else:
228 235 cache_keys = self.list_keys(prefix=prefix)
229 236 num_affected_keys = len(cache_keys)
230 237 if num_affected_keys:
231 238 self.delete_multi(cache_keys)
232 239 return num_affected_keys
233 240
234 241 def get_store(self):
235 242 return self.reader_client.connection_pool
236 243
237 244 def get_mutex(self, key):
238 245 if self.distributed_lock:
239 246 lock_key = f'_lock_{safe_str(key)}'
240 247 return get_mutex_lock(
241 248 self.writer_client, lock_key,
242 249 self._lock_timeout,
243 250 auto_renewal=self._lock_auto_renewal
244 251 )
245 252 else:
246 253 return None
247 254
248 255
249 256 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
250 257 key_prefix = 'redis_pickle_backend'
251 258 pass
252 259
253 260
254 261 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
255 262 key_prefix = 'redis_msgpack_backend'
256 263 pass
257 264
258 265
259 266 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
260 from vcsserver.lib._vendor import redis_lock
267 from ...lib._vendor import redis_lock
261 268
262 269 class _RedisLockWrapper:
263 270 """LockWrapper for redis_lock"""
264 271
265 272 @classmethod
266 273 def get_lock(cls):
267 274 return redis_lock.Lock(
268 275 redis_client=client,
269 276 name=lock_key,
270 277 expire=lock_timeout,
271 278 auto_renewal=auto_renewal,
272 279 strict=True,
273 280 )
274 281
275 282 def __repr__(self):
276 283 return f"{self.__class__.__name__}:{lock_key}"
277 284
278 285 def __str__(self):
279 286 return f"{self.__class__.__name__}:{lock_key}"
280 287
281 288 def __init__(self):
282 289 self.lock = self.get_lock()
283 290 self.lock_key = lock_key
284 291
285 292 def acquire(self, wait=True):
286 293 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
287 294 try:
288 295 acquired = self.lock.acquire(wait)
289 296 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
290 297 return acquired
291 298 except redis_lock.AlreadyAcquired:
292 299 return False
293 300 except redis_lock.AlreadyStarted:
294 301 # refresh thread exists, but it also means we acquired the lock
295 302 return True
296 303
297 304 def release(self):
298 305 try:
299 306 self.lock.release()
300 307 except redis_lock.NotAcquired:
301 308 pass
302 309
303 310 return _RedisLockWrapper()
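
Two behavioural notes on the backends above. On Redis, `delete_multi_by_prefix(..., use_lua=True)` performs the prefix scan and deletion in one server-side Lua script, deleting in chunks of 5000 keys to stay within Lua's `unpack()` argument limits, instead of a `KEYS` round-trip followed by a client-side `delete_multi`. On the file backend, the new `cleanup_store()` deletes the on-disk dbm files themselves; which sidecar files exist depends on the dbm flavour (`dbm.dumb` writes `.dat`/`.dir`, ndbm variants use `.db` or `.pag`/`.dir`), hence the probe over all known extensions. A self-contained sketch of that probe, forcing `dbm.dumb` so the sidecars are predictable (the path is hypothetical):

```python
import dbm.dumb
import os
import tempfile

# hypothetical cache-db path, following the f"{region_name}_{region_namespace}.cache_db" naming
filename = os.path.join(tempfile.mkdtemp(), 'repo_object_my-repo.cache_db')

db = dbm.dumb.open(filename, 'c')  # dbm.dumb creates filename.dat / filename.dir
db[b'file_backend:some_key'] = b'cached-value'
db.close()

# the same probe loop as cleanup_store(): try every known dbm sidecar extension
for ext in ('db', 'dat', 'pag', 'dir'):
    candidate = filename + os.extsep + ext
    if os.path.exists(candidate):
        os.remove(candidate)
        print('removed', candidate)  # here: the .dat and .dir files
```

In the real backend each removal is additionally logged via `log.warning('Removed dbm file %s', ...)`, so cache cleanup is visible in the server logs.
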
@@ -1,245 +1,245 @@
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import functools
19 19 import logging
20 20 import os
21 21 import threading
22 22 import time
23 23
24 24 import decorator
25 25 from dogpile.cache import CacheRegion
26 26
27 27
28 from vcsserver.utils import sha1
29 from vcsserver.lib.str_utils import safe_bytes
30 from vcsserver.lib.type_utils import str2bool # noqa :required by imports from .utils
28 from ...lib.hash_utils import sha1
29 from ...lib.str_utils import safe_bytes
30 from ...lib.type_utils import str2bool # noqa :required by imports from .utils
31 31
32 32 from . import region_meta
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 class RhodeCodeCacheRegion(CacheRegion):
38 38
39 39 def __repr__(self):
40 40 return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'
41 41
42 42 def conditional_cache_on_arguments(
43 43 self, namespace=None,
44 44 expiration_time=None,
45 45 should_cache_fn=None,
46 46 to_str=str,
47 47 function_key_generator=None,
48 48 condition=True):
49 49 """
49 49 Custom conditional decorator that will not touch any dogpile internals if
50 50 the condition isn't met. This works a bit differently from should_cache_fn,
51 51 and it's faster in cases where we don't ever want to compute cached values.
53 53 """
54 54 expiration_time_is_callable = callable(expiration_time)
55 55 if not namespace:
56 56 namespace = getattr(self, '_default_namespace', None)
57 57
58 58 if function_key_generator is None:
59 59 function_key_generator = self.function_key_generator
60 60
61 61 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
62 62
63 63 if not condition:
64 64 log.debug('Calling un-cached method:%s', user_func.__name__)
65 65 start = time.time()
66 66 result = user_func(*arg, **kw)
67 67 total = time.time() - start
68 68 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
69 69 return result
70 70
71 71 key = func_key_generator(*arg, **kw)
72 72
73 73 timeout = expiration_time() if expiration_time_is_callable \
74 74 else expiration_time
75 75
76 76 log.debug('Calling cached method:`%s`', user_func.__name__)
77 77 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
78 78
79 79 def cache_decorator(user_func):
80 80 if to_str is str:
81 81 # backwards compatible
82 82 key_generator = function_key_generator(namespace, user_func)
83 83 else:
84 84 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
85 85
86 86 def refresh(*arg, **kw):
87 87 """
88 88 Like invalidate, but regenerates the value instead
89 89 """
90 90 key = key_generator(*arg, **kw)
91 91 value = user_func(*arg, **kw)
92 92 self.set(key, value)
93 93 return value
94 94
95 95 def invalidate(*arg, **kw):
96 96 key = key_generator(*arg, **kw)
97 97 self.delete(key)
98 98
99 99 def set_(value, *arg, **kw):
100 100 key = key_generator(*arg, **kw)
101 101 self.set(key, value)
102 102
103 103 def get(*arg, **kw):
104 104 key = key_generator(*arg, **kw)
105 105 return self.get(key)
106 106
107 107 user_func.set = set_
108 108 user_func.invalidate = invalidate
109 109 user_func.get = get
110 110 user_func.refresh = refresh
111 111 user_func.key_generator = key_generator
112 112 user_func.original = user_func
113 113
114 114 # Use `decorate` to preserve the signature of :param:`user_func`.
115 115 return decorator.decorate(user_func, functools.partial(
116 116 get_or_create_for_user_func, key_generator))
117 117
118 118 return cache_decorator
119 119
120 120
121 121 def make_region(*arg, **kw):
122 122 return RhodeCodeCacheRegion(*arg, **kw)
123 123
124 124
125 125 def get_default_cache_settings(settings, prefixes=None):
126 126 prefixes = prefixes or []
127 127 cache_settings = {}
128 128 for key in settings.keys():
129 129 for prefix in prefixes:
130 130 if key.startswith(prefix):
131 131 name = key.split(prefix)[1].strip()
132 132 val = settings[key]
133 133 if isinstance(val, str):
134 134 val = val.strip()
135 135 cache_settings[name] = val
136 136 return cache_settings
137 137
138 138
139 139 def compute_key_from_params(*args):
140 140 """
141 141 Helper to compute key from given params to be used in cache manager
142 142 """
143 143 return sha1(safe_bytes("_".join(map(str, args))))
144 144
145 145
146 146 def custom_key_generator(backend, namespace, fn):
147 147 func_name = fn.__name__
148 148
149 149 def generate_key(*args):
150 150 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
151 151 namespace_pref = namespace or 'default_namespace'
152 152 arg_key = compute_key_from_params(*args)
153 153 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
154 154
155 155 return final_key
156 156
157 157 return generate_key
158 158
159 159
160 160 def backend_key_generator(backend):
161 161 """
162 162 Special wrapper that also sends over the backend to the key generator
163 163 """
164 164 def wrapper(namespace, fn):
165 165 return custom_key_generator(backend, namespace, fn)
166 166 return wrapper
167 167
168 168
169 169 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
170 170 from .backends import FileNamespaceBackend
171 171 from . import async_creation_runner
172 172
173 173 region_obj = region_meta.dogpile_cache_regions.get(region_name)
174 174 if not region_obj:
175 175 reg_keys = list(region_meta.dogpile_cache_regions.keys())
176 176 raise OSError(f'Region `{region_name}` not in configured regions: {reg_keys}.')
177 177
178 178 region_uid_name = f'{region_name}:{region_namespace}'
179 179
180 180 # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
181 181 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
182 182 if not region_namespace:
183 183 raise ValueError(f'{FileNamespaceBackend} requires specifying the region_namespace param')
184 184
185 185 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
186 186 if region_exist:
187 187 log.debug('Using already configured region: %s', region_namespace)
188 188 return region_exist
189 189
190 190 expiration_time = region_obj.expiration_time
191 191
192 192 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
193 193 namespace_cache_dir = cache_dir
194 194
195 195 # we default the namespace_cache_dir to our default cache dir.
196 196 # however, if this backend is configured with filename= param, we prioritize that
197 197 # so all caches within that particular region, even namespaced ones, end up in the same path
198 198 if region_obj.actual_backend.filename:
199 199 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
200 200
201 201 if not os.path.isdir(namespace_cache_dir):
202 202 os.makedirs(namespace_cache_dir)
203 203 new_region = make_region(
204 204 name=region_uid_name,
205 205 function_key_generator=backend_key_generator(region_obj.actual_backend)
206 206 )
207 207
208 208 namespace_filename = os.path.join(
209 209 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
210 210 # special type that allows 1db per namespace
211 211 new_region.configure(
212 212 backend='dogpile.cache.rc.file_namespace',
213 213 expiration_time=expiration_time,
214 214 arguments={"filename": namespace_filename}
215 215 )
216 216
217 217 # create and save in region caches
218 218 log.debug('configuring new region: %s', region_uid_name)
219 219 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
220 220
221 221 region_obj._default_namespace = region_namespace
222 222 if use_async_runner:
223 223 region_obj.async_creation_runner = async_creation_runner
224 224 return region_obj
225 225
226 226
227 227 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
228 228 from . import CLEAR_DELETE, CLEAR_INVALIDATE
229 229
230 230 if not isinstance(cache_region, RhodeCodeCacheRegion):
231 231 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
232 232 log.debug('clearing cache region: %s [prefix:%s] with method=%s',
233 233 cache_region, cache_namespace_uid, method)
234 234
235 235 num_affected_keys = 0
236 236
237 237 if method == CLEAR_INVALIDATE:
238 238 # NOTE: The CacheRegion.invalidate() method’s default mode of
239 239 # operation is to set a timestamp local to this CacheRegion in this Python process only.
240 240 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
241 241 cache_region.invalidate(hard=True)
242 242
243 243 if method == CLEAR_DELETE:
244 244 num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
245 245 return num_affected_keys
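
To make the namespace-targeted deletion concrete: `custom_key_generator` composes every cache key as `{backend.key_prefix}:{namespace}:{func_name}_{sha1(args)}`, which is why `clear_cache_namespace()` can hand `cache_namespace_uid` straight to `delete_multi_by_prefix()` and hit only that namespace. A minimal sketch with the `sha1`/`safe_bytes` helpers inlined (the prefix, namespace, and arguments are illustrative):

```python
import hashlib


def compute_key_from_params(*args):
    # mirrors utils.compute_key_from_params: sha1 over the "_"-joined str() of all args
    joined = "_".join(map(str, args)).encode('utf-8')
    return hashlib.sha1(joined).hexdigest()


def generate_key(backend_prefix, namespace, func_name, *args):
    # same key shape as custom_key_generator's final_key
    return f"{backend_prefix}:{namespace}:{func_name}_{compute_key_from_params(*args)}"


key = generate_key('file_backend', 'repo_object.my-repo', 'get_commit', 'deadbeef', True)
print(key)  # file_backend:repo_object.my-repo:get_commit_<40-hex-char sha1>
```

The Redis backend then lists such keys with the pattern `b'%b:%b*' % (key_prefix, prefix)`, so everything under one namespace shares a scannable prefix.
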