fix(caches): synced rc_cache module with rhodecode-ce
super-admin
r1206:cf300ad3 default
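
This sync brings over the delete_multi_by_prefix() API: the LRU-memory,
file-namespace, and Redis backends each gain a method that drops all keys
under a given prefix (the Redis variant can batch deletes through a Lua
script), the `somekey` parameter of async_creation_runner is renamed to
`cache_key`, and clear_cache_namespace() now delegates to the new method and
returns the number of removed keys instead of possibly None.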
@@ -1,114 +1,114 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2023 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 import logging
 import threading
 
 from dogpile.cache import register_backend
 
 from . import region_meta
 from .utils import (
     backend_key_generator,
     clear_cache_namespace,
     get_default_cache_settings,
     get_or_create_region,
     make_region,
     str2bool,
 )
 
 module_name = 'vcsserver'
 
 register_backend(
     "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
     "LRUMemoryBackend")
 
 register_backend(
     "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
     "FileNamespaceBackend")
 
 register_backend(
     "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
     "RedisPickleBackend")
 
 register_backend(
     "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
     "RedisMsgPackBackend")
 
 
 log = logging.getLogger(__name__)
 
 
 CACHE_OBJ_CACHE_VER = 'v2'
 
 CLEAR_DELETE = 'delete'
 CLEAR_INVALIDATE = 'invalidate'
 
 
-def async_creation_runner(cache, somekey, creator, mutex):
+def async_creation_runner(cache, cache_key, creator, mutex):
 
     def runner():
         try:
             value = creator()
-            cache.set(somekey, value)
+            cache.set(cache_key, value)
         finally:
             mutex.release()
 
     thread = threading.Thread(target=runner)
     thread.start()
 
 
 def configure_dogpile_cache(settings):
     cache_dir = settings.get('cache_dir')
     if cache_dir:
         region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
 
     rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
 
     # inspect available namespaces
     avail_regions = set()
     for key in rc_cache_data.keys():
         namespace_name = key.split('.', 1)[0]
         if namespace_name in avail_regions:
             continue
 
         avail_regions.add(namespace_name)
         log.debug('dogpile: found following cache regions: %s', namespace_name)
 
         new_region = make_region(
             name=namespace_name,
             function_key_generator=None,
             async_creation_runner=None
         )
 
         new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
         new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
 
         async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
         if async_creator:
             log.debug('configuring region %s with async creator', new_region)
             new_region.async_creation_runner = async_creation_runner
 
         if log.isEnabledFor(logging.DEBUG):
             region_args = dict(backend=new_region.actual_backend,
                                region_invalidator=new_region.region_invalidator.__class__)
             log.debug('dogpile: registering a new region key=`%s` args=%s', namespace_name, region_args)
 
         region_meta.dogpile_cache_regions[namespace_name] = new_region
 
 
 def includeme(config):
     configure_dogpile_cache(config.registry.settings)
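
For context, this is roughly how a region wired with the runner above behaves
(a minimal sketch; the backend, expiration time, and function names here are
illustrative assumptions, not part of this commit). When a cached value has
expired, dogpile returns the stale value immediately and calls the runner with
(cache, cache_key, creator, mutex); the mutex is already acquired, so the
runner must release it once the background recomputation finishes:

    from dogpile.cache import make_region

    region = make_region(
        async_creation_runner=async_creation_runner,  # runner from the diff above
    ).configure(
        'dogpile.cache.memory',  # stock dogpile backend, chosen for illustration
        expiration_time=30,
    )

    @region.cache_on_arguments()
    def expensive(x):
        return x * x  # stands in for a slow computation

    expensive(2)  # first call computes synchronously and caches the result
    # After 30s, a call returns the stale value right away while the runner
    # refreshes it in a background thread.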
@@ -1,267 +1,303 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2023 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 #import errno
 import fcntl
 import functools
 import logging
 import os
 import pickle
 #import time
 
 #import gevent
 import msgpack
 import redis
 
 flock_org = fcntl.flock
 from typing import Union
 
 from dogpile.cache.api import Deserializer, Serializer
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import FileLock
 from dogpile.cache.util import memoized_property
 
 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
 from vcsserver.str_utils import safe_bytes, safe_str
 from vcsserver.type_utils import str2bool
 
 _default_max_size = 1024
 
 log = logging.getLogger(__name__)
 
 
 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False
 
     def __init__(self, arguments):
         self.max_size = arguments.pop('max_size', _default_max_size)
 
         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug
 
         arguments['cache_dict'] = LRUDictClass(self.max_size)
         super().__init__(arguments)
 
     def __repr__(self):
         return f'{self.__class__}(maxsize=`{self.max_size}`)'
 
     def __str__(self):
         return self.__repr__()
 
     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass
 
+    def list_keys(self, prefix):
+        return list(self._cache.keys())
+
     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)
 
+    def delete_multi_by_prefix(self, prefix):
+        cache_keys = self.list_keys(prefix=prefix)
+        num_affected_keys = len(cache_keys)
+        if num_affected_keys:
+            self.delete_multi(cache_keys)
+        return num_affected_keys
+
 
 class PickleSerializer:
     serializer: None | Serializer = staticmethod(  # type: ignore
         functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
     )
     deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(pickle.loads)
     )
 
 
 class MsgPackSerializer:
     serializer: None | Serializer = staticmethod(  # type: ignore
         msgpack.packb
     )
     deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(msgpack.unpackb, use_list=False)
     )
 
 
 class CustomLockFactory(FileLock):
 
     pass
 
 
 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'
 
     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')
 
         log.debug('initialing cache-backend=%s db in %s', self.__class__.__name__, db_file)
         db_file_dir = os.path.dirname(db_file)
         if not os.path.isdir(db_file_dir):
             os.makedirs(db_file_dir)
 
         try:
             super().__init__(arguments)
         except Exception:
             log.exception('Failed to initialize db at: %s', db_file)
             raise
 
     def __repr__(self):
         return f'{self.__class__}(file=`{self.filename}`)'
 
     def __str__(self):
         return self.__repr__()
 
     def _get_keys_pattern(self, prefix: bytes = b''):
         return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
 
     def list_keys(self, prefix: bytes = b''):
         prefix = self._get_keys_pattern(prefix)
 
         def cond(dbm_key: bytes):
             if not prefix:
                 return True
 
             if dbm_key.startswith(prefix):
                 return True
             return False
 
         with self._dbm_file(True) as dbm:
             try:
                 return list(filter(cond, dbm.keys()))
             except Exception:
                 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                 raise
 
+    def delete_multi_by_prefix(self, prefix):
+        cache_keys = self.list_keys(prefix=prefix)
+        num_affected_keys = len(cache_keys)
+        if num_affected_keys:
+            self.delete_multi(cache_keys)
+        return num_affected_keys
+
     def get_store(self):
         return self.filename
 
 
 class BaseRedisBackend(redis_backend.RedisBackend):
     key_prefix = ''
 
     def __init__(self, arguments):
         self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
         super().__init__(arguments)
 
         self._lock_timeout = self.lock_timeout
         self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
 
         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
             self._lock_timeout = 30
 
     def __repr__(self):
         return f'{self.__class__}(conn=`{self.db_conn}`)'
 
     def __str__(self):
         return self.__repr__()
 
     def _create_client(self):
         args = {}
 
         if self.url is not None:
             args.update(url=self.url)
 
         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )
 
         connection_pool = redis.ConnectionPool(**args)
         self.writer_client = redis.StrictRedis(
             connection_pool=connection_pool
         )
         self.reader_client = self.writer_client
 
     def _get_keys_pattern(self, prefix: bytes = b''):
         return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
 
     def list_keys(self, prefix: bytes = b''):
         prefix = self._get_keys_pattern(prefix)
         return self.reader_client.keys(prefix)
 
+    def delete_multi_by_prefix(self, prefix, use_lua=False):
+        if use_lua:
+            # highly efficient Lua script to delete ALL keys by prefix...
+            lua = """local keys = redis.call('keys', ARGV[1])
+            for i=1,#keys,5000 do
+                redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
+            end
+            return #keys"""
+            num_affected_keys = self.writer_client.eval(
+                lua,
+                0,
+                f"{prefix}*")
+        else:
+            cache_keys = self.list_keys(prefix=prefix)
+            num_affected_keys = len(cache_keys)
+            if num_affected_keys:
+                self.delete_multi(cache_keys)
+        return num_affected_keys
+
     def get_store(self):
         return self.reader_client.connection_pool
 
     def get_mutex(self, key):
         if self.distributed_lock:
             lock_key = f'_lock_{safe_str(key)}'
             return get_mutex_lock(
                 self.writer_client, lock_key,
                 self._lock_timeout,
                 auto_renewal=self._lock_auto_renewal
             )
         else:
             return None
 
 
 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass
 
 
 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass
 
 
 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
     from vcsserver.lib._vendor import redis_lock
 
     class _RedisLockWrapper:
         """LockWrapper for redis_lock"""
 
         @classmethod
         def get_lock(cls):
             return redis_lock.Lock(
                 redis_client=client,
                 name=lock_key,
                 expire=lock_timeout,
                 auto_renewal=auto_renewal,
                 strict=True,
             )
 
         def __repr__(self):
             return f"{self.__class__.__name__}:{lock_key}"
 
         def __str__(self):
             return f"{self.__class__.__name__}:{lock_key}"
 
         def __init__(self):
             self.lock = self.get_lock()
             self.lock_key = lock_key
 
         def acquire(self, wait=True):
             log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
             try:
                 acquired = self.lock.acquire(wait)
                 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                 return acquired
             except redis_lock.AlreadyAcquired:
                 return False
             except redis_lock.AlreadyStarted:
                 # refresh thread exists, but it also means we acquired the lock
                 return True
 
         def release(self):
             try:
                 self.lock.release()
             except redis_lock.NotAcquired:
                 pass
 
     return _RedisLockWrapper()
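
A usage sketch for the new API (the backend arguments and keys below are
invented for illustration). One caveat worth noting: LRUMemoryBackend's
list_keys() ignores its prefix argument, so on that backend
delete_multi_by_prefix() removes every stored key; the file and Redis
backends do filter by prefix:

    # 'max_size' is a real LRUMemoryBackend argument; the keys are made up.
    backend = LRUMemoryBackend({'max_size': 100})
    backend.set('repo_1:key_a', b'value-a')
    backend.set('repo_1:key_b', b'value-b')

    removed = backend.delete_multi_by_prefix('repo_1')
    print(removed)  # 2 here, but only because these are the only keys stored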
@@ -1,248 +1,245 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2023 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 import functools
 import logging
 import os
 import threading
 import time
 
 import decorator
 from dogpile.cache import CacheRegion
 
 
 from vcsserver.utils import sha1
 from vcsserver.str_utils import safe_bytes
 from vcsserver.type_utils import str2bool  # noqa :required by imports from .utils
 
 from . import region_meta
 
 log = logging.getLogger(__name__)
 
 
 class RhodeCodeCacheRegion(CacheRegion):
 
     def __repr__(self):
-        return f'{self.__class__}(name={self.name})'
+        return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'
 
     def conditional_cache_on_arguments(
             self, namespace=None,
             expiration_time=None,
             should_cache_fn=None,
             to_str=str,
             function_key_generator=None,
             condition=True):
         """
         Custom conditional decorator that will not touch any dogpile internals if
         the condition isn't met. This works a bit differently from should_cache_fn,
         and it's faster in cases where we never want to compute cached values.
         """
         expiration_time_is_callable = callable(expiration_time)
         if not namespace:
             namespace = getattr(self, '_default_namespace', None)
 
         if function_key_generator is None:
             function_key_generator = self.function_key_generator
 
         def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
 
             if not condition:
                 log.debug('Calling un-cached method:%s', user_func.__name__)
                 start = time.time()
                 result = user_func(*arg, **kw)
                 total = time.time() - start
                 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                 return result
 
             key = func_key_generator(*arg, **kw)
 
             timeout = expiration_time() if expiration_time_is_callable \
                 else expiration_time
 
             log.debug('Calling cached method:`%s`', user_func.__name__)
             return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
 
         def cache_decorator(user_func):
             if to_str is str:
                 # backwards compatible
                 key_generator = function_key_generator(namespace, user_func)
             else:
                 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
 
             def refresh(*arg, **kw):
                 """
                 Like invalidate, but regenerates the value instead
                 """
                 key = key_generator(*arg, **kw)
                 value = user_func(*arg, **kw)
                 self.set(key, value)
                 return value
 
             def invalidate(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 self.delete(key)
 
             def set_(value, *arg, **kw):
                 key = key_generator(*arg, **kw)
                 self.set(key, value)
 
             def get(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 return self.get(key)
 
             user_func.set = set_
             user_func.invalidate = invalidate
             user_func.get = get
             user_func.refresh = refresh
             user_func.key_generator = key_generator
             user_func.original = user_func
 
             # Use `decorate` to preserve the signature of :param:`user_func`.
             return decorator.decorate(user_func, functools.partial(
                 get_or_create_for_user_func, key_generator))
 
         return cache_decorator
 
 
 def make_region(*arg, **kw):
     return RhodeCodeCacheRegion(*arg, **kw)
 
 
 def get_default_cache_settings(settings, prefixes=None):
     prefixes = prefixes or []
     cache_settings = {}
     for key in settings.keys():
         for prefix in prefixes:
             if key.startswith(prefix):
                 name = key.split(prefix)[1].strip()
                 val = settings[key]
                 if isinstance(val, str):
                     val = val.strip()
                 cache_settings[name] = val
     return cache_settings
 
 
 def compute_key_from_params(*args):
     """
     Helper to compute key from given params to be used in cache manager
     """
     return sha1(safe_bytes("_".join(map(str, args))))
 
 
 def custom_key_generator(backend, namespace, fn):
     func_name = fn.__name__
 
     def generate_key(*args):
         backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
         namespace_pref = namespace or 'default_namespace'
         arg_key = compute_key_from_params(*args)
         final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
 
         return final_key
 
     return generate_key
 
 
 def backend_key_generator(backend):
     """
     Special wrapper that also sends over the backend to the key generator
     """
     def wrapper(namespace, fn):
         return custom_key_generator(backend, namespace, fn)
     return wrapper
 
 
 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
     from .backends import FileNamespaceBackend
     from . import async_creation_runner
 
     region_obj = region_meta.dogpile_cache_regions.get(region_name)
     if not region_obj:
         reg_keys = list(region_meta.dogpile_cache_regions.keys())
         raise OSError(f'Region `{region_name}` not in configured: {reg_keys}.')
 
     region_uid_name = f'{region_name}:{region_namespace}'
 
     # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
     if isinstance(region_obj.actual_backend, FileNamespaceBackend):
         if not region_namespace:
             raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')
 
         region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
         if region_exist:
             log.debug('Using already configured region: %s', region_namespace)
             return region_exist
 
         expiration_time = region_obj.expiration_time
 
         cache_dir = region_meta.dogpile_config_defaults['cache_dir']
         namespace_cache_dir = cache_dir
 
         # we default the namespace_cache_dir to our default cache dir.
         # however, if this backend is configured with filename= param, we prioritize that
         # so all caches within that particular region, even those namespaced end up in the same path
         if region_obj.actual_backend.filename:
             namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
 
         if not os.path.isdir(namespace_cache_dir):
             os.makedirs(namespace_cache_dir)
         new_region = make_region(
             name=region_uid_name,
             function_key_generator=backend_key_generator(region_obj.actual_backend)
         )
 
         namespace_filename = os.path.join(
             namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
         # special type that allows 1db per namespace
         new_region.configure(
             backend='dogpile.cache.rc.file_namespace',
             expiration_time=expiration_time,
             arguments={"filename": namespace_filename}
         )
 
         # create and save in region caches
         log.debug('configuring new region: %s', region_uid_name)
         region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
 
     region_obj._default_namespace = region_namespace
     if use_async_runner:
         region_obj.async_creation_runner = async_creation_runner
     return region_obj
 
 
-def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
+def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
     from . import CLEAR_DELETE, CLEAR_INVALIDATE
 
     if not isinstance(cache_region, RhodeCodeCacheRegion):
         cache_region = get_or_create_region(cache_region, cache_namespace_uid)
-    log.debug('clearing cache region: %s with method=%s', cache_region, method)
+    log.debug('clearing cache region: %s [prefix:%s] with method=%s',
+              cache_region, cache_namespace_uid, method)
 
-    num_affected_keys = None
+    num_affected_keys = 0
 
     if method == CLEAR_INVALIDATE:
         # NOTE: The CacheRegion.invalidate() method’s default mode of
         # operation is to set a timestamp local to this CacheRegion in this Python process only.
         # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
         cache_region.invalidate(hard=True)
 
     if method == CLEAR_DELETE:
-        cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
-        num_affected_keys = len(cache_keys)
-        if num_affected_keys:
-            cache_region.delete_multi(cache_keys)
-
+        num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
     return num_affected_keys
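
To illustrate the changed contract (a sketch under assumed names: the region
name and namespace uid below are invented and must match a region configured
via the rc_cache.* settings), CLEAR_DELETE now funnels through the backend's
delete_multi_by_prefix() and the helper returns the number of removed keys,
where it previously could return None:

    from vcsserver.lib.rc_cache import CLEAR_DELETE, clear_cache_namespace

    num_removed = clear_cache_namespace(
        'repo_object',                # region name (assumed)
        'cache_repo.some-repo-id',    # namespace uid (illustrative)
        method=CLEAR_DELETE,
    )
    print(f'removed {num_removed} cache keys')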