##// END OF EJS Templates
feat(disk-cache): rewrite diskcache backend to be k8s and NFS safe....
super-admin -
r5420:9cce0276 default
parent child Browse files
Show More
@@ -0,0 +1,29 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 from .fanout_cache import get_archival_cache_store
20 from .fanout_cache import get_archival_config
21
22 from .utils import archive_iterator
23 from .utils import ArchiveCacheLock
24
25
26 def includeme(config):
27 # init our cache at start
28 settings = config.get_settings()
29 get_archival_cache_store(settings)
@@ -0,0 +1,60 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import redis
20 from rhodecode.lib._vendor import redis_lock
21
22 from .utils import ArchiveCacheLock
23
24
25 class GenerationLock:
26 """
27 Locking mechanism that detects if a lock is acquired
28
29 with GenerationLock(lock_key):
30 compute_archive()
31 """
32 lock_timeout = 7200
33
34 def __init__(self, lock_key, url):
35 self.lock_key = lock_key
36 self._create_client(url)
37 self.lock = self.get_lock()
38
39 def _create_client(self, url):
40 connection_pool = redis.ConnectionPool.from_url(url)
41 self.writer_client = redis.StrictRedis(
42 connection_pool=connection_pool
43 )
44 self.reader_client = self.writer_client
45
46 def get_lock(self):
47 return redis_lock.Lock(
48 redis_client=self.writer_client,
49 name=self.lock_key,
50 expire=self.lock_timeout,
51 strict=True
52 )
53
54 def __enter__(self):
55 acquired = self.lock.acquire(blocking=False)
56 if not acquired:
57 raise ArchiveCacheLock('Failed to create a lock')
58
59 def __exit__(self, exc_type, exc_val, exc_tb):
60 self.lock.release()
@@ -0,0 +1,30 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19
20 class ArchiveCacheLock(Exception):
21 pass
22
23
24 def archive_iterator(_reader, block_size: int = 4096 * 512):
25 # 4096 * 64 = 64KB
26 while 1:
27 data = _reader.read(block_size)
28 if not data:
29 break
30 yield data
@@ -296,14 +296,24 b' file_store.storage_path = /var/opt/rhode'
296 296 ; the repository. This path is important to be shared across filesystems and with
297 297 ; RhodeCode and vcsserver
298 298
299 ; Redis url to acquire/check generation of archives locks
300 archive_cache.locking.url = redis://redis:6379/1
301
302 ; Storage backend, only 'filesystem' is available now
303 archive_cache.backend.type = filesystem
304
299 305 ; Default is $cache_dir/archive_cache if not set
300 archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
306 archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
301 307
302 308 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
303 archive_cache.cache_size_gb = 10
309 archive_cache.filesystem.cache_size_gb = 1
310
311 ; Eviction policy used to clear out after cache_size_gb limit is reached
312 archive_cache.filesystem.eviction_policy = least-recently-stored
304 313
305 314 ; By default cache uses sharding technique, this specifies how many shards are there
306 archive_cache.cache_shards = 4
315 archive_cache.filesystem.cache_shards = 8
316
307 317
308 318 ; #############
309 319 ; CELERY CONFIG
@@ -264,14 +264,24 b' file_store.storage_path = /var/opt/rhode'
264 264 ; the repository. This path is important to be shared across filesystems and with
265 265 ; RhodeCode and vcsserver
266 266
267 ; Redis url to acquire/check generation of archives locks
268 archive_cache.locking.url = redis://redis:6379/1
269
270 ; Storage backend, only 'filesystem' is available now
271 archive_cache.backend.type = filesystem
272
267 273 ; Default is $cache_dir/archive_cache if not set
268 archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
274 archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
269 275
270 276 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
271 archive_cache.cache_size_gb = 40
277 archive_cache.filesystem.cache_size_gb = 40
278
279 ; Eviction policy used to clear out after cache_size_gb limit is reached
280 archive_cache.filesystem.eviction_policy = least-recently-stored
272 281
273 282 ; By default cache uses sharding technique, this specifies how many shards are there
274 archive_cache.cache_shards = 4
283 archive_cache.filesystem.cache_shards = 8
284
275 285
276 286 ; #############
277 287 ; CELERY CONFIG
@@ -82,7 +82,6 b' deform==2.0.15'
82 82 peppercorn==0.6
83 83 translationstring==1.4
84 84 zope.deprecation==5.0.0
85 diskcache==5.6.3
86 85 docutils==0.19
87 86 dogpile.cache==1.3.3
88 87 decorator==5.1.1
@@ -24,6 +24,8 b' import urllib.request'
24 24 import urllib.parse
25 25 import urllib.error
26 26 import pathlib
27 import time
28 import random
27 29
28 30 from pyramid.httpexceptions import HTTPNotFound, HTTPBadRequest, HTTPFound
29 31
@@ -37,7 +39,8 b' from rhodecode.apps._base import RepoApp'
37 39 from rhodecode.lib import diffs, helpers as h, rc_cache
38 40 from rhodecode.lib import audit_logger
39 41 from rhodecode.lib.hash_utils import sha1_safe
40 from rhodecode.lib.rc_cache.archive_cache import get_archival_cache_store, get_archival_config, ReentrantLock
42 from rhodecode.lib.rc_cache.archive_cache import (
43 get_archival_cache_store, get_archival_config, ArchiveCacheLock, archive_iterator)
41 44 from rhodecode.lib.str_utils import safe_bytes, convert_special_chars
42 45 from rhodecode.lib.view_utils import parse_path_ref
43 46 from rhodecode.lib.exceptions import NonRelativePathError
@@ -417,41 +420,45 b' class RepoFilesView(RepoAppView):'
417 420 # NOTE: we get the config to pass to a call to lazy-init the SAME type of cache on vcsserver
418 421 d_cache_conf = get_archival_config(config=CONFIG)
419 422
423 # This is also a cache key, and lock key
420 424 reentrant_lock_key = archive_name_key + '.lock'
421 with ReentrantLock(d_cache, reentrant_lock_key):
422 # This is also a cache key
423 use_cached_archive = False
424 if not archive_cache_disable and archive_name_key in d_cache:
425 reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
426 use_cached_archive = True
427 log.debug('Found cached archive as key=%s tag=%s, serving archive from cache reader=%s',
428 archive_name_key, tag, reader.name)
429 else:
430 reader = None
431 log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
425
426 use_cached_archive = False
427 if not archive_cache_disable and archive_name_key in d_cache:
428 reader, metadata = d_cache.fetch(archive_name_key)
432 429
433 # generate new archive, as previous was not found in the cache
434 if not reader:
435
436 try:
437 commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
438 kind=fileformat, subrepos=subrepos,
439 archive_at_path=at_path, cache_config=d_cache_conf)
440 except ImproperArchiveTypeError:
441 return _('Unknown archive type')
442
443 reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
430 use_cached_archive = True
431 log.debug('Found cached archive as key=%s tag=%s, serving archive from cache reader=%s',
432 archive_name_key, metadata, reader.name)
433 else:
434 reader = None
435 log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
444 436
445 437 if not reader:
446 raise ValueError('archive cache reader is empty, failed to fetch file from distributed archive cache')
438 # generate new archive, as previous was not found in the cache
439 try:
440 with d_cache.get_lock(reentrant_lock_key):
441 try:
442 commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
443 kind=fileformat, subrepos=subrepos,
444 archive_at_path=at_path, cache_config=d_cache_conf)
445 except ImproperArchiveTypeError:
446 return _('Unknown archive type')
447 except ArchiveCacheLock:
448 retry_after = round(random.uniform(0.3, 3.0), 1)
449 time.sleep(retry_after)
447 450
448 def archive_iterator(_reader, block_size: int = 4096*512):
449 # 4096 * 64 = 64KB
450 while 1:
451 data = _reader.read(block_size)
452 if not data:
453 break
454 yield data
451 location = self.request.url
452 response = Response(
453 f"archive {archive_name_key} generation in progress, Retry-After={retry_after}, Location={location}"
454 )
455 response.headers["Retry-After"] = str(retry_after)
456 response.status_code = 307 # temporary redirect
457
458 response.location = location
459 return response
460
461 reader, metadata = d_cache.fetch(archive_name_key)
455 462
456 463 response = Response(app_iter=archive_iterator(reader))
457 464 response.content_disposition = f'attachment; filename={response_archive_name}'
@@ -189,9 +189,13 b' def sanitize_settings_and_apply_defaults'
189 189 settings_maker.make_setting('rc_cache.sql_cache_short.max_size', 10000, parser='int')
190 190
191 191 # archive_cache
192 settings_maker.make_setting('archive_cache.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
193 settings_maker.make_setting('archive_cache.cache_size_gb', 10, parser='float')
194 settings_maker.make_setting('archive_cache.cache_shards', 10, parser='int')
192 settings_maker.make_setting('archive_cache.locking.url', 'redis://redis:6379/1')
193 settings_maker.make_setting('archive_cache.backend.type', 'filesystem')
194
195 settings_maker.make_setting('archive_cache.filesystem.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
196 settings_maker.make_setting('archive_cache.filesystem.cache_size_gb', 10, parser='float')
197 settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
198 settings_maker.make_setting('archive_cache.filesystem.eviction_policy', 'least-recently-stored')
195 199
196 200 settings_maker.env_expand()
197 201
@@ -1,4 +1,4 b''
1 # Copyright (C) 2015-2023 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
@@ -16,33 +16,210 b''
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 import logging
19 import codecs
20 import contextlib
21 import functools
20 22 import os
21 import diskcache
22 from diskcache import RLock
23 import logging
24 import time
25 import typing
26 import zlib
27
28 from rhodecode.lib.ext_json import json
29 from .lock import GenerationLock
23 30
24 31 log = logging.getLogger(__name__)
25 32
26 33 cache_meta = None
27 34
35 UNKNOWN = -241
36 NO_VAL = -917
28 37
29 class ReentrantLock(RLock):
30 def __enter__(self):
31 reentrant_lock_key = self._key
38 MODE_BINARY = 'BINARY'
39
40
41 class FileSystemCache:
42
43 def __init__(self, index, directory, **settings):
44 self._index = index
45 self._directory = directory
46
47 def _write_file(self, full_path, iterator, mode, encoding=None):
48 full_dir, _ = os.path.split(full_path)
49
50 for count in range(1, 11):
51 with contextlib.suppress(OSError):
52 os.makedirs(full_dir)
53
54 try:
55 # Another cache may have deleted the directory before
56 # the file could be opened.
57 writer = open(full_path, mode, encoding=encoding)
58 except OSError:
59 if count == 10:
60 # Give up after 10 tries to open the file.
61 raise
62 continue
63
64 with writer:
65 size = 0
66 for chunk in iterator:
67 size += len(chunk)
68 writer.write(chunk)
69 return size
70
71 def _get_keyfile(self, key):
72 return os.path.join(self._directory, f'{key}.key')
73
74 def store(self, key, value_reader, metadata):
75 filename, full_path = self.random_filename()
76 key_file = self._get_keyfile(key)
77
78 # STORE METADATA
79 _metadata = {
80 "version": "v1",
81 "timestamp": time.time(),
82 "filename": filename,
83 "full_path": full_path,
84 "key_file": key_file,
85 }
86 if metadata:
87 _metadata.update(metadata)
88
89 reader = functools.partial(value_reader.read, 2**22)
90
91 iterator = iter(reader, b'')
92 size = self._write_file(full_path, iterator, 'xb')
93
94 # after archive is finished, we create a key to save the presence of the binary file
95 with open(key_file, 'wb') as f:
96 f.write(json.dumps(_metadata))
97
98 return key, size, MODE_BINARY, filename, _metadata
99
100 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
101 if key not in self:
102 raise KeyError(key)
103
104 key_file = self._get_keyfile(key)
105 with open(key_file, 'rb') as f:
106 metadata = json.loads(f.read())
107
108 filename = metadata['filename']
109
110 return open(os.path.join(self._directory, filename), 'rb'), metadata
111
112 def random_filename(self):
113 """Return filename and full-path tuple for file storage.
114
115 Filename will be a randomly generated 28 character hexadecimal string
116 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
117 reduce the size of directories. On older filesystems, lookups in
118 directories with many files may be slow.
119 """
32 120
33 log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
34 #self.acquire()
35 log.debug('Lock for key=%s acquired', reentrant_lock_key)
121 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
122 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
123 name = hex_name[4:] + '.archive_cache'
124 filename = os.path.join(sub_dir, name)
125 full_path = os.path.join(self._directory, filename)
126 return filename, full_path
127
128 def hash(self, key):
129 """Compute portable hash for `key`.
130
131 :param key: key to hash
132 :return: hash value
133
134 """
135 mask = 0xFFFFFFFF
136 return zlib.adler32(key.encode('utf-8')) & mask # noqa
137
138 def __contains__(self, key):
139 """Return `True` if `key` matching item is found in cache.
140
141 :param key: key matching item
142 :return: True if key matching item
143
144 """
145 key_file = self._get_keyfile(key)
146 return os.path.exists(key_file)
147
148
149 class FanoutCache:
150 """Cache that shards keys and values."""
151
152 def __init__(
153 self, directory=None, **settings
154 ):
155 """Initialize cache instance.
156
157 :param str directory: cache directory
158 :param settings: settings dict
159
160 """
161 if directory is None:
162 raise ValueError('directory cannot be None')
163
164 directory = str(directory)
165 directory = os.path.expanduser(directory)
166 directory = os.path.expandvars(directory)
167 self._directory = directory
36 168
37 def __exit__(self, *exc_info):
38 #self.release()
39 pass
169 self._count = settings.pop('cache_shards')
170 self._locking_url = settings.pop('locking_url')
171
172 self._shards = tuple(
173 FileSystemCache(
174 index=num,
175 directory=os.path.join(directory, 'shard_%03d' % num),
176 **settings,
177 )
178 for num in range(self._count)
179 )
180 self._hash = self._shards[0].hash
181
182 def get_lock(self, lock_key):
183 return GenerationLock(lock_key, self._locking_url)
184
185 def _get_shard(self, key) -> FileSystemCache:
186 index = self._hash(key) % self._count
187 shard = self._shards[index]
188 return shard
189
190 def store(self, key, value_reader, metadata=None):
191 shard = self._get_shard(key)
192 return shard.store(key, value_reader, metadata)
193
194 def fetch(self, key):
195 """Return file handle corresponding to `key` from cache.
196 """
197 shard = self._get_shard(key)
198 return shard.fetch(key)
199
200 def has_key(self, key):
201 """Return `True` if `key` matching item is found in cache.
202
203 :param key: key for item
204 :return: True if key is found
205
206 """
207 shard = self._get_shard(key)
208 return key in shard
209
210 def __contains__(self, item):
211 return self.has_key(item)
212
213 def evict(self):
214 """Remove old items based on the conditions"""
215 # TODO: Implement this...
216 return
40 217
41 218
42 219 def get_archival_config(config):
43 220
44 221 final_config = {
45 'archive_cache.eviction_policy': 'least-frequently-used'
222
46 223 }
47 224
48 225 for k, v in config.items():
@@ -59,11 +236,15 b' def get_archival_cache_store(config):'
59 236 return cache_meta
60 237
61 238 config = get_archival_config(config)
239 backend = config['archive_cache.backend.type']
240 if backend != 'filesystem':
241 raise ValueError('archive_cache.backend.type only supports "filesystem"')
62 242
63 archive_cache_dir = config['archive_cache.store_dir']
64 archive_cache_size_gb = config['archive_cache.cache_size_gb']
65 archive_cache_shards = config['archive_cache.cache_shards']
66 archive_cache_eviction_policy = config['archive_cache.eviction_policy']
243 archive_cache_locking_url = config['archive_cache.locking.url']
244 archive_cache_dir = config['archive_cache.filesystem.store_dir']
245 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
246 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
247 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
67 248
68 249 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
69 250
@@ -71,18 +252,13 b' def get_archival_cache_store(config):'
71 252 if not os.path.isdir(archive_cache_dir):
72 253 os.makedirs(archive_cache_dir, exist_ok=True)
73 254
74 d_cache = diskcache.FanoutCache(
75 archive_cache_dir, shards=archive_cache_shards,
76 cull_limit=0, # manual eviction required
77 size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
78 eviction_policy=archive_cache_eviction_policy,
79 timeout=30
255 d_cache = FanoutCache(
256 archive_cache_dir,
257 locking_url=archive_cache_locking_url,
258 cache_shards=archive_cache_shards,
259 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
260 cache_eviction_policy=archive_cache_eviction_policy
80 261 )
81 262 cache_meta = d_cache
82 263 return cache_meta
83 264
84
85 def includeme(config):
86 # init our cache at start
87 settings = config.get_settings()
88 get_archival_cache_store(settings)
@@ -401,13 +401,20 b' def storage_archives():'
401 401 from rhodecode.lib.utils import safe_str
402 402 from rhodecode.lib.helpers import format_byte_size_binary
403 403
404 msg = 'Archive cache storage is controlled by ' \
405 'archive_cache.store_dir=/path/to/cache option in the .ini file'
406 path = safe_str(rhodecode.CONFIG.get('archive_cache.store_dir', msg))
404 storage_type = rhodecode.ConfigGet().get_str('archive_cache.backend.type')
405 storage_key = 'archive_cache.filesystem.store_dir'
406
407 default_msg = 'Archive cache storage is controlled by '\
408 f'{storage_key}=/path/to/cache option in the .ini file'
409 path = rhodecode.ConfigGet().get_str(storage_key, missing=default_msg)
407 410
408 411 value = dict(percent=0, used=0, total=0, items=0, path=path, text='')
409 412 state = STATE_OK_DEFAULT
410 413 try:
414 if storage_type != 'filesystem':
415 # raise Exc to stop reporting on different type
416 raise ValueError('Storage type must be "filesystem"')
417
411 418 items_count = 0
412 419 used = 0
413 420 for root, dirs, files in os.walk(path):
General Comments 0
You need to be logged in to leave comments. Login now