@@ -0,0 +1,31 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from .fanout_cache import get_archival_cache_store
from .fanout_cache import get_archival_config

from .utils import archive_iterator
from .utils import ArchiveCacheLock


def includeme(config):
    # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode
    return

    # init our cache at start
    settings = config.get_settings()
    get_archival_cache_store(settings)
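
The early return in includeme() is deliberate: on the vcsserver side the cache is materialized lazily, on the first call that carries the settings sent over from RhodeCode. A minimal sketch of that first call follows; the keys mirror the ones consumed in fanout_cache.py below, but the concrete values are hypothetical examples, not shipped defaults:

    # Illustrative settings only -- every value below is an assumption.
    settings = {
        'archive_cache.backend.type': 'filesystem',
        'archive_cache.locking.url': 'redis://localhost:6379/0',
        'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
        'archive_cache.filesystem.cache_size_gb': 10,
        'archive_cache.filesystem.cache_shards': 8,
        'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
    }
    d_cache = get_archival_cache_store(settings)  # memoized in module-level cache_meta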
@@ -0,0 +1,258 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib

from vcsserver.lib.rc_json import json
from .lock import GenerationLock

log = logging.getLogger(__name__)

cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


class FileSystemCache:

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                return size

    def _get_keyfile(self, key):
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "timestamp": time.time(),
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
        }
        if metadata:
            _metadata.update(metadata)

        reader = functools.partial(value_reader.read, 2**22)

        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
        if key not in self:
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        return open(os.path.join(self._directory, filename), 'rb'), metadata

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self._directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value
        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)


class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(self, directory=None, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict
        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key):
        """Return file handle corresponding to `key` from cache."""
        shard = self._get_shard(key)
        return shard.fetch(key)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found
        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)


def get_archival_config(config):

    final_config = {}

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)
    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta
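
A short usage sketch of the store/fetch contract (the path, URL, key, and payload are illustrative; the locking URL is only dialed when get_lock() is used, so nothing below requires a live Redis):

    import io

    d_cache = FanoutCache(
        '/tmp/archive_cache',                    # hypothetical store dir
        locking_url='redis://localhost:6379/0',  # only consumed by get_lock()
        cache_shards=4,
        cache_size_limit=1 * 1024 ** 3,
        cache_eviction_policy='least-recently-stored',
    )

    # store() accepts anything with a .read(size) method and shards by key
    key = 'some-repo-v1.tar.gz'
    _, size, mode, filename, meta = d_cache.store(
        key, io.BytesIO(b'archive-bytes'), metadata={'commit_id': 'abc'})

    # fetch() hands back an open binary file plus the persisted metadata
    if key in d_cache:
        reader, metadata = d_cache.fetch(key)
        assert reader.read() == b'archive-bytes'
        reader.close()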
@@ -0,0 +1,59 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import redis
from vcsserver.lib._vendor import redis_lock

from .utils import ArchiveCacheLock


class GenerationLock:
    """
    Locking mechanism that detects whether the lock is already acquired;
    entering the context raises ArchiveCacheLock if it is.

    with GenerationLock(lock_key):
        compute_archive()
    """
    lock_timeout = 7200

    def __init__(self, lock_key, url):
        self.lock_key = lock_key
        self._create_client(url)
        self.lock = self.get_lock()

    def _create_client(self, url):
        connection_pool = redis.ConnectionPool.from_url(url)
        self.writer_client = redis.StrictRedis(
            connection_pool=connection_pool
        )
        self.reader_client = self.writer_client

    def get_lock(self):
        return redis_lock.Lock(
            redis_client=self.writer_client,
            name=self.lock_key,
            expire=self.lock_timeout,
            strict=True
        )

    def __enter__(self):
        acquired = self.lock.acquire(blocking=False)
        if not acquired:
            raise ArchiveCacheLock('Failed to create a lock')

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release()
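
Usage sketch; the lock key, Redis URL, and compute_archive() are placeholders. Because acquire() is called with blocking=False, a second worker fails fast with ArchiveCacheLock instead of queueing behind the first:

    try:
        with GenerationLock('archive_some_key', 'redis://localhost:6379/0'):
            compute_archive()  # hypothetical generation step
    except ArchiveCacheLock:
        # another process is already generating this archive; skip or retry later
        pass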
@@ -0,0 +1,29 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


class ArchiveCacheLock(Exception):
    pass


def archive_iterator(_reader, block_size: int = 4096 * 512):
    # 4096 * 512 = 2 MB default chunk size
    while 1:
        data = _reader.read(block_size)
        if not data:
            break
        yield data
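
This pairs with FileSystemCache.fetch(): the returned file handle can be streamed out in 2 MB chunks without loading the whole archive into memory. A sketch, with d_cache, archive_key, and the response object assumed:

    reader, metadata = d_cache.fetch(archive_key)  # d_cache/archive_key assumed
    try:
        for chunk in archive_iterator(reader):
            response.write(chunk)                  # `response` is hypothetical
    finally:
        reader.close()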
@@ -23,7 +23,6 b' celery==5.3.6'
 tzdata==2024.1
 vine==5.1.0
 contextlib2==21.6.0
-diskcache==5.6.3
 dogpile.cache==1.3.3
 decorator==5.1.1
 stevedore==5.1.0
@@ -120,9 +120,8 b' def store_archive_in_cache(node_walker, '
     d_cache = get_archival_cache_store(config=cache_config)
 
     if archive_key in d_cache:
-        with d_cache as d_cache_reader:
-            reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True)
-            return reader.name
+        reader, metadata = d_cache.fetch(archive_key)
+        return reader.name
 
     archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
     log.debug('Creating new temp archive in %s', archive_tmp_path)
@@ -139,6 +138,7 b' def store_archive_in_cache(node_walker, '
 
     for f in node_walker(commit_id, archive_at_path):
         f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
+
         try:
             archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
         except NoContentException:
@@ -146,34 +146,28 b' def store_archive_in_cache(node_walker, '
             # directories which are not supported by archiver
             archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')
 
+    metadata = dict([
+        ('commit_id', commit_id),
+        ('mtime', mtime),
+    ])
+    metadata.update(extra_metadata)
     if write_metadata:
-        metadata = dict([
-            ('commit_id', commit_id),
-            ('mtime', mtime),
-        ])
-        metadata.update(extra_metadata)
-
         meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
         f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
         archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))
 
     archiver.done()
 
-    # ensure set & get are atomic
-    with d_cache.transact():
-
-        with open(archive_tmp_path, 'rb') as archive_file:
-            add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True)
-            if not add_result:
-                log.error('Failed to store cache for key=%s', archive_key)
+    with open(archive_tmp_path, 'rb') as archive_file:
+        add_result = d_cache.store(archive_key, archive_file, metadata=metadata)
+        if not add_result:
+            log.error('Failed to store cache for key=%s', archive_key)
 
-        os.remove(archive_tmp_path)
+    os.remove(archive_tmp_path)
 
-        reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True)
-        if not reader:
-            raise AssertionError(f'empty reader on key={archive_key} added={add_result}')
+    reader, metadata = d_cache.fetch(archive_key)
 
-        return reader.name
+    return reader.name
 
 
 class BinaryEnvelope:
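
Dropping diskcache's transact() works because store() writes the payload under a random filename first and only creates the .key file afterwards, so a concurrent fetch() never observes a half-written entry; exclusion between generators is GenerationLock's job instead. A sketch of how a caller might combine the pieces (the wrapper, lock-key format, and generate callable are assumptions, not part of this changeset):

    def get_or_generate(archive_key, cache_config, generate):
        # `generate` is a callable that runs store_archive_in_cache(...) internally
        d_cache = get_archival_cache_store(config=cache_config)
        if archive_key not in d_cache:
            # raises ArchiveCacheLock if another worker holds the lock
            with d_cache.get_lock(f'archive_gen_{archive_key}'):
                generate()
        reader, metadata = d_cache.fetch(archive_key)
        return reader.name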
NO CONTENT: file was removed