Show More
@@ -0,0 +1,31 b'' | |||||
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
|
2 | # Copyright (C) 2014-2024 RhodeCode GmbH | |||
|
3 | # | |||
|
4 | # This program is free software; you can redistribute it and/or modify | |||
|
5 | # it under the terms of the GNU General Public License as published by | |||
|
6 | # the Free Software Foundation; either version 3 of the License, or | |||
|
7 | # (at your option) any later version. | |||
|
8 | # | |||
|
9 | # This program is distributed in the hope that it will be useful, | |||
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
12 | # GNU General Public License for more details. | |||
|
13 | # | |||
|
14 | # You should have received a copy of the GNU General Public License | |||
|
15 | # along with this program; if not, write to the Free Software Foundation, | |||
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |||
|
17 | ||||
|
18 | from .fanout_cache import get_archival_cache_store | |||
|
19 | from .fanout_cache import get_archival_config | |||
|
20 | ||||
|
21 | from .utils import archive_iterator | |||
|
22 | from .utils import ArchiveCacheLock | |||
|
23 | ||||
|
24 | ||||
|
25 | def includeme(config): | |||
|
26 | # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode | |||
|
27 | return | |||
|
28 | ||||
|
29 | # init our cache at start | |||
|
30 | settings = config.get_settings() | |||
|
31 | get_archival_cache_store(settings) |
@@ -0,0 +1,258 b'' | |||||
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
|
2 | # Copyright (C) 2014-2024 RhodeCode GmbH | |||
|
3 | # | |||
|
4 | # This program is free software; you can redistribute it and/or modify | |||
|
5 | # it under the terms of the GNU General Public License as published by | |||
|
6 | # the Free Software Foundation; either version 3 of the License, or | |||
|
7 | # (at your option) any later version. | |||
|
8 | # | |||
|
9 | # This program is distributed in the hope that it will be useful, | |||
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
12 | # GNU General Public License for more details. | |||
|
13 | # | |||
|
14 | # You should have received a copy of the GNU General Public License | |||
|
15 | # along with this program; if not, write to the Free Software Foundation, | |||
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |||
|
17 | ||||
|
18 | import codecs | |||
|
19 | import contextlib | |||
|
20 | import functools | |||
|
21 | import os | |||
|
22 | import logging | |||
|
23 | import time | |||
|
24 | import typing | |||
|
25 | import zlib | |||
|
26 | ||||
|
27 | from vcsserver.lib.rc_json import json | |||
|
28 | from .lock import GenerationLock | |||
|
29 | ||||
|
30 | log = logging.getLogger(__name__) | |||
|
31 | ||||
|
32 | cache_meta = None | |||
|
33 | ||||
|
34 | UNKNOWN = -241 | |||
|
35 | NO_VAL = -917 | |||
|
36 | ||||
|
37 | MODE_BINARY = 'BINARY' | |||
|
38 | ||||
|
39 | ||||
|
40 | class FileSystemCache: | |||
|
41 | ||||
|
42 | def __init__(self, index, directory, **settings): | |||
|
43 | self._index = index | |||
|
44 | self._directory = directory | |||
|
45 | ||||
|
46 | def _write_file(self, full_path, iterator, mode, encoding=None): | |||
|
47 | full_dir, _ = os.path.split(full_path) | |||
|
48 | ||||
|
49 | for count in range(1, 11): | |||
|
50 | with contextlib.suppress(OSError): | |||
|
51 | os.makedirs(full_dir) | |||
|
52 | ||||
|
53 | try: | |||
|
54 | # Another cache may have deleted the directory before | |||
|
55 | # the file could be opened. | |||
|
56 | writer = open(full_path, mode, encoding=encoding) | |||
|
57 | except OSError: | |||
|
58 | if count == 10: | |||
|
59 | # Give up after 10 tries to open the file. | |||
|
60 | raise | |||
|
61 | continue | |||
|
62 | ||||
|
63 | with writer: | |||
|
64 | size = 0 | |||
|
65 | for chunk in iterator: | |||
|
66 | size += len(chunk) | |||
|
67 | writer.write(chunk) | |||
|
68 | return size | |||
|
69 | ||||
|
70 | def _get_keyfile(self, key): | |||
|
71 | return os.path.join(self._directory, f'{key}.key') | |||
|
72 | ||||
|
73 | def store(self, key, value_reader, metadata): | |||
|
74 | filename, full_path = self.random_filename() | |||
|
75 | key_file = self._get_keyfile(key) | |||
|
76 | ||||
|
77 | # STORE METADATA | |||
|
78 | _metadata = { | |||
|
79 | "version": "v1", | |||
|
80 | "timestamp": time.time(), | |||
|
81 | "filename": filename, | |||
|
82 | "full_path": full_path, | |||
|
83 | "key_file": key_file, | |||
|
84 | } | |||
|
85 | if metadata: | |||
|
86 | _metadata.update(metadata) | |||
|
87 | ||||
|
88 | reader = functools.partial(value_reader.read, 2**22) | |||
|
89 | ||||
|
90 | iterator = iter(reader, b'') | |||
|
91 | size = self._write_file(full_path, iterator, 'xb') | |||
|
92 | ||||
|
93 | # after archive is finished, we create a key to save the presence of the binary file | |||
|
94 | with open(key_file, 'wb') as f: | |||
|
95 | f.write(json.dumps(_metadata)) | |||
|
96 | ||||
|
97 | return key, size, MODE_BINARY, filename, _metadata | |||
|
98 | ||||
|
99 | def fetch(self, key) -> tuple[typing.BinaryIO, dict]: | |||
|
100 | if key not in self: | |||
|
101 | raise KeyError(key) | |||
|
102 | ||||
|
103 | key_file = self._get_keyfile(key) | |||
|
104 | with open(key_file, 'rb') as f: | |||
|
105 | metadata = json.loads(f.read()) | |||
|
106 | ||||
|
107 | filename = metadata['filename'] | |||
|
108 | ||||
|
109 | return open(os.path.join(self._directory, filename), 'rb'), metadata | |||
|
110 | ||||
|
111 | def random_filename(self): | |||
|
112 | """Return filename and full-path tuple for file storage. | |||
|
113 | ||||
|
114 | Filename will be a randomly generated 28 character hexadecimal string | |||
|
115 | with ".archive_cache" suffixed. Two levels of sub-directories will be used to | |||
|
116 | reduce the size of directories. On older filesystems, lookups in | |||
|
117 | directories with many files may be slow. | |||
|
118 | """ | |||
|
119 | ||||
|
120 | hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') | |||
|
121 | sub_dir = os.path.join(hex_name[:2], hex_name[2:4]) | |||
|
122 | name = hex_name[4:] + '.archive_cache' | |||
|
123 | filename = os.path.join(sub_dir, name) | |||
|
124 | full_path = os.path.join(self._directory, filename) | |||
|
125 | return filename, full_path | |||
|
126 | ||||
|
127 | def hash(self, key): | |||
|
128 | """Compute portable hash for `key`. | |||
|
129 | ||||
|
130 | :param key: key to hash | |||
|
131 | :return: hash value | |||
|
132 | ||||
|
133 | """ | |||
|
134 | mask = 0xFFFFFFFF | |||
|
135 | return zlib.adler32(key.encode('utf-8')) & mask # noqa | |||
|
136 | ||||
|
137 | def __contains__(self, key): | |||
|
138 | """Return `True` if `key` matching item is found in cache. | |||
|
139 | ||||
|
140 | :param key: key matching item | |||
|
141 | :return: True if key matching item | |||
|
142 | ||||
|
143 | """ | |||
|
144 | key_file = self._get_keyfile(key) | |||
|
145 | return os.path.exists(key_file) | |||
|
146 | ||||
|
147 | ||||
|
148 | class FanoutCache: | |||
|
149 | """Cache that shards keys and values.""" | |||
|
150 | ||||
|
151 | def __init__( | |||
|
152 | self, directory=None, **settings | |||
|
153 | ): | |||
|
154 | """Initialize cache instance. | |||
|
155 | ||||
|
156 | :param str directory: cache directory | |||
|
157 | :param settings: settings dict | |||
|
158 | ||||
|
159 | """ | |||
|
160 | if directory is None: | |||
|
161 | raise ValueError('directory cannot be None') | |||
|
162 | ||||
|
163 | directory = str(directory) | |||
|
164 | directory = os.path.expanduser(directory) | |||
|
165 | directory = os.path.expandvars(directory) | |||
|
166 | self._directory = directory | |||
|
167 | ||||
|
168 | self._count = settings.pop('cache_shards') | |||
|
169 | self._locking_url = settings.pop('locking_url') | |||
|
170 | ||||
|
171 | self._shards = tuple( | |||
|
172 | FileSystemCache( | |||
|
173 | index=num, | |||
|
174 | directory=os.path.join(directory, 'shard_%03d' % num), | |||
|
175 | **settings, | |||
|
176 | ) | |||
|
177 | for num in range(self._count) | |||
|
178 | ) | |||
|
179 | self._hash = self._shards[0].hash | |||
|
180 | ||||
|
181 | def get_lock(self, lock_key): | |||
|
182 | return GenerationLock(lock_key, self._locking_url) | |||
|
183 | ||||
|
184 | def _get_shard(self, key) -> FileSystemCache: | |||
|
185 | index = self._hash(key) % self._count | |||
|
186 | shard = self._shards[index] | |||
|
187 | return shard | |||
|
188 | ||||
|
189 | def store(self, key, value_reader, metadata=None): | |||
|
190 | shard = self._get_shard(key) | |||
|
191 | return shard.store(key, value_reader, metadata) | |||
|
192 | ||||
|
193 | def fetch(self, key): | |||
|
194 | """Return file handle corresponding to `key` from cache. | |||
|
195 | """ | |||
|
196 | shard = self._get_shard(key) | |||
|
197 | return shard.fetch(key) | |||
|
198 | ||||
|
199 | def has_key(self, key): | |||
|
200 | """Return `True` if `key` matching item is found in cache. | |||
|
201 | ||||
|
202 | :param key: key for item | |||
|
203 | :return: True if key is found | |||
|
204 | ||||
|
205 | """ | |||
|
206 | shard = self._get_shard(key) | |||
|
207 | return key in shard | |||
|
208 | ||||
|
209 | def __contains__(self, item): | |||
|
210 | return self.has_key(item) | |||
|
211 | ||||
|
212 | ||||
|
213 | def get_archival_config(config): | |||
|
214 | ||||
|
215 | final_config = { | |||
|
216 | ||||
|
217 | } | |||
|
218 | ||||
|
219 | for k, v in config.items(): | |||
|
220 | if k.startswith('archive_cache'): | |||
|
221 | final_config[k] = v | |||
|
222 | ||||
|
223 | return final_config | |||
|
224 | ||||
|
225 | ||||
|
226 | def get_archival_cache_store(config): | |||
|
227 | ||||
|
228 | global cache_meta | |||
|
229 | if cache_meta is not None: | |||
|
230 | return cache_meta | |||
|
231 | ||||
|
232 | config = get_archival_config(config) | |||
|
233 | backend = config['archive_cache.backend.type'] | |||
|
234 | if backend != 'filesystem': | |||
|
235 | raise ValueError('archive_cache.backend.type only supports "filesystem"') | |||
|
236 | ||||
|
237 | archive_cache_locking_url = config['archive_cache.locking.url'] | |||
|
238 | archive_cache_dir = config['archive_cache.filesystem.store_dir'] | |||
|
239 | archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb'] | |||
|
240 | archive_cache_shards = config['archive_cache.filesystem.cache_shards'] | |||
|
241 | archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy'] | |||
|
242 | ||||
|
243 | log.debug('Initializing archival cache instance under %s', archive_cache_dir) | |||
|
244 | ||||
|
245 | # check if it's ok to write, and re-create the archive cache | |||
|
246 | if not os.path.isdir(archive_cache_dir): | |||
|
247 | os.makedirs(archive_cache_dir, exist_ok=True) | |||
|
248 | ||||
|
249 | d_cache = FanoutCache( | |||
|
250 | archive_cache_dir, | |||
|
251 | locking_url=archive_cache_locking_url, | |||
|
252 | cache_shards=archive_cache_shards, | |||
|
253 | cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024, | |||
|
254 | cache_eviction_policy=archive_cache_eviction_policy | |||
|
255 | ) | |||
|
256 | cache_meta = d_cache | |||
|
257 | return cache_meta | |||
|
258 |
@@ -0,0 +1,59 b'' | |||||
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
|
2 | # Copyright (C) 2014-2024 RhodeCode GmbH | |||
|
3 | # | |||
|
4 | # This program is free software; you can redistribute it and/or modify | |||
|
5 | # it under the terms of the GNU General Public License as published by | |||
|
6 | # the Free Software Foundation; either version 3 of the License, or | |||
|
7 | # (at your option) any later version. | |||
|
8 | # | |||
|
9 | # This program is distributed in the hope that it will be useful, | |||
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
12 | # GNU General Public License for more details. | |||
|
13 | # | |||
|
14 | # You should have received a copy of the GNU General Public License | |||
|
15 | # along with this program; if not, write to the Free Software Foundation, | |||
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |||
|
17 | ||||
|
18 | import redis | |||
|
19 | from vcsserver.lib._vendor import redis_lock | |||
|
20 | ||||
|
21 | from .utils import ArchiveCacheLock | |||
|
22 | ||||
|
23 | ||||
|
24 | class GenerationLock: | |||
|
25 | """ | |||
|
26 | Locking mechanism that detects if a lock is acquired | |||
|
27 | ||||
|
28 | with GenerationLock(lock_key): | |||
|
29 | compute_archive() | |||
|
30 | """ | |||
|
31 | lock_timeout = 7200 | |||
|
32 | ||||
|
33 | def __init__(self, lock_key, url): | |||
|
34 | self.lock_key = lock_key | |||
|
35 | self._create_client(url) | |||
|
36 | self.lock = self.get_lock() | |||
|
37 | ||||
|
38 | def _create_client(self, url): | |||
|
39 | connection_pool = redis.ConnectionPool.from_url(url) | |||
|
40 | self.writer_client = redis.StrictRedis( | |||
|
41 | connection_pool=connection_pool | |||
|
42 | ) | |||
|
43 | self.reader_client = self.writer_client | |||
|
44 | ||||
|
45 | def get_lock(self): | |||
|
46 | return redis_lock.Lock( | |||
|
47 | redis_client=self.writer_client, | |||
|
48 | name=self.lock_key, | |||
|
49 | expire=self.lock_timeout, | |||
|
50 | strict=True | |||
|
51 | ) | |||
|
52 | ||||
|
53 | def __enter__(self): | |||
|
54 | acquired = self.lock.acquire(blocking=False) | |||
|
55 | if not acquired: | |||
|
56 | raise ArchiveCacheLock('Failed to create a lock') | |||
|
57 | ||||
|
58 | def __exit__(self, exc_type, exc_val, exc_tb): | |||
|
59 | self.lock.release() |
@@ -0,0 +1,29 b'' | |||||
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
|
2 | # Copyright (C) 2014-2024 RhodeCode GmbH | |||
|
3 | # | |||
|
4 | # This program is free software; you can redistribute it and/or modify | |||
|
5 | # it under the terms of the GNU General Public License as published by | |||
|
6 | # the Free Software Foundation; either version 3 of the License, or | |||
|
7 | # (at your option) any later version. | |||
|
8 | # | |||
|
9 | # This program is distributed in the hope that it will be useful, | |||
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
12 | # GNU General Public License for more details. | |||
|
13 | # | |||
|
14 | # You should have received a copy of the GNU General Public License | |||
|
15 | # along with this program; if not, write to the Free Software Foundation, | |||
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |||
|
17 | ||||
|
18 | ||||
|
19 | class ArchiveCacheLock(Exception): | |||
|
20 | pass | |||
|
21 | ||||
|
22 | ||||
|
23 | def archive_iterator(_reader, block_size: int = 4096 * 512): | |||
|
24 | # 4096 * 64 = 64KB | |||
|
25 | while 1: | |||
|
26 | data = _reader.read(block_size) | |||
|
27 | if not data: | |||
|
28 | break | |||
|
29 | yield data |
@@ -23,7 +23,6 b' celery==5.3.6' | |||||
23 | tzdata==2024.1 |
|
23 | tzdata==2024.1 | |
24 | vine==5.1.0 |
|
24 | vine==5.1.0 | |
25 | contextlib2==21.6.0 |
|
25 | contextlib2==21.6.0 | |
26 | diskcache==5.6.3 |
|
|||
27 | dogpile.cache==1.3.3 |
|
26 | dogpile.cache==1.3.3 | |
28 | decorator==5.1.1 |
|
27 | decorator==5.1.1 | |
29 | stevedore==5.1.0 |
|
28 | stevedore==5.1.0 |
@@ -120,8 +120,7 b' def store_archive_in_cache(node_walker, ' | |||||
120 | d_cache = get_archival_cache_store(config=cache_config) |
|
120 | d_cache = get_archival_cache_store(config=cache_config) | |
121 |
|
121 | |||
122 | if archive_key in d_cache: |
|
122 | if archive_key in d_cache: | |
123 | with d_cache as d_cache_reader: |
|
123 | reader, metadata = d_cache.fetch(archive_key) | |
124 | reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True) |
|
|||
125 |
|
|
124 | return reader.name | |
126 |
|
125 | |||
127 | archive_tmp_path = safe_bytes(tempfile.mkstemp()[1]) |
|
126 | archive_tmp_path = safe_bytes(tempfile.mkstemp()[1]) | |
@@ -139,6 +138,7 b' def store_archive_in_cache(node_walker, ' | |||||
139 |
|
138 | |||
140 | for f in node_walker(commit_id, archive_at_path): |
|
139 | for f in node_walker(commit_id, archive_at_path): | |
141 | f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/')) |
|
140 | f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/')) | |
|
141 | ||||
142 | try: |
|
142 | try: | |
143 | archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes()) |
|
143 | archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes()) | |
144 | except NoContentException: |
|
144 | except NoContentException: | |
@@ -146,32 +146,26 b' def store_archive_in_cache(node_walker, ' | |||||
146 | # directories which are not supported by archiver |
|
146 | # directories which are not supported by archiver | |
147 | archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'') |
|
147 | archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'') | |
148 |
|
148 | |||
149 | if write_metadata: |
|
|||
150 |
|
|
149 | metadata = dict([ | |
151 |
|
|
150 | ('commit_id', commit_id), | |
152 |
|
|
151 | ('mtime', mtime), | |
153 |
|
|
152 | ]) | |
154 |
|
|
153 | metadata.update(extra_metadata) | |
155 |
|
154 | if write_metadata: | ||
156 | meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()] |
|
155 | meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()] | |
157 | f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt') |
|
156 | f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt') | |
158 | archiver.addfile(f_path, 0o644, False, b'\n'.join(meta)) |
|
157 | archiver.addfile(f_path, 0o644, False, b'\n'.join(meta)) | |
159 |
|
158 | |||
160 | archiver.done() |
|
159 | archiver.done() | |
161 |
|
160 | |||
162 | # ensure set & get are atomic |
|
|||
163 | with d_cache.transact(): |
|
|||
164 |
|
||||
165 |
|
|
161 | with open(archive_tmp_path, 'rb') as archive_file: | |
166 |
|
|
162 | add_result = d_cache.store(archive_key, archive_file, metadata=metadata) | |
167 |
|
|
163 | if not add_result: | |
168 |
|
|
164 | log.error('Failed to store cache for key=%s', archive_key) | |
169 |
|
165 | |||
170 |
|
|
166 | os.remove(archive_tmp_path) | |
171 |
|
167 | |||
172 |
|
|
168 | reader, metadata = d_cache.fetch(archive_key) | |
173 | if not reader: |
|
|||
174 | raise AssertionError(f'empty reader on key={archive_key} added={add_result}') |
|
|||
175 |
|
169 | |||
176 |
|
|
170 | return reader.name | |
177 |
|
171 |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now