feat(disk-cache): rewrite diskcache backend to be k8s and NFS safe....
super-admin
r1241:29c2a6b0 default
@@ -0,0 +1,31 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 from .fanout_cache import get_archival_cache_store
19 from .fanout_cache import get_archival_config
20
21 from .utils import archive_iterator
22 from .utils import ArchiveCacheLock
23
24
25 def includeme(config):
26 # NOTE: for vcsserver we lazy-init this; the config is sent over from RhodeCode at call time
27 return
28
29 # init our cache at start
30 settings = config.get_settings()
31 get_archival_cache_store(settings)
@@ -0,0 +1,258 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 import codecs
19 import contextlib
20 import functools
21 import os
22 import logging
23 import time
24 import typing
25 import zlib
26
27 from vcsserver.lib.rc_json import json
28 from .lock import GenerationLock
29
30 log = logging.getLogger(__name__)
31
32 cache_meta = None
33
34 UNKNOWN = -241
35 NO_VAL = -917
36
37 MODE_BINARY = 'BINARY'
38
39
40 class FileSystemCache:
41
42 def __init__(self, index, directory, **settings):
43 self._index = index
44 self._directory = directory
45
46 def _write_file(self, full_path, iterator, mode, encoding=None):
47 full_dir, _ = os.path.split(full_path)
48
49 for count in range(1, 11):
50 with contextlib.suppress(OSError):
51 os.makedirs(full_dir)
52
53 try:
54 # Another cache may have deleted the directory before
55 # the file could be opened.
56 writer = open(full_path, mode, encoding=encoding)
57 except OSError:
58 if count == 10:
59 # Give up after 10 tries to open the file.
60 raise
61 continue
62
63 with writer:
64 size = 0
65 for chunk in iterator:
66 size += len(chunk)
67 writer.write(chunk)
68 return size
69
70 def _get_keyfile(self, key):
71 return os.path.join(self._directory, f'{key}.key')
72
73 def store(self, key, value_reader, metadata):
74 filename, full_path = self.random_filename()
75 key_file = self._get_keyfile(key)
76
77 # STORE METADATA
78 _metadata = {
79 "version": "v1",
80 "timestamp": time.time(),
81 "filename": filename,
82 "full_path": full_path,
83 "key_file": key_file,
84 }
85 if metadata:
86 _metadata.update(metadata)
87
88 reader = functools.partial(value_reader.read, 2**22)
89
90 iterator = iter(reader, b'')
91 size = self._write_file(full_path, iterator, 'xb')
92
93 # after the archive is fully written, create a key file that records the presence of the binary file
94 with open(key_file, 'wb') as f:
95 f.write(json.dumps(_metadata))
96
97 return key, size, MODE_BINARY, filename, _metadata
98
99 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
100 if key not in self:
101 raise KeyError(key)
102
103 key_file = self._get_keyfile(key)
104 with open(key_file, 'rb') as f:
105 metadata = json.loads(f.read())
106
107 filename = metadata['filename']
108
109 return open(os.path.join(self._directory, filename), 'rb'), metadata
110
111 def random_filename(self):
112 """Return filename and full-path tuple for file storage.
113
114 Filename will be a randomly generated 28 character hexadecimal string
115 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
116 reduce the size of directories. On older filesystems, lookups in
117 directories with many files may be slow.
118 """
119
120 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
121 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
122 name = hex_name[4:] + '.archive_cache'
123 filename = os.path.join(sub_dir, name)
124 full_path = os.path.join(self._directory, filename)
125 return filename, full_path
126
127 def hash(self, key):
128 """Compute portable hash for `key`.
129
130 :param key: key to hash
131 :return: hash value
132
133 """
134 mask = 0xFFFFFFFF
135 return zlib.adler32(key.encode('utf-8')) & mask # noqa
136
137 def __contains__(self, key):
138 """Return `True` if `key` matching item is found in cache.
139
140 :param key: key matching item
141 :return: True if key matching item
142
143 """
144 key_file = self._get_keyfile(key)
145 return os.path.exists(key_file)
146
147
148 class FanoutCache:
149 """Cache that shards keys and values."""
150
151 def __init__(
152 self, directory=None, **settings
153 ):
154 """Initialize cache instance.
155
156 :param str directory: cache directory
157 :param settings: settings dict
158
159 """
160 if directory is None:
161 raise ValueError('directory cannot be None')
162
163 directory = str(directory)
164 directory = os.path.expanduser(directory)
165 directory = os.path.expandvars(directory)
166 self._directory = directory
167
168 self._count = settings.pop('cache_shards')
169 self._locking_url = settings.pop('locking_url')
170
171 self._shards = tuple(
172 FileSystemCache(
173 index=num,
174 directory=os.path.join(directory, 'shard_%03d' % num),
175 **settings,
176 )
177 for num in range(self._count)
178 )
179 self._hash = self._shards[0].hash
180
181 def get_lock(self, lock_key):
182 return GenerationLock(lock_key, self._locking_url)
183
184 def _get_shard(self, key) -> FileSystemCache:
185 index = self._hash(key) % self._count
186 shard = self._shards[index]
187 return shard
188
189 def store(self, key, value_reader, metadata=None):
190 shard = self._get_shard(key)
191 return shard.store(key, value_reader, metadata)
192
193 def fetch(self, key):
194 """Return file handle corresponding to `key` from cache.
195 """
196 shard = self._get_shard(key)
197 return shard.fetch(key)
198
199 def has_key(self, key):
200 """Return `True` if `key` matching item is found in cache.
201
202 :param key: key for item
203 :return: True if key is found
204
205 """
206 shard = self._get_shard(key)
207 return key in shard
208
209 def __contains__(self, item):
210 return self.has_key(item)
211
212
213 def get_archival_config(config):
214
215 final_config = {
216
217 }
218
219 for k, v in config.items():
220 if k.startswith('archive_cache'):
221 final_config[k] = v
222
223 return final_config
224
225
226 def get_archival_cache_store(config):
227
228 global cache_meta
229 if cache_meta is not None:
230 return cache_meta
231
232 config = get_archival_config(config)
233 backend = config['archive_cache.backend.type']
234 if backend != 'filesystem':
235 raise ValueError('archive_cache.backend.type only supports "filesystem"')
236
237 archive_cache_locking_url = config['archive_cache.locking.url']
238 archive_cache_dir = config['archive_cache.filesystem.store_dir']
239 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
240 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
241 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
242
243 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
244
245 # ensure the archive cache directory exists before any shard tries to write into it
246 if not os.path.isdir(archive_cache_dir):
247 os.makedirs(archive_cache_dir, exist_ok=True)
248
249 d_cache = FanoutCache(
250 archive_cache_dir,
251 locking_url=archive_cache_locking_url,
252 cache_shards=archive_cache_shards,
253 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
254 cache_eviction_policy=archive_cache_eviction_policy
255 )
256 cache_meta = d_cache
257 return cache_meta
258
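Taken together, the fanout_cache module exposes a small surface: get_archival_cache_store() builds (and memoizes) a FanoutCache from the archive_cache.* settings, and callers then only use store(), fetch() and the `in` check. A minimal caller-side sketch of that flow follows; the setting values, the archive key and the temp file path are illustrative placeholders, only the key names and the API calls come from the code above.

# Sketch only: values below are illustrative, not shipped defaults.
config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/0',
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = get_archival_cache_store(config)  # memoized in the module-level cache_meta

archive_key = 'example-repo-archive-key'  # hypothetical key
if archive_key not in d_cache:
    with open('/tmp/archive.tar.gz', 'rb') as archive_file:
        d_cache.store(archive_key, archive_file, metadata={'commit_id': 'abcd1234'})

reader, metadata = d_cache.fetch(archive_key)  # binary reader + metadata from the .key file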
@@ -0,0 +1,59 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 import redis
19 from vcsserver.lib._vendor import redis_lock
20
21 from .utils import ArchiveCacheLock
22
23
24 class GenerationLock:
25 """
26 Redis-backed lock that guards archive generation; acquiring it is non-blocking and raises ArchiveCacheLock if it is already held
27
28 with GenerationLock(lock_key, url):
29 compute_archive()
30 """
31 lock_timeout = 7200
32
33 def __init__(self, lock_key, url):
34 self.lock_key = lock_key
35 self._create_client(url)
36 self.lock = self.get_lock()
37
38 def _create_client(self, url):
39 connection_pool = redis.ConnectionPool.from_url(url)
40 self.writer_client = redis.StrictRedis(
41 connection_pool=connection_pool
42 )
43 self.reader_client = self.writer_client
44
45 def get_lock(self):
46 return redis_lock.Lock(
47 redis_client=self.writer_client,
48 name=self.lock_key,
49 expire=self.lock_timeout,
50 strict=True
51 )
52
53 def __enter__(self):
54 acquired = self.lock.acquire(blocking=False)
55 if not acquired:
56 raise ArchiveCacheLock('Failed to create a lock')
57
58 def __exit__(self, exc_type, exc_val, exc_tb):
59 self.lock.release()
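The hunks shown here never call FanoutCache.get_lock() themselves, so the call site is not part of this diff. A plausible caller-side sketch would wrap archive generation so that only one worker or pod builds a given archive while the others fail fast; the lock-key format and the absolute import path are assumptions (inside the package the code above uses `from .utils import ArchiveCacheLock`).

from vcsserver.lib.archive_cache import ArchiveCacheLock  # assumption: package import path

def generate_archive_once(d_cache, archive_key, build_archive):
    # build_archive() is any callable that stores the archive under archive_key
    try:
        with d_cache.get_lock(f'archive-gen-{archive_key}'):  # hypothetical lock-key format
            if archive_key not in d_cache:
                build_archive()
    except ArchiveCacheLock:
        # another worker/pod already holds the generation lock for this key;
        # the caller can retry later or wait for the key file to appear
        pass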
@@ -0,0 +1,29 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18
19 class ArchiveCacheLock(Exception):
20 pass
21
22
23 def archive_iterator(_reader, block_size: int = 4096 * 512):
24 # 4096 * 512 = 2MB blocks
25 while 1:
26 data = _reader.read(block_size)
27 if not data:
28 break
29 yield data
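archive_iterator pairs naturally with FileSystemCache.fetch(): instead of loading a cached archive into memory, the returned file handle can be streamed in block_size chunks (4096 * 512 bytes = 2 MB by default). A small sketch, assuming d_cache and archive_key from the earlier example and a hypothetical send_chunk() transport callable:

reader, metadata = d_cache.fetch(archive_key)  # raises KeyError if the key is absent
try:
    for chunk in archive_iterator(reader):  # yields 2 MB bytes blocks by default
        send_chunk(chunk)                   # hypothetical: write to an HTTP/WSGI response
finally:
    reader.close()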
@@ -23,7 +23,6 b' celery==5.3.6'
 tzdata==2024.1
 vine==5.1.0
 contextlib2==21.6.0
-diskcache==5.6.3
 dogpile.cache==1.3.3
 decorator==5.1.1
 stevedore==5.1.0
@@ -120,9 +120,8 b' def store_archive_in_cache(node_walker, '
     d_cache = get_archival_cache_store(config=cache_config)
 
     if archive_key in d_cache:
-        with d_cache as d_cache_reader:
-            reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True)
-            return reader.name
+        reader, metadata = d_cache.fetch(archive_key)
+        return reader.name
 
     archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
     log.debug('Creating new temp archive in %s', archive_tmp_path)
@@ -139,6 +138,7 b' def store_archive_in_cache(node_walker, '
 
     for f in node_walker(commit_id, archive_at_path):
         f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
+
         try:
             archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
         except NoContentException:
@@ -146,34 +146,28 b' def store_archive_in_cache(node_walker, '
             # directories which are not supported by archiver
             archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')
 
+    metadata = dict([
+        ('commit_id', commit_id),
+        ('mtime', mtime),
+    ])
+    metadata.update(extra_metadata)
     if write_metadata:
-        metadata = dict([
-            ('commit_id', commit_id),
-            ('mtime', mtime),
-        ])
-        metadata.update(extra_metadata)
-
         meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
         f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
         archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))
 
     archiver.done()
 
-    # ensure set & get are atomic
-    with d_cache.transact():
-
-        with open(archive_tmp_path, 'rb') as archive_file:
-            add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True)
-            if not add_result:
-                log.error('Failed to store cache for key=%s', archive_key)
-
+    with open(archive_tmp_path, 'rb') as archive_file:
+        add_result = d_cache.store(archive_key, archive_file, metadata=metadata)
+        if not add_result:
+            log.error('Failed to store cache for key=%s', archive_key)
+
     os.remove(archive_tmp_path)
 
-    reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True)
-    if not reader:
-        raise AssertionError(f'empty reader on key={archive_key} added={add_result}')
-
+    reader, metadata = d_cache.fetch(archive_key)
+
     return reader.name
 
 
 class BinaryEnvelope:
NO CONTENT: file was removed
NO CONTENT: file was removed