feat(archive-cache): implemented s3 based backend for filecaches
super-admin - r5433:d96689c8 default
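
For orientation, below is a minimal sketch of how the new objectstore backend is wired up and exercised, assembled from the settings and test code introduced in this changeset. The endpoint and credentials are placeholder values copied from the example .ini entries; running it requires a reachable S3-compatible store and the Redis instance used for locking.

    import io

    from rhodecode.lib.archive_cache import get_archival_cache_store

    # settings mirror the archive_cache.objectstore.* keys added to the .ini files
    config = {
        'archive_cache.backend.type': 'objectstore',
        'archive_cache.locking.url': 'redis://redis:6379/1',
        'archive_cache.objectstore.url': 'http://s3-minio:9000',  # placeholder endpoint
        'archive_cache.objectstore.key': 'key',                   # placeholder credentials
        'archive_cache.objectstore.secret': 'secret',
        'archive_cache.objectstore.bucket_shards': 8,
        'archive_cache.objectstore.cache_size_gb': 10,
        'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
        'archive_cache.objectstore.retry': 'false',
        'archive_cache.objectstore.retry_backoff': 1,
        'archive_cache.objectstore.retry_attempts': 10,
    }

    d_cache = get_archival_cache_store(config=config, always_init=True)

    # store an archive under its key, then read it back (as the new tests do)
    d_cache.store('my-archive.zip', io.BytesIO(b'fake-archive-bytes'), {'foo': 'bar'})
    reader, metadata = d_cache.fetch('my-archive.zip')
    assert reader.read() == b'fake-archive-bytes'
    assert 'my-archive.zip' in d_cache
    d_cache.remove('my-archive.zip')
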
@@ -0,0 +1,17 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
@@ -0,0 +1,348 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import os
20 import functools
21 import logging
22 import typing
23 import time
24 import zlib
25
26 from ...ext_json import json
27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 from ..lock import GenerationLock
29
30 log = logging.getLogger(__name__)
31
32
33 class BaseShard:
34 storage_type: str = ''
35 fs = None
36
37 @classmethod
38 def hash(cls, key):
39 """Compute portable hash for `key`.
40
41 :param key: key to hash
42 :return: hash value
43
44 """
45 mask = 0xFFFFFFFF
46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47
48 def _write_file(self, full_path, read_iterator, mode):
49 raise NotImplementedError
50
51 def _get_keyfile(self, key):
52 raise NotImplementedError
53
54 def random_filename(self):
55 raise NotImplementedError
56
57 def _store(self, key, value_reader, metadata, mode):
58 (filename, # hash-name
59 full_path # full-path/hash-name
60 ) = self.random_filename()
61
62 key_file, key_file_path = self._get_keyfile(key)
63
64 # STORE METADATA
65 _metadata = {
66 "version": "v1",
67
68 "key_file": key_file, # this is the .key.json file storing meta
69 "key_file_path": key_file_path, # full path to key_file
70 "archive_key": key, # original name we stored archive under, e.g my-archive.zip
71 "archive_filename": filename, # the actual filename we stored that file under
72 "archive_full_path": full_path,
73
74 "store_time": time.time(),
75 "access_count": 0,
76 "access_time": 0,
77
78 "size": 0
79 }
80 if metadata:
81 _metadata.update(metadata)
82
83 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
84 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 _metadata['size'] = size
86 _metadata['sha256'] = sha256
87
88 # after the archive is written, we create a key file to record the presence of the binary file
89 with self.fs.open(key_file_path, 'wb') as f:
90 f.write(json.dumps(_metadata))
91
92 return key, filename, size, _metadata
93
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
95 if retry is NOT_GIVEN:
96 retry = False
97 if retry_attempts is NOT_GIVEN:
98 retry_attempts = 0
99
100 if retry and retry_attempts > 0:
101 for attempt in range(1, retry_attempts + 1):
102 if key in self:
103 break
104 # we didn't find the key, wait retry_backoff seconds, and re-check
105 time.sleep(retry_backoff)
106
107 if key not in self:
108 log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 raise KeyError(key)
110
111 key_file, key_file_path = self._get_keyfile(key)
112 with self.fs.open(key_file_path, 'rb') as f:
113 metadata = json.loads(f.read())
114
115 archive_path = metadata['archive_full_path']
116
117 try:
118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 finally:
120 # update usage stats, count and accessed
121 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 metadata["access_time"] = time.time()
123 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 key_file, metadata['access_count'], metadata['access_time'])
125 with self.fs.open(key_file_path, 'wb') as f:
126 f.write(json.dumps(metadata))
127
128 def _remove(self, key):
129 if key not in self:
130 log.exception(f'requested key={key} not found in {self}')
131 raise KeyError(key)
132
133 key_file, key_file_path = self._get_keyfile(key)
134 with self.fs.open(key_file_path, 'rb') as f:
135 metadata = json.loads(f.read())
136
137 archive_path = metadata['archive_full_path']
138 self.fs.rm(archive_path)
139 self.fs.rm(key_file_path)
140 return 1
141
142 @property
143 def storage_medium(self):
144 return getattr(self, self.storage_type)
145
146 @property
147 def key_suffix(self):
148 return 'key.json'
149
150 def __contains__(self, key):
151 """Return `True` if `key` matching item is found in cache.
152
153 :param key: key matching item
154 :return: True if key matching item
155
156 """
157 key_file, key_file_path = self._get_keyfile(key)
158 return self.fs.exists(key_file_path)
159
160
161 class BaseCache:
162 _locking_url: str = ''
163 _storage_path: str = ''
164 _config = {}
165 retry = False
166 retry_attempts = 0
167 retry_backoff = 1
168 _shards = tuple()
169
170 def __contains__(self, key):
171 """Return `True` if `key` matching item is found in cache.
172
173 :param key: key matching item
174 :return: True if key matching item
175
176 """
177 return self.has_key(key)
178
179 def __repr__(self):
180 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
181
182 @classmethod
183 def gb_to_bytes(cls, gb):
184 return gb * (1024 ** 3)
185
186 @property
187 def storage_path(self):
188 return self._storage_path
189
190 @classmethod
191 def get_stats_db(cls):
192 return StatsDB()
193
194 def get_conf(self, key, pop=False):
195 if key not in self._config:
196 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
197 val = self._config[key]
198 if pop:
199 del self._config[key]
200 return val
201
202 def _get_shard(self, key):
203 raise NotImplementedError
204
205 def _get_size(self, shard, archive_path):
206 raise NotImplementedError
207
208 def store(self, key, value_reader, metadata=None):
209 shard = self._get_shard(key)
210 return shard.store(key, value_reader, metadata)
211
212 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
213 """
214 Return file handle corresponding to `key` from specific shard cache.
215 """
216 if retry is NOT_GIVEN:
217 retry = self.retry
218 if retry_attempts is NOT_GIVEN:
219 retry_attempts = self.retry_attempts
220 retry_backoff = self.retry_backoff
221
222 shard = self._get_shard(key)
223 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
224
225 def remove(self, key):
226 shard = self._get_shard(key)
227 return shard.remove(key)
228
229 def has_key(self, archive_key):
230 """Return `True` if `key` matching item is found in cache.
231
232 :param archive_key: key for the item; a unique archive name we store data under, e.g. my-archive-svn.zip
233 :return: True if key is found
234
235 """
236 shard = self._get_shard(archive_key)
237 return archive_key in shard
238
239 def iter_keys(self):
240 for shard in self._shards:
241 if shard.fs.exists(shard.storage_medium):
242 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
243 for key_file_path in _files:
244 if key_file_path.endswith(shard.key_suffix):
245 yield shard, key_file_path
246
247 def get_lock(self, lock_key):
248 return GenerationLock(lock_key, self._locking_url)
249
250 def evict(self, policy=None, size_limit=None) -> int:
251 """
252 Remove old items based on the configured eviction policy and size limit.
253
254
255 How this algorithm works:
256 iterate over each shard, and for each shard iterate over its .key files,
257 reading the metadata stored in them. This gives us a full list of keys and cached archives,
258 with their size, access data, creation time, and access counts.
259
260 Store that into an in-memory DB so we can easily run different sorting strategies on it.
261 Summing the size is a single SUM sql query.
262
263 Then we run a sorting strategy based on the eviction policy.
264 We iterate over the sorted keys and remove entries until the total size fits within the overall limit.
265 """
266
267 policy = policy or self._eviction_policy
268 size_limit = size_limit or self._cache_size_limit
269
270 select_policy = EVICTION_POLICY[policy]['evict']
271
272 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
273 policy, format_size(size_limit))
274
275 if select_policy is None:
276 return 0
277
278 db = self.get_stats_db()
279
280 data = []
281 cnt = 1
282
283 for shard, key_file in self.iter_keys():
284 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
285 metadata = json.loads(f.read())
286
287 key_file_path = os.path.join(shard.storage_medium, key_file)
288
289 archive_key = metadata['archive_key']
290 archive_path = metadata['archive_full_path']
291
292 size = metadata.get('size')
293 if not size:
294 # in case we don't have size re-calc it...
295 size = self._get_size(shard, archive_path)
296
297 data.append([
298 cnt,
299 key_file,
300 key_file_path,
301 archive_key,
302 archive_path,
303 metadata.get('store_time', 0),
304 metadata.get('access_time', 0),
305 metadata.get('access_count', 0),
306 size,
307 ])
308 cnt += 1
309
310 # Insert bulk data using executemany
311 db.bulk_insert(data)
312
313 total_size = db.get_total_size()
314 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
315 len(data), format_size(total_size), format_size(size_limit))
316
317 removed_items = 0
318 removed_size = 0
319 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
320 # simulate removal impact BEFORE removal
321 total_size -= size
322
323 if total_size <= size_limit:
324 # we obtained what we wanted...
325 break
326
327 self.remove(archive_key)
328 removed_items += 1
329 removed_size += size
330
331 log.debug('Removed %s cache archives, and reduced size by: %s',
332 removed_items, format_size(removed_size))
333 return removed_items
334
335 def get_statistics(self):
336 total_files = 0
337 total_size = 0
338 meta = {}
339
340 for shard, key_file in self.iter_keys():
341 json_key = f"{shard.storage_medium}/{key_file}"
342 with shard.fs.open(json_key, 'rb') as f:
343 total_files += 1
344 metadata = json.loads(f.read())
345 total_size += metadata['size']
346
347 return total_files, total_size, meta
348
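
A key is routed to a shard by the portable adler32 hash defined above. The sketch below re-implements that mapping standalone and notes the shape of the .key.json metadata written next to each archive (field names taken from the _store method above); it is illustrative only, not an import of the class.

    import zlib

    def shard_index(key: str, shard_count: int = 8) -> int:
        # same scheme as BaseShard.hash: 32-bit masked adler32, then modulo shard count
        mask = 0xFFFFFFFF
        return (zlib.adler32(key.encode('utf-8')) & mask) % shard_count

    print(shard_index('my-archive.zip'))  # deterministic index in range(8)

    # each stored archive gets a companion .key.json holding metadata shaped like:
    # {"version": "v1", "key_file": ..., "key_file_path": ..., "archive_key": ...,
    #  "archive_filename": ..., "archive_full_path": ..., "store_time": ...,
    #  "access_count": ..., "access_time": ..., "size": ..., "sha256": ...}
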
@@ -0,0 +1,150 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import codecs
20 import hashlib
21 import logging
22 import os
23
24 import fsspec
25
26 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29
30 log = logging.getLogger(__name__)
31
32
33 class S3Shard(BaseShard):
34
35 def __init__(self, index, bucket, **settings):
36 self._index = index
37 self._bucket = bucket
38 self.storage_type = 'bucket'
39
40 endpoint_url = settings.pop('archive_cache.objectstore.url')
41 key = settings.pop('archive_cache.objectstore.key')
42 secret = settings.pop('archive_cache.objectstore.secret')
43
44 self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
45
46 @property
47 def bucket(self):
48 """Cache bucket."""
49 return self._bucket
50
51 def _get_keyfile(self, archive_key) -> tuple[str, str]:
52 key_file = f'{archive_key}-{self.key_suffix}'
53 return key_file, os.path.join(self.bucket, key_file)
54
55 def _get_writer(self, path, mode):
56 return self.fs.open(path, 'wb')
57
58 def _write_file(self, full_path, iterator, mode):
59 # ensure bucket exists
60 destination = self.bucket
61 if not self.fs.exists(destination):
62 self.fs.mkdir(destination, s3_additional_kwargs={})
63
64 writer = self._get_writer(full_path, mode)
65
66 digest = hashlib.sha256()
67 with writer:
68 size = 0
69 for chunk in iterator:
70 size += len(chunk)
71 digest.update(chunk)
72 writer.write(chunk)
73
74 sha256 = digest.hexdigest()
75 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
76 return size, sha256
77
78 def store(self, key, value_reader, metadata: dict | None = None):
79 return self._store(key, value_reader, metadata, mode='wb')
80
81 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
82 return self._fetch(key, retry, retry_attempts, retry_backoff)
83
84 def remove(self, key):
85 return self._remove(key)
86
87 def random_filename(self):
88 """Return filename and full-path tuple for file storage.
89
90 Filename will be a randomly generated 28 character hexadecimal string
91 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
92 reduce the size of directories. On older filesystems, lookups in
93 directories with many files may be slow.
94 """
95
96 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
97
98 archive_name = hex_name[4:] + '.archive_cache'
99 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
100
101 full_path = os.path.join(self.bucket, filename)
102 return archive_name, full_path
103
104 def __repr__(self):
105 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
106
107
108 class ObjectStoreCache(BaseCache):
109
110 def __init__(self, locking_url, **settings):
111 """
112 Initialize objectstore cache instance.
113
114 :param str locking_url: redis url for a lock
115 :param settings: settings dict
116
117 """
118 self._locking_url = locking_url
119 self._config = settings
120
121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 self._storage_path = objectstore_url
123
124 self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125
126 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
127 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
128
129 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
130 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
131 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
132
133 log.debug('Initializing archival cache instance under %s', objectstore_url)
134 self._shards = tuple(
135 S3Shard(
136 index=num,
137 bucket='rhodecode-archivecache-%03d' % num,
138 **settings,
139 )
140 for num in range(self._count)
141 )
142 self._hash = self._shards[0].hash
143
144 def _get_shard(self, key) -> S3Shard:
145 index = self._hash(key) % self._count
146 shard = self._shards[index]
147 return shard
148
149 def _get_size(self, shard, archive_path):
150 return shard.fs.info(archive_path)['size']
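
To make the resulting object layout concrete, here is a sketch of what one shard produces on the S3 side. The endpoint and credentials are placeholders; the bucket name follows the 'rhodecode-archivecache-%03d' pattern used by ObjectStoreCache above.

    import codecs
    import os

    import fsspec

    # same wiring as S3Shard; credentials/endpoint are placeholder values
    fs = fsspec.filesystem('s3', anon=False, endpoint_url='http://s3-minio:9000',
                           key='key', secret='secret')

    bucket = 'rhodecode-archivecache-000'  # shard 0 with the default 8 bucket shards

    # archive object: random 32-char hex name split into two dash-separated prefixes
    hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
    archive_object = os.path.join(bucket, f'{hex_name[:2]}-{hex_name[2:4]}-{hex_name[4:]}.archive_cache')

    # companion metadata object for the key 'my-archive.zip'
    key_object = os.path.join(bucket, 'my-archive.zip-key.json')
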
@@ -0,0 +1,105 b''
1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import time
20 import pytest
21 import rhodecode
22 import os
23 import shutil
24 from tempfile import mkdtemp
25
26 from rhodecode.lib import archive_cache
27
28
29 def file_reader(temp_store):
30 with open(temp_store, 'w') as f:
31 for cnt in range(10000):
32 f.write(str(cnt))
33 return open(temp_store, 'rb')
34
35
36 @pytest.fixture()
37 def d_cache_instance(ini_settings):
38 config = ini_settings
39 d_cache = archive_cache.get_archival_cache_store(config=config, always_init=True)
40 return d_cache
41
42
43 @pytest.mark.usefixtures('app')
44 class TestArchiveCaches(object):
45
46 def test_archivecache_empty_stats(self, d_cache_instance):
47 d_cache = d_cache_instance
48 shutil.rmtree(d_cache._directory)
49
50 stats = d_cache.get_statistics()
51 assert (0, 0, {}) == stats
52
53 def test_archivecache_store_keys(self, d_cache_instance, tmp_path):
54 d_cache = d_cache_instance
55 shutil.rmtree(d_cache._directory)
56
57 for n in range(100):
58
59 archive_name = f'my-archive-abc-{n}.zip'
60 temp_archive_path = os.path.join(tmp_path, archive_name)
61 d_cache.store(archive_name, file_reader(temp_archive_path), {'foo': 'bar'})
62 reader, meta = d_cache.fetch(archive_name)
63 content = reader.read()
64 assert content == open(temp_archive_path, 'rb').read()
65
66 stats = d_cache.get_statistics()
67 assert (100, 3889000, {}) == stats
68
69 def test_archivecache_remove_keys(self, d_cache_instance, tmp_path):
70 d_cache = d_cache_instance
71 shutil.rmtree(d_cache._directory)
72
73 n = 1
74 archive_name = f'my-archive-abc-{n}.zip'
75 temp_archive_path = os.path.join(tmp_path, archive_name)
76
77 d_cache.store(archive_name, file_reader(temp_archive_path), {'foo': 'bar'})
78 stats = d_cache.get_statistics()
79 assert (1, 38890, {}) == stats
80
81 assert 1 == d_cache.remove(archive_name)
82
83 stats = d_cache.get_statistics()
84 assert (0, 0, {}) == stats
85
86 def test_archivecache_evict_keys(self, d_cache_instance, tmp_path):
87 d_cache = d_cache_instance
88 shutil.rmtree(d_cache._directory)
89 tries = 500
90 for n in range(tries):
91
92 archive_name = f'my-archive-abc-{n}.zip'
93 temp_archive_path = os.path.join(tmp_path, archive_name)
94 d_cache.store(archive_name, file_reader(temp_archive_path), {'foo': 'bar'})
95
96 stats = d_cache.get_statistics()
97 assert (tries, 19445000, {}) == stats
98 evict_to = 0.005 # around 5 MB
99 evicted_items = d_cache.evict(size_limit=d_cache.gb_to_bytes(evict_to))
100 evicted = 361
101 assert evicted == evicted_items
102
103 stats = d_cache.get_statistics()
104 assert (tries - evicted, 5405710, {}) == stats
105
@@ -290,19 +290,41 b' file_store.backend = local'
290 290 ; path to store the uploaded binaries and artifacts
291 291 file_store.storage_path = /var/opt/rhodecode_data/file_store
292 292
293 ; Uncomment and set this path to control settings for archive download cache.
293
294 ; Redis url to acquire/check generation of archives locks
295 archive_cache.locking.url = redis://redis:6379/1
296
297 ; Storage backend, only 'filesystem' and 'objectstore' are available now
298 archive_cache.backend.type = filesystem
299
300 ; url for s3 compatible storage used to upload artifacts
301 ; e.g. http://minio:9000
302 archive_cache.objectstore.url = http://s3-minio:9000
303
304 ; key for s3 auth
305 archive_cache.objectstore.key = key
306
307 ; secret for s3 auth
308 archive_cache.objectstore.secret = secret
309
310 ; number of sharded buckets to create to distribute archives across
311 ; default is 8 shards
312 archive_cache.objectstore.bucket_shards = 8
313
314 ; if true, this cache will retry fetches up to retry_attempts times, waiting retry_backoff seconds between tries
315 archive_cache.objectstore.retry = false
316
317 ; number of seconds to wait before the next retry attempt
318 archive_cache.objectstore.retry_backoff = 1
319
320 ; how many times to retry a fetch from this backend
321 archive_cache.objectstore.retry_attempts = 10
322
323 ; Default is $cache_dir/archive_cache if not set
294 324 ; Generated repo archives will be cached at this location
295 325 ; and served from the cache during subsequent requests for the same archive of
296 326 ; the repository. This path is important to be shared across filesystems and with
297 327 ; RhodeCode and vcsserver
298
299 ; Redis url to acquire/check generation of archives locks
300 archive_cache.locking.url = redis://redis:6379/1
301
302 ; Storage backend, only 'filesystem' is available now
303 archive_cache.backend.type = filesystem
304
305 ; Default is $cache_dir/archive_cache if not set
306 328 archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
307 329
308 330 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
@@ -312,8 +334,18 b' archive_cache.filesystem.cache_size_gb ='
312 334 archive_cache.filesystem.eviction_policy = least-recently-stored
313 335
314 336 ; By default cache uses sharding technique, this specifies how many shards are there
337 ; default is 8 shards
315 338 archive_cache.filesystem.cache_shards = 8
316 339
340 ; if true, this cache will retry fetches up to retry_attempts times, waiting retry_backoff seconds between tries
341 archive_cache.filesystem.retry = false
342
343 ; number of seconds to wait before the next retry attempt
344 archive_cache.filesystem.retry_backoff = 1
345
346 ; how many times to retry a fetch from this backend
347 archive_cache.filesystem.retry_attempts = 10
348
317 349
318 350 ; #############
319 351 ; CELERY CONFIG
@@ -258,19 +258,41 b' file_store.backend = local'
258 258 ; path to store the uploaded binaries and artifacts
259 259 file_store.storage_path = /var/opt/rhodecode_data/file_store
260 260
261 ; Uncomment and set this path to control settings for archive download cache.
261
262 ; Redis url to acquire/check generation of archives locks
263 archive_cache.locking.url = redis://redis:6379/1
264
265 ; Storage backend, only 'filesystem' and 'objectstore' are available now
266 archive_cache.backend.type = filesystem
267
268 ; url for s3 compatible storage used to upload artifacts
269 ; e.g. http://minio:9000
270 archive_cache.objectstore.url = http://s3-minio:9000
271
272 ; key for s3 auth
273 archive_cache.objectstore.key = key
274
275 ; secret for s3 auth
276 archive_cache.objectstore.secret = secret
277
278 ; number of sharded buckets to create to distribute archives across
279 ; default is 8 shards
280 archive_cache.objectstore.bucket_shards = 8
281
282 ; if true, this cache will retry fetches up to retry_attempts times, waiting retry_backoff seconds between tries
283 archive_cache.objectstore.retry = false
284
285 ; number of seconds to wait before the next retry attempt
286 archive_cache.objectstore.retry_backoff = 1
287
288 ; how many times to retry a fetch from this backend
289 archive_cache.objectstore.retry_attempts = 10
290
291 ; Default is $cache_dir/archive_cache if not set
262 292 ; Generated repo archives will be cached at this location
263 293 ; and served from the cache during subsequent requests for the same archive of
264 294 ; the repository. This path is important to be shared across filesystems and with
265 295 ; RhodeCode and vcsserver
266
267 ; Redis url to acquire/check generation of archives locks
268 archive_cache.locking.url = redis://redis:6379/1
269
270 ; Storage backend, only 'filesystem' is available now
271 archive_cache.backend.type = filesystem
272
273 ; Default is $cache_dir/archive_cache if not set
274 296 archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
275 297
276 298 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
@@ -280,8 +302,18 b' archive_cache.filesystem.cache_size_gb ='
280 302 archive_cache.filesystem.eviction_policy = least-recently-stored
281 303
282 304 ; By default cache uses sharding technique, this specifies how many shards are there
305 ; default is 8 shards
283 306 archive_cache.filesystem.cache_shards = 8
284 307
308 ; if true, this cache will retry fetches up to retry_attempts times, waiting retry_backoff seconds between tries
309 archive_cache.filesystem.retry = false
310
311 ; number of seconds to wait before the next retry attempt
312 archive_cache.filesystem.retry_backoff = 1
313
314 ; how many times to retry a fetch from this backend
315 archive_cache.filesystem.retry_attempts = 10
316
285 317
286 318 ; #############
287 319 ; CELERY CONFIG
@@ -89,6 +89,7 b' dogpile.cache==1.3.3'
89 89 pbr==5.11.1
90 90 formencode==2.1.0
91 91 six==1.16.0
92 fsspec==2024.6.0
92 93 gunicorn==21.2.0
93 94 packaging==24.0
94 95 gevent==24.2.1
@@ -257,6 +258,34 b' regex==2022.10.31'
257 258 routes==2.5.1
258 259 repoze.lru==0.7
259 260 six==1.16.0
261 s3fs==2024.6.0
262 aiobotocore==2.13.0
263 aiohttp==3.9.5
264 aiosignal==1.3.1
265 frozenlist==1.4.1
266 attrs==22.2.0
267 frozenlist==1.4.1
268 multidict==6.0.5
269 yarl==1.9.4
270 idna==3.4
271 multidict==6.0.5
272 aioitertools==0.11.0
273 botocore==1.34.106
274 jmespath==1.0.1
275 python-dateutil==2.8.2
276 six==1.16.0
277 urllib3==1.26.14
278 wrapt==1.16.0
279 aiohttp==3.9.5
280 aiosignal==1.3.1
281 frozenlist==1.4.1
282 attrs==22.2.0
283 frozenlist==1.4.1
284 multidict==6.0.5
285 yarl==1.9.4
286 idna==3.4
287 multidict==6.0.5
288 fsspec==2024.6.0
260 289 simplejson==3.19.2
261 290 sshpubkeys==3.3.1
262 291 cryptography==40.0.2
@@ -39,7 +39,7 b' from rhodecode.apps._base import RepoApp'
39 39 from rhodecode.lib import diffs, helpers as h, rc_cache
40 40 from rhodecode.lib import audit_logger
41 41 from rhodecode.lib.hash_utils import sha1_safe
42 from rhodecode.lib.rc_cache.archive_cache import (
42 from rhodecode.lib.archive_cache import (
43 43 get_archival_cache_store, get_archival_config, ArchiveCacheGenerationLock, archive_iterator)
44 44 from rhodecode.lib.str_utils import safe_bytes, convert_special_chars
45 45 from rhodecode.lib.view_utils import parse_path_ref
@@ -193,10 +193,26 b' def sanitize_settings_and_apply_defaults'
193 193 settings_maker.make_setting('archive_cache.backend.type', 'filesystem')
194 194
195 195 settings_maker.make_setting('archive_cache.filesystem.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
196 settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
196 197 settings_maker.make_setting('archive_cache.filesystem.cache_size_gb', 10, parser='float')
197 settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
198 198 settings_maker.make_setting('archive_cache.filesystem.eviction_policy', 'least-recently-stored')
199 199
200 settings_maker.make_setting('archive_cache.filesystem.retry', False, parser='bool')
201 settings_maker.make_setting('archive_cache.filesystem.retry_backoff', 1, parser='int')
202 settings_maker.make_setting('archive_cache.filesystem.retry_attempts', 10, parser='int')
203
204 settings_maker.make_setting('archive_cache.objectstore.url', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
205 settings_maker.make_setting('archive_cache.objectstore.key', '')
206 settings_maker.make_setting('archive_cache.objectstore.secret', '')
207 settings_maker.make_setting('archive_cache.objectstore.bucket_shards', 8, parser='int')
208
209 settings_maker.make_setting('archive_cache.objectstore.cache_size_gb', 10, parser='float')
210 settings_maker.make_setting('archive_cache.objectstore.eviction_policy', 'least-recently-stored')
211
212 settings_maker.make_setting('archive_cache.objectstore.retry', False, parser='bool')
213 settings_maker.make_setting('archive_cache.objectstore.retry_backoff', 1, parser='int')
214 settings_maker.make_setting('archive_cache.objectstore.retry_attempts', 10, parser='int')
215
200 216 settings_maker.env_expand()
201 217
202 218 # configure instance id
@@ -326,7 +326,7 b' def includeme(config, auth_resources=Non'
326 326 config.include('pyramid_mako')
327 327 config.include('rhodecode.lib.rc_beaker')
328 328 config.include('rhodecode.lib.rc_cache')
329 config.include('rhodecode.lib.rc_cache.archive_cache')
329 config.include('rhodecode.lib.archive_cache')
330 330
331 331 config.include('rhodecode.apps._base.navigation')
332 332 config.include('rhodecode.apps._base.subscribers')
@@ -16,14 +16,63 b''
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 from .fanout_cache import get_archival_cache_store
20 from .fanout_cache import get_archival_config
19 import logging
20
21 from .backends.fanout_cache import FileSystemFanoutCache
22 from .backends.objectstore_cache import ObjectStoreCache
21 23
22 from .utils import archive_iterator
23 from .utils import ArchiveCacheGenerationLock
24 from .utils import archive_iterator # noqa
25 from .lock import ArchiveCacheGenerationLock # noqa
26
27 log = logging.getLogger(__name__)
28
29
30 cache_meta = None
24 31
25 32
26 33 def includeme(config):
27 34 # init our cache at start
28 35 settings = config.get_settings()
29 36 get_archival_cache_store(settings)
37
38
39 def get_archival_config(config):
40
41 final_config = {
42
43 }
44
45 for k, v in config.items():
46 if k.startswith('archive_cache'):
47 final_config[k] = v
48
49 return final_config
50
51
52 def get_archival_cache_store(config, always_init=False):
53
54 global cache_meta
55 if cache_meta is not None and not always_init:
56 return cache_meta
57
58 config = get_archival_config(config)
59 backend = config['archive_cache.backend.type']
60
61 archive_cache_locking_url = config['archive_cache.locking.url']
62
63 match backend:
64 case 'filesystem':
65 d_cache = FileSystemFanoutCache(
66 locking_url=archive_cache_locking_url,
67 **config
68 )
69 case 'objectstore':
70 d_cache = ObjectStoreCache(
71 locking_url=archive_cache_locking_url,
72 **config
73 )
74 case _:
75 raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore", got: {backend}')
76
77 cache_meta = d_cache
78 return cache_meta
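
Only archive_cache.* keys are forwarded to the backends; everything else in the settings dict is dropped by get_archival_config. A small runnable illustration (the sqlalchemy key below is just an example of a filtered-out setting). Note also that get_archival_cache_store keeps the constructed backend in the module-level cache_meta, so repeated calls return the same instance unless always_init=True is passed, which the new test fixture relies on.

    from rhodecode.lib.archive_cache import get_archival_config

    full_settings = {
        'archive_cache.backend.type': 'filesystem',
        'archive_cache.locking.url': 'redis://redis:6379/1',
        'sqlalchemy.db1.url': 'sqlite:///rhodecode.db',  # unrelated key, filtered out
    }

    assert get_archival_config(full_settings) == {
        'archive_cache.backend.type': 'filesystem',
        'archive_cache.locking.url': 'redis://redis:6379/1',
    }
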
@@ -17,127 +17,62 b''
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 import contextlib
21 import functools
22 import os
20 import hashlib
23 21 import logging
24 import time
25 import typing
26 import zlib
27 import sqlite3
22 import os
23
24 import fsspec
28 25
29 from ...ext_json import json
30 from .lock import GenerationLock
31 from .utils import format_size
26 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
32 29
33 30 log = logging.getLogger(__name__)
34 31
35 cache_meta = None
36 32
37 UNKNOWN = -241
38 NO_VAL = -917
39
40 MODE_BINARY = 'BINARY'
41
42
43 EVICTION_POLICY = {
44 'none': {
45 'evict': None,
46 },
47 'least-recently-stored': {
48 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
49 },
50 'least-recently-used': {
51 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
52 },
53 'least-frequently-used': {
54 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
55 },
56 }
57
58
59 class DB:
60
61 def __init__(self):
62 self.connection = sqlite3.connect(':memory:')
63 self._init_db()
64
65 def _init_db(self):
66 qry = '''
67 CREATE TABLE IF NOT EXISTS archive_cache (
68 rowid INTEGER PRIMARY KEY,
69 key_file TEXT,
70 key_file_path TEXT,
71 filename TEXT,
72 full_path TEXT,
73 store_time REAL,
74 access_time REAL,
75 access_count INTEGER DEFAULT 0,
76 size INTEGER DEFAULT 0
77 )
78 '''
79
80 self.sql(qry)
81 self.connection.commit()
82
83 @property
84 def sql(self):
85 return self.connection.execute
86
87 def bulk_insert(self, rows):
88 qry = '''
89 INSERT INTO archive_cache (
90 rowid,
91 key_file,
92 key_file_path,
93 filename,
94 full_path,
95 store_time,
96 access_time,
97 access_count,
98 size
99 )
100 VALUES (
101 ?, ?, ?, ?, ?, ?, ?, ?, ?
102 )
103 '''
104 cursor = self.connection.cursor()
105 cursor.executemany(qry, rows)
106 self.connection.commit()
107
108
109 class FileSystemCache:
33 class FileSystemShard(BaseShard):
110 34
111 35 def __init__(self, index, directory, **settings):
112 36 self._index = index
113 37 self._directory = directory
38 self.storage_type = 'directory'
39 self.fs = fsspec.filesystem('file')
114 40
115 41 @property
116 42 def directory(self):
117 43 """Cache directory."""
118 44 return self._directory
119 45
120 def _write_file(self, full_path, iterator, mode, encoding=None):
121 full_dir, _ = os.path.split(full_path)
46 def _get_keyfile(self, archive_key) -> tuple[str, str]:
47 key_file = f'{archive_key}.{self.key_suffix}'
48 return key_file, os.path.join(self.directory, key_file)
122 49
50 def _get_writer(self, path, mode):
123 51 for count in range(1, 11):
124 with contextlib.suppress(OSError):
125 os.makedirs(full_dir)
126
127 52 try:
128 53 # Another cache may have deleted the directory before
129 54 # the file could be opened.
130 writer = open(full_path, mode, encoding=encoding)
55 return self.fs.open(path, mode)
131 56 except OSError:
132 57 if count == 10:
133 58 # Give up after 10 tries to open the file.
134 59 raise
135 60 continue
136 61
62 def _write_file(self, full_path, iterator, mode):
63 # ensure dir exists
64 destination, _ = os.path.split(full_path)
65 if not self.fs.exists(destination):
66 self.fs.makedirs(destination)
67
68 writer = self._get_writer(full_path, mode)
69
70 digest = hashlib.sha256()
137 71 with writer:
138 72 size = 0
139 73 for chunk in iterator:
140 74 size += len(chunk)
75 digest.update(chunk)
141 76 writer.write(chunk)
142 77 writer.flush()
143 78 # Get the file descriptor
@@ -145,70 +80,18 b' class FileSystemCache:'
145 80
146 81 # Sync the file descriptor to disk, helps with NFS cases...
147 82 os.fsync(fd)
148 log.debug('written new archive cache under %s', full_path)
149 return size
150
151 def _get_keyfile(self, key):
152 return os.path.join(self._directory, f'{key}.key')
153
154 def store(self, key, value_reader, metadata):
155 filename, full_path = self.random_filename()
156 key_file = self._get_keyfile(key)
157
158 # STORE METADATA
159 _metadata = {
160 "version": "v1",
161 "filename": filename,
162 "full_path": full_path,
163 "key_file": key_file,
164 "store_time": time.time(),
165 "access_count": 1,
166 "access_time": 0,
167 "size": 0
168 }
169 if metadata:
170 _metadata.update(metadata)
171
172 reader = functools.partial(value_reader.read, 2**22)
173
174 iterator = iter(reader, b'')
175 size = self._write_file(full_path, iterator, 'xb')
176 metadata['size'] = size
83 sha256 = digest.hexdigest()
84 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
85 return size, sha256
177 86
178 # after archive is finished, we create a key to save the presence of the binary file
179 with open(key_file, 'wb') as f:
180 f.write(json.dumps(_metadata))
181
182 return key, size, MODE_BINARY, filename, _metadata
183
184 def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
185
186 if retry:
187 for attempt in range(retry_attempts):
188 if key in self:
189 break
190 # we dind't find the key, wait 1s, and re-check
191 time.sleep(1)
87 def store(self, key, value_reader, metadata: dict | None = None):
88 return self._store(key, value_reader, metadata, mode='xb')
192 89
193 if key not in self:
194 log.exception('requested {key} not found in {self}', key, self)
195 raise KeyError(key)
196
197 key_file = self._get_keyfile(key)
198 with open(key_file, 'rb') as f:
199 metadata = json.loads(f.read())
200
201 filename = metadata['filename']
90 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
91 return self._fetch(key, retry, retry_attempts, retry_backoff)
202 92
203 try:
204 return open(os.path.join(self.directory, filename), 'rb'), metadata
205 finally:
206 # update usage stats, count and accessed
207 metadata["access_count"] = metadata.get("access_count", 0) + 1
208 metadata["access_time"] = time.time()
209
210 with open(key_file, 'wb') as f:
211 f.write(json.dumps(metadata))
93 def remove(self, key):
94 return self._remove(key)
212 95
213 96 def random_filename(self):
214 97 """Return filename and full-path tuple for file storage.
@@ -220,64 +103,52 b' class FileSystemCache:'
220 103 """
221 104
222 105 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
223 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
224 name = hex_name[4:] + '.archive_cache'
225 filename = os.path.join(sub_dir, name)
226 full_path = os.path.join(self.directory, filename)
227 return filename, full_path
228
229 def hash(self, key):
230 """Compute portable hash for `key`.
231
232 :param key: key to hash
233 :return: hash value
234 106
235 """
236 mask = 0xFFFFFFFF
237 return zlib.adler32(key.encode('utf-8')) & mask # noqa
238
239 def __contains__(self, key):
240 """Return `True` if `key` matching item is found in cache.
107 archive_name = hex_name[4:] + '.archive_cache'
108 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
241 109
242 :param key: key matching item
243 :return: True if key matching item
244
245 """
246 key_file = self._get_keyfile(key)
247 return os.path.exists(key_file)
110 full_path = os.path.join(self.directory, filename)
111 return archive_name, full_path
248 112
249 113 def __repr__(self):
250 return f'FileSystemCache(index={self._index}, dir={self.directory})'
114 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
251 115
252 116
253 class FanoutCache:
254 """Cache that shards keys and values."""
117 class FileSystemFanoutCache(BaseCache):
255 118
256 def __init__(
257 self, directory=None, **settings
258 ):
259 """Initialize cache instance.
119 def __init__(self, locking_url, **settings):
120 """
121 Initialize file system cache instance.
260 122
261 :param str directory: cache directory
123 :param str locking_url: redis url for a lock
262 124 :param settings: settings dict
263 125
264 126 """
265 if directory is None:
266 raise ValueError('directory cannot be None')
267
268 directory = str(directory)
127 self._locking_url = locking_url
128 self._config = settings
129 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
130 directory = str(cache_dir)
269 131 directory = os.path.expanduser(directory)
270 132 directory = os.path.expandvars(directory)
271 133 self._directory = directory
134 self._storage_path = directory
272 135
273 self._count = settings.pop('cache_shards')
274 self._locking_url = settings.pop('locking_url')
136 # check if it's ok to write, and re-create the archive cache
137 if not os.path.isdir(self._directory):
138 os.makedirs(self._directory, exist_ok=True)
139
140 self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
275 141
276 self._eviction_policy = settings['cache_eviction_policy']
277 self._cache_size_limit = settings['cache_size_limit']
142 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
143 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
278 144
145 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
146 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
147 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
148
149 log.debug('Initializing archival cache instance under %s', self._directory)
279 150 self._shards = tuple(
280 FileSystemCache(
151 FileSystemShard(
281 152 index=num,
282 153 directory=os.path.join(directory, 'shard_%03d' % num),
283 154 **settings,
@@ -286,171 +157,10 b' class FanoutCache:'
286 157 )
287 158 self._hash = self._shards[0].hash
288 159
289 @property
290 def directory(self):
291 """Cache directory."""
292 return self._directory
293
294 def get_lock(self, lock_key):
295 return GenerationLock(lock_key, self._locking_url)
296
297 def _get_shard(self, key) -> FileSystemCache:
160 def _get_shard(self, key) -> FileSystemShard:
298 161 index = self._hash(key) % self._count
299 162 shard = self._shards[index]
300 163 return shard
301 164
302 def store(self, key, value_reader, metadata=None):
303 shard = self._get_shard(key)
304 return shard.store(key, value_reader, metadata)
305
306 def fetch(self, key, retry=False, retry_attempts=10):
307 """Return file handle corresponding to `key` from cache.
308 """
309 shard = self._get_shard(key)
310 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)
311
312 def has_key(self, key):
313 """Return `True` if `key` matching item is found in cache.
314
315 :param key: key for item
316 :return: True if key is found
317
318 """
319 shard = self._get_shard(key)
320 return key in shard
321
322 def __contains__(self, item):
323 return self.has_key(item)
324
325 def evict(self, policy=None, size_limit=None):
326 """
327 Remove old items based on the conditions
328
329
330 explanation of this algo:
331 iterate over each shard, then for each shard iterate over the .key files
332 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
333 access data, time creation, and access counts.
334
335 Store that into a memory DB so we can run different sorting strategies easily.
336 Summing the size is a sum sql query.
337
338 Then we run a sorting strategy based on eviction policy.
339 We iterate over sorted keys, and remove each checking if we hit the overall limit.
340 """
341
342 policy = policy or self._eviction_policy
343 size_limit = size_limit or self._cache_size_limit
344
345 select_policy = EVICTION_POLICY[policy]['evict']
346
347 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
348 policy, format_size(size_limit))
349
350 if select_policy is None:
351 return 0
352
353 db = DB()
354
355 data = []
356 cnt = 1
357 for shard in self._shards:
358 for key_file in os.listdir(shard.directory):
359 if key_file.endswith('.key'):
360 key_file_path = os.path.join(shard.directory, key_file)
361 with open(key_file_path, 'rb') as f:
362 metadata = json.loads(f.read())
363
364 size = metadata.get('size')
365 filename = metadata.get('filename')
366 full_path = metadata.get('full_path')
367
368 if not size:
369 # in case we don't have size re-calc it...
370 size = os.stat(full_path).st_size
371
372 data.append([
373 cnt,
374 key_file,
375 key_file_path,
376 filename,
377 full_path,
378 metadata.get('store_time', 0),
379 metadata.get('access_time', 0),
380 metadata.get('access_count', 0),
381 size,
382 ])
383 cnt += 1
384
385 # Insert bulk data using executemany
386 db.bulk_insert(data)
387
388 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
389 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
390 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
391 sorted_keys = db.sql(select_policy_qry).fetchall()
392
393 removed_items = 0
394 removed_size = 0
395 for key, cached_file, size in sorted_keys:
396 # simulate removal impact BEFORE removal
397 total_size -= size
398
399 if total_size <= size_limit:
400 # we obtained what we wanted...
401 break
402
403 os.remove(cached_file)
404 os.remove(key)
405 removed_items += 1
406 removed_size += size
407
408 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
409 return removed_items
410
411
412 def get_archival_config(config):
413
414 final_config = {
415
416 }
417
418 for k, v in config.items():
419 if k.startswith('archive_cache'):
420 final_config[k] = v
421
422 return final_config
423
424
425 def get_archival_cache_store(config):
426
427 global cache_meta
428 if cache_meta is not None:
429 return cache_meta
430
431 config = get_archival_config(config)
432 backend = config['archive_cache.backend.type']
433 if backend != 'filesystem':
434 raise ValueError('archive_cache.backend.type only supports "filesystem"')
435
436 archive_cache_locking_url = config['archive_cache.locking.url']
437 archive_cache_dir = config['archive_cache.filesystem.store_dir']
438 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
439 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
440 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
441
442 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
443
444 # check if it's ok to write, and re-create the archive cache
445 if not os.path.isdir(archive_cache_dir):
446 os.makedirs(archive_cache_dir, exist_ok=True)
447
448 d_cache = FanoutCache(
449 archive_cache_dir,
450 locking_url=archive_cache_locking_url,
451 cache_shards=archive_cache_shards,
452 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
453 cache_eviction_policy=archive_cache_eviction_policy
454 )
455 cache_meta = d_cache
456 return cache_meta
165 def _get_size(self, shard, archive_path):
166 return os.stat(archive_path).st_size
@@ -17,8 +17,11 b''
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import redis
20 from ..._vendor import redis_lock
21 from .utils import ArchiveCacheGenerationLock
20 from .._vendor import redis_lock
21
22
23 class ArchiveCacheGenerationLock(Exception):
24 pass
22 25
23 26
24 27 class GenerationLock:
@@ -16,11 +16,26 b''
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 import os
19 import sqlite3
20 import s3fs.core
21
22 NOT_GIVEN = -917
20 23
21 24
22 class ArchiveCacheGenerationLock(Exception):
23 pass
25 EVICTION_POLICY = {
26 'none': {
27 'evict': None,
28 },
29 'least-recently-stored': {
30 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
31 },
32 'least-recently-used': {
33 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
34 },
35 'least-frequently-used': {
36 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
37 },
38 }
24 39
25 40
26 41 def archive_iterator(_reader, block_size: int = 4096 * 512):
@@ -32,41 +47,88 b' def archive_iterator(_reader, block_size'
32 47 yield data
33 48
34 49
35 def get_directory_statistics(start_path):
36 """
37 total_files, total_size, directory_stats = get_directory_statistics(start_path)
38
39 print(f"Directory statistics for: {start_path}\n")
40 print(f"Total files: {total_files}")
41 print(f"Total size: {format_size(total_size)}\n")
42
43 :param start_path:
44 :return:
45 """
46
47 total_files = 0
48 total_size = 0
49 directory_stats = {}
50
51 for dir_path, dir_names, file_names in os.walk(start_path):
52 dir_size = 0
53 file_count = len(file_names)
54
55 for file in file_names:
56 filepath = os.path.join(dir_path, file)
57 file_size = os.path.getsize(filepath)
58 dir_size += file_size
59
60 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
61 total_files += file_count
62 total_size += dir_size
63
64 return total_files, total_size, directory_stats
65
66
67 50 def format_size(size):
68 51 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
69 52 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
70 53 if size < 1024:
71 54 return f"{size:.2f} {unit}"
72 55 size /= 1024
56
57
58 class StatsDB:
59
60 def __init__(self):
61 self.connection = sqlite3.connect(':memory:')
62 self._init_db()
63
64 def _init_db(self):
65 qry = '''
66 CREATE TABLE IF NOT EXISTS archive_cache (
67 rowid INTEGER PRIMARY KEY,
68 key_file TEXT,
69 key_file_path TEXT,
70 archive_key TEXT,
71 archive_path TEXT,
72 store_time REAL,
73 access_time REAL,
74 access_count INTEGER DEFAULT 0,
75 size INTEGER DEFAULT 0
76 )
77 '''
78
79 self.sql(qry)
80 self.connection.commit()
81
82 @property
83 def sql(self):
84 return self.connection.execute
85
86 def bulk_insert(self, rows):
87 qry = '''
88 INSERT INTO archive_cache (
89 rowid,
90 key_file,
91 key_file_path,
92 archive_key,
93 archive_path,
94 store_time,
95 access_time,
96 access_count,
97 size
98 )
99 VALUES (
100 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 )
102 '''
103 cursor = self.connection.cursor()
104 cursor.executemany(qry, rows)
105 self.connection.commit()
106
107 def get_total_size(self):
108 qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache'
109 ((total_size,),) = self.sql(qry).fetchall()
110 return total_size
111
112 def get_sorted_keys(self, select_policy):
113 select_policy_qry = select_policy.format(fields='key_file, archive_key, size')
114 return self.sql(select_policy_qry).fetchall()
115
116
117 class ShardFileReader:
118
119 def __init__(self, file_like_reader):
120 self._file_like_reader = file_like_reader
121
122 def __getattr__(self, item):
123 if isinstance(self._file_like_reader, s3fs.core.S3File):
124 match item:
125 case 'name':
126 # S3 FileWrapper doesn't support name attribute, and we use it
127 return self._file_like_reader.full_name
128 case _:
129 return getattr(self._file_like_reader, item)
130 else:
131 return getattr(self._file_like_reader, item)
132
133 def __repr__(self):
134 return f'<{self.__class__.__name__}={self._file_like_reader}>'
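
The eviction query is assembled by filling the column list into the policy template and running it against the in-memory sqlite table. A standalone sketch, assuming the module path rhodecode.lib.archive_cache.utils implied by the imports in this changeset; the row values are made up for illustration.

    from rhodecode.lib.archive_cache.utils import StatsDB, EVICTION_POLICY

    db = StatsDB()
    db.bulk_insert([
        # rowid, key_file, key_file_path, archive_key, archive_path,
        # store_time, access_time, access_count, size
        (1, 'a.zip.key.json', '/shard_000/a.zip.key.json', 'a.zip',
         '/shard_000/ab-cd-x.archive_cache', 1.0, 5.0, 3, 1000),
        (2, 'b.zip.key.json', '/shard_001/b.zip.key.json', 'b.zip',
         '/shard_001/ef-01-y.archive_cache', 2.0, 4.0, 1, 2000),
    ])

    print(db.get_total_size())  # 3000

    select_policy = EVICTION_POLICY['least-recently-stored']['evict']
    for key_file, archive_key, size in db.get_sorted_keys(select_policy):
        print(key_file, archive_key, size)  # oldest store_time first
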
@@ -578,7 +578,7 b' def bootstrap_config(request, registry_n'
578 578 config.include('pyramid_mako')
579 579 config.include('rhodecode.lib.rc_beaker')
580 580 config.include('rhodecode.lib.rc_cache')
581 config.include('rhodecode.lib.rc_cache.archive_cache')
581 config.include('rhodecode.lib.archive_cache')
582 582 add_events_routes(config)
583 583
584 584 return config
@@ -2196,3 +2196,35 b' class IssuesRegistry(object):'
2196 2196 @property
2197 2197 def issues_unique_count(self):
2198 2198 return len(set(i['id'] for i in self.issues))
2199
2200
2201 def get_directory_statistics(start_path):
2202 """
2203 total_files, total_size, directory_stats = get_directory_statistics(start_path)
2204
2205 print(f"Directory statistics for: {start_path}\n")
2206 print(f"Total files: {total_files}")
2207 print(f"Total size: {format_size(total_size)}\n")
2208
2209 :param start_path:
2210 :return:
2211 """
2212
2213 total_files = 0
2214 total_size = 0
2215 directory_stats = {}
2216
2217 for dir_path, dir_names, file_names in os.walk(start_path):
2218 dir_size = 0
2219 file_count = len(file_names)
2220
2221 for fname in file_names:
2222 filepath = os.path.join(dir_path, fname)
2223 file_size = os.path.getsize(filepath)
2224 dir_size += file_size
2225
2226 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
2227 total_files += file_count
2228 total_size += dir_size
2229
2230 return total_files, total_size, directory_stats
@@ -399,29 +399,23 b' def storage_inodes():'
399 399 def storage_archives():
400 400 import rhodecode
401 401 from rhodecode.lib.helpers import format_byte_size_binary
402 from rhodecode.lib.rc_cache.archive_cache.utils import get_directory_statistics
402 from rhodecode.lib.archive_cache import get_archival_cache_store
403 403
404 404 storage_type = rhodecode.ConfigGet().get_str('archive_cache.backend.type')
405 storage_key = 'archive_cache.filesystem.store_dir'
406 405
407 default_msg = 'Archive cache storage is controlled by '\
408 f'{storage_key}=/path/to/cache option in the .ini file'
409 path = rhodecode.ConfigGet().get_str(storage_key, missing=default_msg)
410
411 value = dict(percent=0, used=0, total=0, items=0, path=path, text='', type=storage_type)
406 value = dict(percent=0, used=0, total=0, items=0, path='', text='', type=storage_type)
412 407 state = STATE_OK_DEFAULT
413 408 try:
414 if storage_type != 'filesystem':
415 # raise Exc to stop reporting on different type
416 raise ValueError('Storage type must be "filesystem"')
409 d_cache = get_archival_cache_store(config=rhodecode.CONFIG)
417 410
418 total_files, total_size, _directory_stats = get_directory_statistics(path)
411 total_files, total_size, _directory_stats = d_cache.get_statistics()
419 412
420 413 value.update({
421 414 'percent': 100,
422 415 'used': total_size,
423 416 'total': total_size,
424 'items': total_files
417 'items': total_files,
418 'path': d_cache.storage_path
425 419 })
426 420
427 421 except Exception as e:
@@ -441,8 +435,7 b' def storage_archives():'
441 435 def storage_gist():
442 436 from rhodecode.model.gist import GIST_STORE_LOC
443 437 from rhodecode.lib.utils import safe_str, get_rhodecode_repo_store_path
444 from rhodecode.lib.helpers import format_byte_size_binary
445 from rhodecode.lib.rc_cache.archive_cache.utils import get_directory_statistics
438 from rhodecode.lib.helpers import format_byte_size_binary, get_directory_statistics
446 439
447 440 path = safe_str(os.path.join(
448 441 get_rhodecode_repo_store_path(), GIST_STORE_LOC))
@@ -100,7 +100,7 b' def ini_config(request, tmpdir_factory, '
100 100 overrides = [
101 101 {'server:main': {'port': rcserver_port}},
102 102 {'app:main': {
103 'cache_dir': '%(here)s/rc_data',
103 'cache_dir': '%(here)s/rc-tests/rc_data',
104 104 'vcs.server': f'localhost:{vcsserver_port}',
105 105 # johbo: We will always start the VCSServer on our own based on the
106 106 # fixtures of the test cases. For the test run it must always be
@@ -258,20 +258,62 b' file_store.backend = local'
258 258 ; path to store the uploaded binaries and artifacts
259 259 file_store.storage_path = /var/opt/rhodecode_data/file_store
260 260
261 ; Uncomment and set this path to control settings for archive download cache.
261
262 ; Redis url to acquire/check generation of archives locks
263 archive_cache.locking.url = redis://redis:6379/1
264
265 ; Storage backend, only 'filesystem' and 'objectstore' are available now
266 archive_cache.backend.type = filesystem
267
268 ; url for s3 compatible storage used to upload artifacts
269 ; e.g. http://minio:9000
270 archive_cache.objectstore.url = http://s3-minio:9000
271
272 ; key for s3 auth
273 archive_cache.objectstore.key = key
274
275 ; secret for s3 auth
276 archive_cache.objectstore.secret = secret
277
278 ; number of sharded buckets to create to distribute archives across
279 ; default is 8 shards
280 archive_cache.objectstore.bucket_shards = 8
281
282 ; if true, this cache will retry fetches up to retry_attempts times, waiting retry_backoff seconds between tries
283 archive_cache.objectstore.retry = false
284
285 ; number of seconds to wait before the next retry attempt
286 archive_cache.objectstore.retry_backoff = 1
287
288 ; how many times to retry a fetch from this backend
289 archive_cache.objectstore.retry_attempts = 10
290
291 ; Default is $cache_dir/archive_cache if not set
262 292 ; Generated repo archives will be cached at this location
263 293 ; and served from the cache during subsequent requests for the same archive of
264 294 ; the repository. This path is important to be shared across filesystems and with
265 295 ; RhodeCode and vcsserver
266
267 ; Default is $cache_dir/archive_cache if not set
268 archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
296 archive_cache.filesystem.store_dir = %(here)s/rc-tests/archive_cache
269 297
270 298 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
271 archive_cache.cache_size_gb = 10
299 archive_cache.filesystem.cache_size_gb = 2
300
301 ; Eviction policy used to clear out after cache_size_gb limit is reached
302 archive_cache.filesystem.eviction_policy = least-recently-stored
272 303
273 304 ; By default cache uses sharding technique, this specifies how many shards are there
274 archive_cache.cache_shards = 10
305 ; default is 8 shards
306 archive_cache.filesystem.cache_shards = 8
307
308 ; if true, this cache will retry fetches up to retry_attempts times, waiting retry_backoff seconds between tries
309 archive_cache.filesystem.retry = false
310
311 ; number of seconds to wait before the next retry attempt
312 archive_cache.filesystem.retry_backoff = 1
313
314 ; how many times to retry a fetch from this backend
315 archive_cache.filesystem.retry_attempts = 10
316
275 317
276 318 ; #############
277 319 ; CELERY CONFIG
@@ -335,7 +377,7 b' rc_cache.cache_repo_longterm.max_size = '
335 377 rc_cache.cache_general.backend = dogpile.cache.rc.file_namespace
336 378 rc_cache.cache_general.expiration_time = 43200
337 379 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
338 rc_cache.cache_general.arguments.filename = %(here)s/cache-backend/cache_general_db
380 rc_cache.cache_general.arguments.filename = %(here)s/rc-tests/cache-backend/cache_general_db
339 381
340 382 ; alternative `cache_general` redis backend with distributed lock
341 383 #rc_cache.cache_general.backend = dogpile.cache.rc.redis
@@ -362,7 +404,7 b' rc_cache.cache_general.arguments.filenam'
362 404 rc_cache.cache_perms.backend = dogpile.cache.rc.file_namespace
363 405 rc_cache.cache_perms.expiration_time = 0
364 406 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
365 rc_cache.cache_perms.arguments.filename = %(here)s/cache-backend/cache_perms_db
407 rc_cache.cache_perms.arguments.filename = %(here)s/rc-tests/cache-backend/cache_perms_db
366 408
367 409 ; alternative `cache_perms` redis backend with distributed lock
368 410 #rc_cache.cache_perms.backend = dogpile.cache.rc.redis
@@ -389,7 +431,7 b' rc_cache.cache_perms.arguments.filename '
389 431 rc_cache.cache_repo.backend = dogpile.cache.rc.file_namespace
390 432 rc_cache.cache_repo.expiration_time = 2592000
391 433 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
392 rc_cache.cache_repo.arguments.filename = %(here)s/cache-backend/cache_repo_db
434 rc_cache.cache_repo.arguments.filename = %(here)s/rc-tests/cache-backend/cache_repo_db
393 435
394 436 ; alternative `cache_repo` redis backend with distributed lock
395 437 #rc_cache.cache_repo.backend = dogpile.cache.rc.redis
@@ -432,7 +474,7 b' beaker.session.data_dir = %(here)s/rc-te'
432 474
433 475 beaker.session.key = rhodecode
434 476 beaker.session.secret = test-rc-uytcxaz
435 beaker.session.lock_dir = %(here)s/data/sessions/lock
477 beaker.session.lock_dir = %(here)s/rc-tests/data/sessions/lock
436 478
437 479 ; Secure encrypted cookie. Requires AES and AES python libraries
438 480 ; you must disable beaker.session.secret to use this
@@ -464,7 +506,7 b' beaker.session.secure = false'
464 506 ; WHOOSH Backend, doesn't require additional services to run
465 507 ; it works good with few dozen repos
466 508 search.module = rhodecode.lib.index.whoosh
467 search.location = %(here)s/data/index
509 search.location = %(here)s/rc-tests/data/index
468 510
469 511 ; ####################
470 512 ; CHANNELSTREAM CONFIG
@@ -484,7 +526,7 b' channelstream.server = channelstream:980'
484 526 ; see Nginx/Apache configuration examples in our docs
485 527 channelstream.ws_url = ws://rhodecode.yourserver.com/_channelstream
486 528 channelstream.secret = ENV_GENERATED
487 channelstream.history.location = %(here)s/channelstream_history
529 channelstream.history.location = %(here)s/rc-tests/channelstream_history
488 530
489 531 ; Internal application path that Javascript uses to connect into.
490 532 ; If you use proxy-prefix the prefix should be added before /_channelstream
@@ -501,7 +543,7 b' channelstream.proxy_path = /_channelstre'
501 543 ; pymysql is an alternative driver for MySQL, use in case of problems with default one
502 544 #sqlalchemy.db1.url = mysql+pymysql://root:qweqwe@localhost/rhodecode
503 545
504 sqlalchemy.db1.url = sqlite:///%(here)s/rhodecode_test.db?timeout=30
546 sqlalchemy.db1.url = sqlite:///%(here)s/rc-tests/rhodecode_test.db?timeout=30
505 547
506 548 ; see sqlalchemy docs for other advanced settings
507 549 ; print the sql statements to output
@@ -590,7 +632,7 b' svn.proxy.generate_config = false'
590 632 svn.proxy.list_parent_path = true
591 633
592 634 ; Set location and file name of generated config file.
593 svn.proxy.config_file_path = %(here)s/mod_dav_svn.conf
635 svn.proxy.config_file_path = %(here)s/rc-tests/mod_dav_svn.conf
594 636
595 637 ; alternative mod_dav config template. This needs to be a valid mako template
596 638 ; Example template can be found in the source code:
@@ -626,7 +668,7 b' ssh.generate_authorized_keyfile = true'
626 668 ; Path to the authorized_keys file where the generate entries are placed.
627 669 ; It is possible to have multiple key files specified in `sshd_config` e.g.
628 670 ; AuthorizedKeysFile %h/.ssh/authorized_keys %h/.ssh/authorized_keys_rhodecode
629 ssh.authorized_keys_file_path = %(here)s/rc/authorized_keys_rhodecode
671 ssh.authorized_keys_file_path = %(here)s/rc-tests/authorized_keys_rhodecode
630 672
631 673 ; Command to execute the SSH wrapper. The binary is available in the
632 674 ; RhodeCode installation directory.
@@ -28,7 +28,7 b' import mock'
28 28 import pytest
29 29
30 30 import rhodecode
31 from rhodecode.lib.rc_cache.archive_cache import get_archival_config
31 from rhodecode.lib.archive_cache import get_archival_config
32 32 from rhodecode.lib.str_utils import ascii_bytes
33 33 from rhodecode.lib.vcs.backends import base
34 34 from rhodecode.lib.vcs.exceptions import ImproperArchiveTypeError, VCSError