caches: use global cache prefixes so we can keep compatibility when switching from the old RC to the new Python 3 based version
super-admin -
r1135:7b7ca856 default
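The substance of this commit is the versioned cache-namespace prefix added in the last hunk: a global version constant is folded into every repo-cache namespace, so the new Python 3 based server reads and writes under fresh keys instead of trying to deserialize entries written by the old RC. A minimal sketch of the mechanism (the constant and key scheme are taken from the diff below; the repo id is a hypothetical example):

# Sketch of the version-prefixed namespace scheme introduced below.
CACHE_OBJ_CACHE_VER = 'v2'  # bumped when the stored format changes

def make_namespace(cache_repo_id: str) -> str:
    # old servers effectively used 'cache_repo.<repo_id>'; embedding the
    # version means a new server never touches keys the old one wrote
    return f'cache_repo.{CACHE_OBJ_CACHE_VER}.{cache_repo_id}'

assert make_namespace('42') == 'cache_repo.v2.42'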
@@ -1,195 +1,194 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17 import os
18 18 import sys
19 19 import tempfile
20 20 import traceback
21 21 import logging
22 22 import urllib.parse
23 23
24 24 from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store
25 from vcsserver.lib.rc_cache import region_meta
26 25
27 26 from vcsserver import exceptions
28 27 from vcsserver.exceptions import NoContentException
29 28 from vcsserver.hgcompat import archival
30 29 from vcsserver.str_utils import safe_bytes
31 30
32 31 log = logging.getLogger(__name__)
33 32
34 33
35 34 class RepoFactory(object):
36 35 """
37 36 Utility to create instances of repository
38 37
39 38 It provides internal caching of the `repo` object based on
40 39 the :term:`call context`.
41 40 """
42 41 repo_type = None
43 42
44 43 def __init__(self):
45 self._cache_region = region_meta.dogpile_cache_regions['repo_object']
44 pass
46 45
47 46 def _create_config(self, path, config):
48 47 config = {}
49 48 return config
50 49
51 50 def _create_repo(self, wire, create):
52 51 raise NotImplementedError()
53 52
54 53 def repo(self, wire, create=False):
55 54 raise NotImplementedError()
56 55
57 56
58 57 def obfuscate_qs(query_string):
59 58 if query_string is None:
60 59 return None
61 60
62 61 parsed = []
63 62 for k, v in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
64 63 if k in ['auth_token', 'api_key']:
65 64 v = "*****"
66 65 parsed.append((k, v))
67 66
68 67 return '&'.join('{}{}'.format(
69 68 k, f'={v}' if v else '') for k, v in parsed)
70 69
71 70
72 71 def raise_from_original(new_type, org_exc: Exception):
73 72 """
74 73 Raise a new exception type with original args and traceback.
75 74 """
76 75
77 76 exc_type, exc_value, exc_traceback = sys.exc_info()
78 77 new_exc = new_type(*exc_value.args)
79 78
80 79 # store the original traceback into the new exc
81 80 new_exc._org_exc_tb = traceback.format_tb(exc_traceback)
82 81
83 82 try:
84 83 raise new_exc.with_traceback(exc_traceback)
85 84 finally:
86 85 del exc_traceback
87 86
88 87
89
90 88 class ArchiveNode(object):
91 89 def __init__(self, path, mode, is_link, raw_bytes):
92 90 self.path = path
93 91 self.mode = mode
94 92 self.is_link = is_link
95 93 self.raw_bytes = raw_bytes
96 94
97 95
98 96 def store_archive_in_cache(node_walker, archive_key, kind, mtime, archive_at_path, archive_dir_name,
99 97 commit_id, write_metadata=True, extra_metadata=None, cache_config=None):
100 98 """
101 Function that would store an generate archive and send it to a dedicated backend store
99 Function that stores a generated archive and sends it to a dedicated backend store
102 100 In here we use diskcache
103 101
104 102 :param node_walker: a generator returning nodes to add to archive
105 103 :param archive_key: key used to store the path
106 104 :param kind: archive kind
107 105 :param mtime: time of creation
108 :param archive_at_path: default '/' the path at archive was started. if this is not '/' it means it's a partial archive
106 :param archive_at_path: default '/', the path at which the archive was started.
107 If this is not '/', it means it's a partial archive
109 108 :param archive_dir_name: inside dir name when creating an archive
110 109 :param commit_id: commit sha of revision archive was created at
111 110 :param write_metadata:
112 111 :param extra_metadata:
113 112 :param cache_config:
114 113
115 walker should be a file walker, for example:
114 walker should be a file walker, for example,
116 115 def node_walker():
117 116 for file_info in files:
118 117 yield ArchiveNode(fn, mode, is_link, ctx[fn].data)
119 118 """
120 119 extra_metadata = extra_metadata or {}
121 120
122 121 d_cache = get_archival_cache_store(config=cache_config)
123 122
124 123 if archive_key in d_cache:
125 124 with d_cache as d_cache_reader:
126 125 reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True)
127 126 return reader.name
128 127
129 128 archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
130 129 log.debug('Creating new temp archive in %s', archive_tmp_path)
131 130
132 131 if kind == "tgz":
133 132 archiver = archival.tarit(archive_tmp_path, mtime, b"gz")
134 133 elif kind == "tbz2":
135 134 archiver = archival.tarit(archive_tmp_path, mtime, b"bz2")
136 135 elif kind == 'zip':
137 136 archiver = archival.zipit(archive_tmp_path, mtime)
138 137 else:
139 138 raise exceptions.ArchiveException()(
140 139 f'Remote does not support: "{kind}" archive type.')
141 140
142 141 for f in node_walker(commit_id, archive_at_path):
143 142 f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
144 143 try:
145 144 archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
146 145 except NoContentException:
147 146 # NOTE(marcink): this is a special case for SVN so we can create "empty"
148 # directories which arent supported by archiver
147 # directories, which are not supported by the archiver
149 148 archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')
150 149
151 150 if write_metadata:
152 151 metadata = dict([
153 152 ('commit_id', commit_id),
154 153 ('mtime', mtime),
155 154 ])
156 155 metadata.update(extra_metadata)
157 156
158 157 meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
159 158 f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
160 159 archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))
161 160
162 161 archiver.done()
163 162
164 163 # ensure set & get are atomic
165 164 with d_cache.transact():
166 165
167 166 with open(archive_tmp_path, 'rb') as archive_file:
168 167 add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True)
169 168 if not add_result:
170 169 log.error('Failed to store cache for key=%s', archive_key)
171 170
172 171 os.remove(archive_tmp_path)
173 172
174 173 reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True)
175 174 if not reader:
176 175 raise AssertionError(f'empty reader on key={archive_key} added={add_result}')
177 176
178 177 return reader.name
179 178
180 179
181 180 class BinaryEnvelope(object):
182 181 def __init__(self, val):
183 182 self.val = val
184 183
185 184
186 185 class BytesEnvelope(bytes):
187 186 def __new__(cls, content):
188 187 if isinstance(content, bytes):
189 188 return super().__new__(cls, content)
190 189 else:
191 190 raise TypeError('Content must be bytes.')
192 191
193 192
194 193 class BinaryBytesEnvelope(BytesEnvelope):
195 194 pass
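For context, here is a caller-side sketch of how store_archive_in_cache is driven. The walker contract comes from the docstring above, while the module path, key scheme, and cache_config keys are assumptions for illustration only:

# Hedged usage sketch; module path, key scheme and config keys are assumed.
from vcsserver.base import ArchiveNode, store_archive_in_cache  # assumed location

files = {b'README.md': b'# hello\n', b'src/app.py': b'print("hi")\n'}

def node_walker(commit_id, archive_at_path):
    for path, data in files.items():
        # raw_bytes must be a callable returning the file content
        yield ArchiveNode(path, 0o644, False, lambda d=data: d)

archive_path = store_archive_in_cache(
    node_walker,
    archive_key='repo-abc_tgz_deadbeef',  # assumed key scheme
    kind='tgz',
    mtime=1700000000,
    archive_at_path='/',
    archive_dir_name='repo-abc',
    commit_id='deadbeef',
    cache_config={'archive_cache.store_dir': '/tmp/archive_cache'},  # assumed keys
)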
@@ -1,112 +1,114 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import logging
19 19 import threading
20 20
21 21 from dogpile.cache import register_backend
22 22
23 23 from . import region_meta
24 24 from .utils import (
25 25 backend_key_generator,
26 26 clear_cache_namespace,
27 27 get_default_cache_settings,
28 28 get_or_create_region,
29 29 make_region,
30 30 str2bool,
31 31 )
32 32
33 33 module_name = 'vcsserver'
34 34
35 35 register_backend(
36 36 "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
37 37 "LRUMemoryBackend")
38 38
39 39 register_backend(
40 40 "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
41 41 "FileNamespaceBackend")
42 42
43 43 register_backend(
44 44 "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
45 45 "RedisPickleBackend")
46 46
47 47 register_backend(
48 48 "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
49 49 "RedisMsgPackBackend")
50 50
51 51
52 52 log = logging.getLogger(__name__)
53 53
54 54
55 CACHE_OBJ_CACHE_VER = 'v2'
56
55 57 CLEAR_DELETE = 'delete'
56 58 CLEAR_INVALIDATE = 'invalidate'
57 59
58 60
59 61 def async_creation_runner(cache, somekey, creator, mutex):
60 62
61 63 def runner():
62 64 try:
63 65 value = creator()
64 66 cache.set(somekey, value)
65 67 finally:
66 68 mutex.release()
67 69
68 70 thread = threading.Thread(target=runner)
69 71 thread.start()
70 72
71 73
72 74 def configure_dogpile_cache(settings):
73 75 cache_dir = settings.get('cache_dir')
74 76 if cache_dir:
75 77 region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
76 78
77 79 rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
78 80
79 81 # inspect available namespaces
80 82 avail_regions = set()
81 83 for key in rc_cache_data.keys():
82 84 namespace_name = key.split('.', 1)[0]
83 85 if namespace_name in avail_regions:
84 86 continue
85 87
86 88 avail_regions.add(namespace_name)
87 89 log.debug('dogpile: found following cache regions: %s', namespace_name)
88 90
89 91 new_region = make_region(
90 92 name=namespace_name,
91 93 function_key_generator=None,
92 94 async_creation_runner=None
93 95 )
94 96
95 97 new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
96 98 new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
97 99
98 100 async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
99 101 if async_creator:
100 102 log.debug('configuring region %s with async creator', new_region)
101 103 new_region.async_creation_runner = async_creation_runner
102 104
103 105 if log.isEnabledFor(logging.DEBUG):
104 106 region_args = dict(backend=new_region.actual_backend,
105 107 region_invalidator=new_region.region_invalidator.__class__)
106 log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
108 log.debug('dogpile: registering a new region key=`%s` args=%s', namespace_name, region_args)
107 109
108 110 region_meta.dogpile_cache_regions[namespace_name] = new_region
109 111
110 112
111 113 def includeme(config):
112 114 configure_dogpile_cache(config.registry.settings)
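The settings consumed by configure_dogpile_cache follow dogpile's dotted-key convention under the rc_cache. prefix. A hedged sketch of a minimal settings dict (the backend choice and values are illustrative, not shipped defaults):

# Hedged sketch of the flat settings dict configure_dogpile_cache() expects.
from vcsserver.lib.rc_cache import configure_dogpile_cache  # assumed location

settings = {
    'cache_dir': '/tmp/rc_cache',
    'rc_cache.repo_object.backend': 'dogpile.cache.rc.redis_msgpack',
    'rc_cache.repo_object.expiration_time': '2592000',
    'rc_cache.repo_object.arguments.host': 'localhost',
    'rc_cache.repo_object.arguments.port': '6379',
    # popped by the code above to enable the threaded async_creation_runner
    'rc_cache.repo_object.async_creator': 'true',
}
configure_dogpile_cache(settings)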
@@ -1,267 +1,267 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 import errno
18 #import errno
19 19 import fcntl
20 20 import functools
21 21 import logging
22 22 import os
23 23 import pickle
24 24 #import time
25 25
26 26 #import gevent
27 27 import msgpack
28 28 import redis
29 29
30 30 flock_org = fcntl.flock
31 31 from typing import Union
32 32
33 33 from dogpile.cache.api import Deserializer, Serializer
34 34 from dogpile.cache.backends import file as file_backend
35 35 from dogpile.cache.backends import memory as memory_backend
36 36 from dogpile.cache.backends import redis as redis_backend
37 37 from dogpile.cache.backends.file import FileLock
38 38 from dogpile.cache.util import memoized_property
39 39
40 40 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
41 41 from vcsserver.str_utils import safe_bytes, safe_str
42 42 from vcsserver.type_utils import str2bool
43 43
44 44 _default_max_size = 1024
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 class LRUMemoryBackend(memory_backend.MemoryBackend):
50 50 key_prefix = 'lru_mem_backend'
51 51 pickle_values = False
52 52
53 53 def __init__(self, arguments):
54 54 self.max_size = arguments.pop('max_size', _default_max_size)
55 55
56 56 LRUDictClass = LRUDict
57 57 if arguments.pop('log_key_count', None):
58 58 LRUDictClass = LRUDictDebug
59 59
60 60 arguments['cache_dict'] = LRUDictClass(self.max_size)
61 61 super().__init__(arguments)
62 62
63 63 def __repr__(self):
64 64 return f'{self.__class__}(maxsize=`{self.max_size}`)'
65 65
66 66 def __str__(self):
67 67 return self.__repr__()
68 68
69 69 def delete(self, key):
70 70 try:
71 71 del self._cache[key]
72 72 except KeyError:
73 73 # we don't care if key isn't there at deletion
74 74 pass
75 75
76 76 def delete_multi(self, keys):
77 77 for key in keys:
78 78 self.delete(key)
79 79
80 80
81 81 class PickleSerializer:
82 82 serializer: None | Serializer = staticmethod( # type: ignore
83 83 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
84 84 )
85 85 deserializer: None | Deserializer = staticmethod( # type: ignore
86 86 functools.partial(pickle.loads)
87 87 )
88 88
89 89
90 90 class MsgPackSerializer(object):
91 91 serializer: None | Serializer = staticmethod( # type: ignore
92 92 msgpack.packb
93 93 )
94 94 deserializer: None | Deserializer = staticmethod( # type: ignore
95 95 functools.partial(msgpack.unpackb, use_list=False)
96 96 )
97 97
98 98
99 99 class CustomLockFactory(FileLock):
100 100
101 101 pass
102 102
103 103
104 104 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
105 105 key_prefix = 'file_backend'
106 106
107 107 def __init__(self, arguments):
108 108 arguments['lock_factory'] = CustomLockFactory
109 109 db_file = arguments.get('filename')
110 110
111 111 log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
112 112 db_file_dir = os.path.dirname(db_file)
113 113 if not os.path.isdir(db_file_dir):
114 114 os.makedirs(db_file_dir)
115 115
116 116 try:
117 117 super().__init__(arguments)
118 118 except Exception:
119 119 log.exception('Failed to initialize db at: %s', db_file)
120 120 raise
121 121
122 122 def __repr__(self):
123 123 return f'{self.__class__}(file=`{self.filename}`)'
124 124
125 125 def __str__(self):
126 126 return self.__repr__()
127 127
128 128 def _get_keys_pattern(self, prefix: bytes = b''):
129 129 return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
130 130
131 131 def list_keys(self, prefix: bytes = b''):
132 132 prefix = self._get_keys_pattern(prefix)
133 133
134 134 def cond(dbm_key: bytes):
135 135 if not prefix:
136 136 return True
137 137
138 138 if dbm_key.startswith(prefix):
139 139 return True
140 140 return False
141 141
142 142 with self._dbm_file(True) as dbm:
143 143 try:
144 144 return list(filter(cond, dbm.keys()))
145 145 except Exception:
146 146 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
147 147 raise
148 148
149 149 def get_store(self):
150 150 return self.filename
151 151
152 152
153 153 class BaseRedisBackend(redis_backend.RedisBackend):
154 154 key_prefix = ''
155 155
156 156 def __init__(self, arguments):
157 157 self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
158 158 super().__init__(arguments)
159 159
160 160 self._lock_timeout = self.lock_timeout
161 161 self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
162 162
163 163 if self._lock_auto_renewal and not self._lock_timeout:
164 164 # set default timeout for auto_renewal
165 165 self._lock_timeout = 30
166 166
167 167 def __repr__(self):
168 168 return f'{self.__class__}(conn=`{self.db_conn}`)'
169 169
170 170 def __str__(self):
171 171 return self.__repr__()
172 172
173 173 def _create_client(self):
174 174 args = {}
175 175
176 176 if self.url is not None:
177 177 args.update(url=self.url)
178 178
179 179 else:
180 180 args.update(
181 181 host=self.host, password=self.password,
182 182 port=self.port, db=self.db
183 183 )
184 184
185 185 connection_pool = redis.ConnectionPool(**args)
186 186 self.writer_client = redis.StrictRedis(
187 187 connection_pool=connection_pool
188 188 )
189 189 self.reader_client = self.writer_client
190 190
191 191 def _get_keys_pattern(self, prefix: bytes = b''):
192 192 return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
193 193
194 194 def list_keys(self, prefix: bytes = b''):
195 195 prefix = self._get_keys_pattern(prefix)
196 196 return self.reader_client.keys(prefix)
197 197
198 198 def get_store(self):
199 199 return self.reader_client.connection_pool
200 200
201 201 def get_mutex(self, key):
202 202 if self.distributed_lock:
203 203 lock_key = f'_lock_{safe_str(key)}'
204 204 return get_mutex_lock(
205 205 self.writer_client, lock_key,
206 206 self._lock_timeout,
207 207 auto_renewal=self._lock_auto_renewal
208 208 )
209 209 else:
210 210 return None
211 211
212 212
213 213 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
214 214 key_prefix = 'redis_pickle_backend'
215 215 pass
216 216
217 217
218 218 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
219 219 key_prefix = 'redis_msgpack_backend'
220 220 pass
221 221
222 222
223 223 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
224 224 from vcsserver.lib._vendor import redis_lock
225 225
226 226 class _RedisLockWrapper(object):
227 227 """LockWrapper for redis_lock"""
228 228
229 229 @classmethod
230 230 def get_lock(cls):
231 231 return redis_lock.Lock(
232 232 redis_client=client,
233 233 name=lock_key,
234 234 expire=lock_timeout,
235 235 auto_renewal=auto_renewal,
236 236 strict=True,
237 237 )
238 238
239 239 def __repr__(self):
240 240 return f"{self.__class__.__name__}:{lock_key}"
241 241
242 242 def __str__(self):
243 243 return f"{self.__class__.__name__}:{lock_key}"
244 244
245 245 def __init__(self):
246 246 self.lock = self.get_lock()
247 247 self.lock_key = lock_key
248 248
249 249 def acquire(self, wait=True):
250 250 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
251 251 try:
252 252 acquired = self.lock.acquire(wait)
253 253 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
254 254 return acquired
255 255 except redis_lock.AlreadyAcquired:
256 256 return False
257 257 except redis_lock.AlreadyStarted:
258 258 # refresh thread exists, but it also means we acquired the lock
259 259 return True
260 260
261 261 def release(self):
262 262 try:
263 263 self.lock.release()
264 264 except redis_lock.NotAcquired:
265 265 pass
266 266
267 267 return _RedisLockWrapper()
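Dogpile drives get_mutex internally via get_or_create; for illustration, a hedged sketch of using the returned lock wrapper by hand (the connection details and lock key are assumptions):

# Hedged sketch: manual use of the distributed lock wrapper.
import redis

client = redis.StrictRedis(host='localhost', port=6379)  # assumed connection
mutex = get_mutex_lock(client, lock_key='_lock_repo_object:42',
                       lock_timeout=30, auto_renewal=True)

if mutex.acquire(wait=True):
    try:
        pass  # compute and store the expensive value here
    finally:
        mutex.release()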
@@ -1,247 +1,248 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import functools
19 19 import logging
20 20 import os
21 21 import threading
22 22 import time
23 23
24 24 import decorator
25 25 from dogpile.cache import CacheRegion
26 26
27 27
28 28 from vcsserver.utils import sha1
29 29 from vcsserver.str_utils import safe_bytes
30 from vcsserver.type_utils import str2bool
30 from vcsserver.type_utils import str2bool  # noqa: required by imports from .utils
31 31
32 32 from . import region_meta
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 class RhodeCodeCacheRegion(CacheRegion):
38 38
39 39 def __repr__(self):
40 40 return f'{self.__class__}(name={self.name})'
41 41
42 42 def conditional_cache_on_arguments(
43 43 self, namespace=None,
44 44 expiration_time=None,
45 45 should_cache_fn=None,
46 46 to_str=str,
47 47 function_key_generator=None,
48 48 condition=True):
49 49 """
50 50 Custom conditional decorator that will not touch any dogpile internals if
51 51 the condition isn't met. This works a bit differently from should_cache_fn,
52 52 and it's faster in cases where we never want to compute cached values.
53 53 """
54 54 expiration_time_is_callable = callable(expiration_time)
55 55 if not namespace:
56 56 namespace = getattr(self, '_default_namespace', None)
57 57
58 58 if function_key_generator is None:
59 59 function_key_generator = self.function_key_generator
60 60
61 61 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
62 62
63 63 if not condition:
64 64 log.debug('Calling un-cached method:%s', user_func.__name__)
65 65 start = time.time()
66 66 result = user_func(*arg, **kw)
67 67 total = time.time() - start
68 68 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
69 69 return result
70 70
71 71 key = func_key_generator(*arg, **kw)
72 72
73 73 timeout = expiration_time() if expiration_time_is_callable \
74 74 else expiration_time
75 75
76 76 log.debug('Calling cached method:`%s`', user_func.__name__)
77 77 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
78 78
79 79 def cache_decorator(user_func):
80 80 if to_str is str:
81 81 # backwards compatible
82 82 key_generator = function_key_generator(namespace, user_func)
83 83 else:
84 84 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
85 85
86 86 def refresh(*arg, **kw):
87 87 """
88 88 Like invalidate, but regenerates the value instead
89 89 """
90 90 key = key_generator(*arg, **kw)
91 91 value = user_func(*arg, **kw)
92 92 self.set(key, value)
93 93 return value
94 94
95 95 def invalidate(*arg, **kw):
96 96 key = key_generator(*arg, **kw)
97 97 self.delete(key)
98 98
99 99 def set_(value, *arg, **kw):
100 100 key = key_generator(*arg, **kw)
101 101 self.set(key, value)
102 102
103 103 def get(*arg, **kw):
104 104 key = key_generator(*arg, **kw)
105 105 return self.get(key)
106 106
107 107 user_func.set = set_
108 108 user_func.invalidate = invalidate
109 109 user_func.get = get
110 110 user_func.refresh = refresh
111 111 user_func.key_generator = key_generator
112 112 user_func.original = user_func
113 113
114 114 # Use `decorate` to preserve the signature of :param:`user_func`.
115 115 return decorator.decorate(user_func, functools.partial(
116 116 get_or_create_for_user_func, key_generator))
117 117
118 118 return cache_decorator
119 119
120 120
121 121 def make_region(*arg, **kw):
122 122 return RhodeCodeCacheRegion(*arg, **kw)
123 123
124 124
125 125 def get_default_cache_settings(settings, prefixes=None):
126 126 prefixes = prefixes or []
127 127 cache_settings = {}
128 128 for key in settings.keys():
129 129 for prefix in prefixes:
130 130 if key.startswith(prefix):
131 131 name = key.split(prefix)[1].strip()
132 132 val = settings[key]
133 133 if isinstance(val, str):
134 134 val = val.strip()
135 135 cache_settings[name] = val
136 136 return cache_settings
137 137
138 138
139 139 def compute_key_from_params(*args):
140 140 """
141 141 Helper to compute key from given params to be used in cache manager
142 142 """
143 143 return sha1(safe_bytes("_".join(map(str, args))))
144 144
145 145
146 146 def custom_key_generator(backend, namespace, fn):
147 147 func_name = fn.__name__
148 148
149 149 def generate_key(*args):
150 150 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
151 151 namespace_pref = namespace or 'default_namespace'
152 152 arg_key = compute_key_from_params(*args)
153 153 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
154 154
155 155 return final_key
156 156
157 157 return generate_key
158 158
159 159
160 160 def backend_key_generator(backend):
161 161 """
162 162 Special wrapper that also sends over the backend to the key generator
163 163 """
164 164 def wrapper(namespace, fn):
165 165 return custom_key_generator(backend, namespace, fn)
166 166 return wrapper
167 167
168 168
169 169 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
170 170 from .backends import FileNamespaceBackend
171 171 from . import async_creation_runner
172 172
173 173 region_obj = region_meta.dogpile_cache_regions.get(region_name)
174 174 if not region_obj:
175 175 reg_keys = list(region_meta.dogpile_cache_regions.keys())
176 176 raise OSError(f'Region `{region_name}` not in configured regions: {reg_keys}.')
177 177
178 178 region_uid_name = f'{region_name}:{region_namespace}'
179 179
180 # Special case for ONLY the FileNamespaceBackend. We register one file per region
180 181 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
181 182 if not region_namespace:
182 183 raise ValueError(f'{FileNamespaceBackend} used requires to specify region_namespace param')
183 184
184 185 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
185 186 if region_exist:
186 187 log.debug('Using already configured region: %s', region_namespace)
187 188 return region_exist
188 189
189 190 expiration_time = region_obj.expiration_time
190 191
191 192 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
192 193 namespace_cache_dir = cache_dir
193 194
194 195 # we default the namespace_cache_dir to our default cache dir.
195 # however if this backend is configured with filename= param, we prioritize that
196 # however, if this backend is configured with filename= param, we prioritize that
196 197 # so all caches within that particular region, even those namespaced, end up in the same path
197 198 if region_obj.actual_backend.filename:
198 199 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
199 200
200 201 if not os.path.isdir(namespace_cache_dir):
201 202 os.makedirs(namespace_cache_dir)
202 203 new_region = make_region(
203 204 name=region_uid_name,
204 205 function_key_generator=backend_key_generator(region_obj.actual_backend)
205 206 )
206 207
207 208 namespace_filename = os.path.join(
208 209 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
209 210 # special type that allows one db file per namespace
210 211 new_region.configure(
211 212 backend='dogpile.cache.rc.file_namespace',
212 213 expiration_time=expiration_time,
213 214 arguments={"filename": namespace_filename}
214 215 )
215 216
216 217 # create and save in region caches
217 218 log.debug('configuring new region: %s', region_uid_name)
218 219 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
219 220
220 221 region_obj._default_namespace = region_namespace
221 222 if use_async_runner:
222 223 region_obj.async_creation_runner = async_creation_runner
223 224 return region_obj
224 225
225 226
226 227 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
227 228 from . import CLEAR_DELETE, CLEAR_INVALIDATE
228 229
229 230 if not isinstance(cache_region, RhodeCodeCacheRegion):
230 231 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
231 232 log.debug('clearing cache region: %s with method=%s', cache_region, method)
232 233
233 234 num_affected_keys = None
234 235
235 236 if method == CLEAR_INVALIDATE:
236 237 # NOTE: The CacheRegion.invalidate() method’s default mode of
237 238 # operation is to set a timestamp local to this CacheRegion in this Python process only.
238 239 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
239 240 cache_region.invalidate(hard=True)
240 241
241 242 if method == CLEAR_DELETE:
242 243 cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
243 244 num_affected_keys = len(cache_keys)
244 245 if num_affected_keys:
245 246 cache_region.delete_multi(cache_keys)
246 247
247 248 return num_affected_keys
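The conditional decorator above is what the RemoteBase helpers in the next hunk are built around; a hedged usage sketch, with the region and namespace names mirroring that hunk and the repo id invented for the example:

# Hedged sketch: caching a remote-method body behind the conditional decorator.
region = get_or_create_region('repo_object', 'cache_repo.v2.42')

cache_on = True  # normally derived from the wire context, see RemoteBase._cache_on

@region.conditional_cache_on_arguments(condition=cache_on)
def _commit_info(_context_uid, _repo_id, commit_id):
    # the expensive VCS call would happen here; it runs uncached when
    # condition is falsy, without touching dogpile at all
    return {'commit_id': commit_id}

result = _commit_info('ctx-1', '42', 'deadbeef')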
@@ -1,47 +1,46 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 from vcsserver.lib import rc_cache
19 19
20 20
21 21 class RemoteBase(object):
22 22 EMPTY_COMMIT = '0' * 40
23 23
24 24 def _region(self, wire):
25 25 cache_repo_id = wire.get('cache_repo_id', '')
26 cache_namespace_uid = f'cache_repo.{cache_repo_id}'
26 cache_namespace_uid = f'cache_repo.{rc_cache.CACHE_OBJ_CACHE_VER}.{cache_repo_id}'
27 27 return rc_cache.get_or_create_region('repo_object', cache_namespace_uid)
28 28
29 29 def _cache_on(self, wire):
30 30 context = wire.get('context', '')
31 31 context_uid = f'{context}'
32 32 repo_id = wire.get('repo_id', '')
33 33 cache = wire.get('cache', True)
34 34 cache_on = context and cache
35 35 return cache_on, context_uid, repo_id
36 36
37 37 def vcsserver_invalidate_cache(self, wire, delete):
38 from vcsserver.lib import rc_cache
39 repo_id = wire.get('repo_id', '')
40 38 cache_repo_id = wire.get('cache_repo_id', '')
41 cache_namespace_uid = f'cache_repo.{cache_repo_id}'
39 cache_namespace_uid = f'cache_repo.{rc_cache.CACHE_OBJ_CACHE_VER}.{cache_repo_id}'
42 40
43 41 if delete:
44 42 rc_cache.clear_cache_namespace(
45 43 'repo_object', cache_namespace_uid, method=rc_cache.CLEAR_DELETE)
46 44
45 repo_id = wire.get('repo_id', '')
47 46 return {'invalidated': {'repo_id': repo_id, 'delete': delete}}