# HG changeset patch
# User RhodeCode Admin
# Date 2024-06-05 10:32:12
# Node ID 9cce0276b2632c248b89b7fbd5a99cb0bcff8475
# Parent 887a975387e530d9e4a015ffe921e774279f9345
feat(disk-cache): rewrite diskcache backend to be k8s and NFS safe.

- no longer depend on diskcache
- use a simpler version with a Redis lock and filesystem storage
- fixes RCCE-78

diff --git a/configs/development.ini b/configs/development.ini
--- a/configs/development.ini
+++ b/configs/development.ini
@@ -296,14 +296,24 @@ file_store.storage_path = /var/opt/rhode
 ; the repository. This path is important to be shared across filesystems and with
 ; RhodeCode and vcsserver
 
+; Redis url to acquire/check generation of archives locks
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' is available now
+archive_cache.backend.type = filesystem
+
 ; Default is $cache_dir/archive_cache if not set
-archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
+archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
 
 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
-archive_cache.cache_size_gb = 10
+archive_cache.filesystem.cache_size_gb = 1
+
+; Eviction policy used to clear out after cache_size_gb limit is reached
+archive_cache.filesystem.eviction_policy = least-recently-stored
 
 ; By default cache uses sharding technique, this specifies how many shards are there
-archive_cache.cache_shards = 4
+archive_cache.filesystem.cache_shards = 8
+
 
 ; #############
 ; CELERY CONFIG

diff --git a/configs/production.ini b/configs/production.ini
--- a/configs/production.ini
+++ b/configs/production.ini
@@ -264,14 +264,24 @@ file_store.storage_path = /var/opt/rhode
 ; the repository. This path is important to be shared across filesystems and with
 ; RhodeCode and vcsserver
 
+; Redis url to acquire/check generation of archives locks
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' is available now
+archive_cache.backend.type = filesystem
+
 ; Default is $cache_dir/archive_cache if not set
-archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
+archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
 
 ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
-archive_cache.cache_size_gb = 40
+archive_cache.filesystem.cache_size_gb = 40
+
+; Eviction policy used to clear out after cache_size_gb limit is reached
+archive_cache.filesystem.eviction_policy = least-recently-stored
 
 ; By default cache uses sharding technique, this specifies how many shards are there
-archive_cache.cache_shards = 4
+archive_cache.filesystem.cache_shards = 8
+
 
 ; #############
 ; CELERY CONFIG

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -82,7 +82,6 @@ deform==2.0.15
 peppercorn==0.6
 translationstring==1.4
 zope.deprecation==5.0.0
-diskcache==5.6.3
 docutils==0.19
 dogpile.cache==1.3.3
 decorator==5.1.1
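Note on the settings layout: the flat `archive_cache.*` keys are replaced by `archive_cache.backend.type` plus a per-backend `archive_cache.filesystem.*` group, so future backends can add their own groups without key collisions. A minimal sketch of how a backend's option group can be peeled out of a flat ini-style dict (the helper shape is illustrative, not RhodeCode's actual API):

```python
# Illustrative only: select the option group for the configured backend.
settings = {
    'archive_cache.locking.url': 'redis://redis:6379/1',
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
    'archive_cache.filesystem.cache_size_gb': '1',
    'archive_cache.filesystem.cache_shards': '8',
}

backend = settings['archive_cache.backend.type']
prefix = f'archive_cache.{backend}.'
# strip the prefix so the backend only sees its own options
backend_opts = {k[len(prefix):]: v
                for k, v in settings.items() if k.startswith(prefix)}
assert backend_opts['cache_shards'] == '8'
```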
diff --git a/rhodecode/apps/repository/views/repo_files.py b/rhodecode/apps/repository/views/repo_files.py
--- a/rhodecode/apps/repository/views/repo_files.py
+++ b/rhodecode/apps/repository/views/repo_files.py
@@ -24,6 +24,8 @@ import urllib.request
 import urllib.parse
 import urllib.error
 import pathlib
+import time
+import random
 
 from pyramid.httpexceptions import HTTPNotFound, HTTPBadRequest, HTTPFound
 
@@ -37,7 +39,8 @@ from rhodecode.apps._base import RepoApp
 from rhodecode.lib import diffs, helpers as h, rc_cache
 from rhodecode.lib import audit_logger
 from rhodecode.lib.hash_utils import sha1_safe
-from rhodecode.lib.rc_cache.archive_cache import get_archival_cache_store, get_archival_config, ReentrantLock
+from rhodecode.lib.rc_cache.archive_cache import (
+    get_archival_cache_store, get_archival_config, ArchiveCacheLock, archive_iterator)
 from rhodecode.lib.str_utils import safe_bytes, convert_special_chars
 from rhodecode.lib.view_utils import parse_path_ref
 from rhodecode.lib.exceptions import NonRelativePathError
@@ -417,41 +420,45 @@ class RepoFilesView(RepoAppView):
         # NOTE: we get the config to pass to a call to lazy-init the SAME type of cache on vcsserver
         d_cache_conf = get_archival_config(config=CONFIG)
 
+        # This is also a cache key, and lock key
         reentrant_lock_key = archive_name_key + '.lock'
-        with ReentrantLock(d_cache, reentrant_lock_key):
-            # This is also a cache key
-            use_cached_archive = False
-            if not archive_cache_disable and archive_name_key in d_cache:
-                reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
-                use_cached_archive = True
-                log.debug('Found cached archive as key=%s tag=%s, serving archive from cache reader=%s',
-                          archive_name_key, tag, reader.name)
-            else:
-                reader = None
-                log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
+
+        use_cached_archive = False
+        if not archive_cache_disable and archive_name_key in d_cache:
+            reader, metadata = d_cache.fetch(archive_name_key)
 
-            # generate new archive, as previous was not found in the cache
-            if not reader:
-
-                try:
-                    commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
-                                        kind=fileformat, subrepos=subrepos,
-                                        archive_at_path=at_path, cache_config=d_cache_conf)
-                except ImproperArchiveTypeError:
-                    return _('Unknown archive type')
-
-                reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
+            use_cached_archive = True
+            log.debug('Found cached archive as key=%s tag=%s, serving archive from cache reader=%s',
+                      archive_name_key, metadata, reader.name)
+        else:
+            reader = None
+            log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
 
         if not reader:
-            raise ValueError('archive cache reader is empty, failed to fetch file from distributed archive cache')
+            # generate new archive, as previous was not found in the cache
+            try:
+                with d_cache.get_lock(reentrant_lock_key):
+                    try:
+                        commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
+                                            kind=fileformat, subrepos=subrepos,
+                                            archive_at_path=at_path, cache_config=d_cache_conf)
+                    except ImproperArchiveTypeError:
+                        return _('Unknown archive type')
+            except ArchiveCacheLock:
+                retry_after = round(random.uniform(0.3, 3.0), 1)
+                time.sleep(retry_after)
 
-        def archive_iterator(_reader, block_size: int = 4096*512):
-            # 4096 * 64 = 64KB
-            while 1:
-                data = _reader.read(block_size)
-                if not data:
-                    break
-                yield data
+                location = self.request.url
+                response = Response(
+                    f"archive {archive_name_key} generation in progress, Retry-After={retry_after}, Location={location}"
+                )
+                response.headers["Retry-After"] = str(retry_after)
+                response.status_code = 307  # temporary redirect
+
+                response.location = location
+                return response
+
+            reader, metadata = d_cache.fetch(archive_name_key)
 
         response = Response(app_iter=archive_iterator(reader))
         response.content_disposition = f'attachment; filename={response_archive_name}'
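With the blocking ReentrantLock gone, a request that loses the race no longer waits for the archive: it sleeps a short random interval, then answers 307 with Retry-After and a Location pointing back at itself. A hedged sketch of a client that cooperates with this contract (the `requests` usage and retry budget are illustrative, not part of the patch):

```python
import time
import requests  # any HTTP client works; used here for brevity

def download_archive(url: str, max_attempts: int = 10) -> bytes:
    """Follow the 307 + Retry-After protocol the archive view now speaks."""
    for _ in range(max_attempts):
        resp = requests.get(url, allow_redirects=False)
        if resp.status_code == 307:
            # archive generation is in progress on another worker
            time.sleep(float(resp.headers.get('Retry-After', 1)))
            url = resp.headers.get('Location', url)
            continue
        resp.raise_for_status()
        return resp.content
    raise TimeoutError(f'archive at {url} not ready after {max_attempts} attempts')
```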
diff --git a/rhodecode/config/config_maker.py b/rhodecode/config/config_maker.py
--- a/rhodecode/config/config_maker.py
+++ b/rhodecode/config/config_maker.py
@@ -189,9 +189,13 @@ def sanitize_settings_and_apply_defaults
     settings_maker.make_setting('rc_cache.sql_cache_short.max_size', 10000, parser='int')
 
     # archive_cache
-    settings_maker.make_setting('archive_cache.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
-    settings_maker.make_setting('archive_cache.cache_size_gb', 10, parser='float')
-    settings_maker.make_setting('archive_cache.cache_shards', 10, parser='int')
+    settings_maker.make_setting('archive_cache.locking.url', 'redis://redis:6379/1')
+    settings_maker.make_setting('archive_cache.backend.type', 'filesystem')
+
+    settings_maker.make_setting('archive_cache.filesystem.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
+    settings_maker.make_setting('archive_cache.filesystem.cache_size_gb', 10, parser='float')
+    settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
+    settings_maker.make_setting('archive_cache.filesystem.eviction_policy', 'least-recently-stored')
 
     settings_maker.env_expand()

diff --git a/rhodecode/lib/rc_cache/archive_cache/__init__.py b/rhodecode/lib/rc_cache/archive_cache/__init__.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache/__init__.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+from .fanout_cache import get_archival_cache_store
+from .fanout_cache import get_archival_config
+
+from .utils import archive_iterator
+from .utils import ArchiveCacheLock
+
+
+def includeme(config):
+    # init our cache at start
+    settings = config.get_settings()
+    get_archival_cache_store(settings)
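`includeme` moves from the old single module into the new package, so the existing Pyramid include keeps working after the rename. Presumably it is wired like any dotted-name include; the exact call site is not part of this patch, so the sketch below is hypothetical:

```python
# Hypothetical wiring; the real include call lives elsewhere in rhodecode.
from pyramid.config import Configurator

def main(global_config, **settings):
    config = Configurator(settings=settings)
    # runs includeme(), which warms up get_archival_cache_store(settings)
    config.include('rhodecode.lib.rc_cache.archive_cache')
    return config.make_wsgi_app()
```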
diff --git a/rhodecode/lib/rc_cache/archive_cache.py b/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
rename from rhodecode/lib/rc_cache/archive_cache.py
rename to rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
--- a/rhodecode/lib/rc_cache/archive_cache.py
+++ b/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2023 RhodeCode GmbH
+# Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
@@ -16,33 +16,210 @@
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
-import logging
+import codecs
+import contextlib
+import functools
 import os
-import diskcache
-from diskcache import RLock
+import logging
+import time
+import typing
+import zlib
+
+from rhodecode.lib.ext_json import json
+from .lock import GenerationLock
 
 log = logging.getLogger(__name__)
 
 cache_meta = None
 
+UNKNOWN = -241
+NO_VAL = -917
 
-class ReentrantLock(RLock):
-    def __enter__(self):
-        reentrant_lock_key = self._key
+MODE_BINARY = 'BINARY'
+
+
+class FileSystemCache:
+
+    def __init__(self, index, directory, **settings):
+        self._index = index
+        self._directory = directory
+
+    def _write_file(self, full_path, iterator, mode, encoding=None):
+        full_dir, _ = os.path.split(full_path)
+
+        for count in range(1, 11):
+            with contextlib.suppress(OSError):
+                os.makedirs(full_dir)
+
+            try:
+                # Another cache may have deleted the directory before
+                # the file could be opened.
+                writer = open(full_path, mode, encoding=encoding)
+            except OSError:
+                if count == 10:
+                    # Give up after 10 tries to open the file.
+                    raise
+                continue
+
+            with writer:
+                size = 0
+                for chunk in iterator:
+                    size += len(chunk)
+                    writer.write(chunk)
+                return size
+
+    def _get_keyfile(self, key):
+        return os.path.join(self._directory, f'{key}.key')
+
+    def store(self, key, value_reader, metadata):
+        filename, full_path = self.random_filename()
+        key_file = self._get_keyfile(key)
+
+        # STORE METADATA
+        _metadata = {
+            "version": "v1",
+            "timestamp": time.time(),
+            "filename": filename,
+            "full_path": full_path,
+            "key_file": key_file,
+        }
+        if metadata:
+            _metadata.update(metadata)
+
+        reader = functools.partial(value_reader.read, 2**22)
+
+        iterator = iter(reader, b'')
+        size = self._write_file(full_path, iterator, 'xb')
+
+        # after archive is finished, we create a key to save the presence of the binary file
+        with open(key_file, 'wb') as f:
+            f.write(json.dumps(_metadata))
+
+        return key, size, MODE_BINARY, filename, _metadata
+
+    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
+        if key not in self:
+            raise KeyError(key)
+
+        key_file = self._get_keyfile(key)
+        with open(key_file, 'rb') as f:
+            metadata = json.loads(f.read())
+
+        filename = metadata['filename']
+
+        return open(os.path.join(self._directory, filename), 'rb'), metadata
+
+    def random_filename(self):
+        """Return filename and full-path tuple for file storage.
+
+        Filename will be a randomly generated 28 character hexadecimal string
+        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
+        reduce the size of directories. On older filesystems, lookups in
+        directories with many files may be slow.
+        """
 
-        log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
-        #self.acquire()
-        log.debug('Lock for key=%s acquired', reentrant_lock_key)
+        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
+        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
+        name = hex_name[4:] + '.archive_cache'
+        filename = os.path.join(sub_dir, name)
+        full_path = os.path.join(self._directory, filename)
+        return filename, full_path
+
+    def hash(self, key):
+        """Compute portable hash for `key`.
+
+        :param key: key to hash
+        :return: hash value
+
+        """
+        mask = 0xFFFFFFFF
+        return zlib.adler32(key.encode('utf-8')) & mask  # noqa
+
+    def __contains__(self, key):
+        """Return `True` if `key` matching item is found in cache.
+
+        :param key: key matching item
+        :return: True if key matching item
+
+        """
+        key_file = self._get_keyfile(key)
+        return os.path.exists(key_file)
+
+
+class FanoutCache:
+    """Cache that shards keys and values."""
+
+    def __init__(
+            self, directory=None, **settings
+    ):
+        """Initialize cache instance.
+
+        :param str directory: cache directory
+        :param settings: settings dict
+
+        """
+        if directory is None:
+            raise ValueError('directory cannot be None')
+
+        directory = str(directory)
+        directory = os.path.expanduser(directory)
+        directory = os.path.expandvars(directory)
+        self._directory = directory
 
-    def __exit__(self, *exc_info):
-        #self.release()
-        pass
+        self._count = settings.pop('cache_shards')
+        self._locking_url = settings.pop('locking_url')
+
+        self._shards = tuple(
+            FileSystemCache(
+                index=num,
+                directory=os.path.join(directory, 'shard_%03d' % num),
+                **settings,
+            )
+            for num in range(self._count)
+        )
+        self._hash = self._shards[0].hash
+
+    def get_lock(self, lock_key):
+        return GenerationLock(lock_key, self._locking_url)
+
+    def _get_shard(self, key) -> FileSystemCache:
+        index = self._hash(key) % self._count
+        shard = self._shards[index]
+        return shard
+
+    def store(self, key, value_reader, metadata=None):
+        shard = self._get_shard(key)
+        return shard.store(key, value_reader, metadata)
+
+    def fetch(self, key):
+        """Return file handle corresponding to `key` from cache.
+        """
+        shard = self._get_shard(key)
+        return shard.fetch(key)
+
+    def has_key(self, key):
+        """Return `True` if `key` matching item is found in cache.
+
+        :param key: key for item
+        :return: True if key is found
+
+        """
+        shard = self._get_shard(key)
+        return key in shard
+
+    def __contains__(self, item):
+        return self.has_key(item)
+
+    def evict(self):
+        """Remove old items based on the conditions"""
+        # TODO: Implement this...
+        return
 
 
 def get_archival_config(config):
 
     final_config = {
-        'archive_cache.eviction_policy': 'least-frequently-used'
+
     }
 
     for k, v in config.items():
@@ -59,11 +236,15 @@ def get_archival_cache_store(config):
         return cache_meta
 
     config = get_archival_config(config)
+    backend = config['archive_cache.backend.type']
+    if backend != 'filesystem':
+        raise ValueError('archive_cache.backend.type only supports "filesystem"')
 
-    archive_cache_dir = config['archive_cache.store_dir']
-    archive_cache_size_gb = config['archive_cache.cache_size_gb']
-    archive_cache_shards = config['archive_cache.cache_shards']
-    archive_cache_eviction_policy = config['archive_cache.eviction_policy']
+    archive_cache_locking_url = config['archive_cache.locking.url']
+    archive_cache_dir = config['archive_cache.filesystem.store_dir']
+    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
+    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
+    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
 
     log.debug('Initializing archival cache instance under %s', archive_cache_dir)
 
@@ -71,18 +252,13 @@ def get_archival_cache_store(config):
     if not os.path.isdir(archive_cache_dir):
         os.makedirs(archive_cache_dir, exist_ok=True)
 
-    d_cache = diskcache.FanoutCache(
-        archive_cache_dir, shards=archive_cache_shards,
-        cull_limit=0,  # manual eviction required
-        size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
-        eviction_policy=archive_cache_eviction_policy,
-        timeout=30
+    d_cache = FanoutCache(
+        archive_cache_dir,
+        locking_url=archive_cache_locking_url,
+        cache_shards=archive_cache_shards,
+        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
+        cache_eviction_policy=archive_cache_eviction_policy
     )
 
     cache_meta = d_cache
     return cache_meta
-
-
-def includeme(config):
-    # init our cache at start
-    settings = config.get_settings()
-    get_archival_cache_store(settings)
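Shard selection has to agree across every worker and pod that mounts the same NFS store, which is why FileSystemCache.hash uses adler32 rather than Python's process-seeded hash(). A self-contained sketch of the mapping:

```python
import zlib

def shard_dir_for(key: str, shard_count: int = 8) -> str:
    # mirrors FileSystemCache.hash + FanoutCache._get_shard:
    # portable adler32, masked to 32 bits, modulo the shard count
    index = (zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF) % shard_count
    return 'shard_%03d' % index

# the same key lands in the same shard on every host and process
assert shard_dir_for('repo-x.tar.gz') == shard_dir_for('repo-x.tar.gz')
```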
diff --git a/rhodecode/lib/rc_cache/archive_cache/lock.py b/rhodecode/lib/rc_cache/archive_cache/lock.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache/lock.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import redis
+from rhodecode.lib._vendor import redis_lock
+
+from .utils import ArchiveCacheLock
+
+
+class GenerationLock:
+    """
+    Locking mechanism that detects if a lock is acquired
+
+    with GenerationLock(lock_key):
+        compute_archive()
+    """
+    lock_timeout = 7200
+
+    def __init__(self, lock_key, url):
+        self.lock_key = lock_key
+        self._create_client(url)
+        self.lock = self.get_lock()
+
+    def _create_client(self, url):
+        connection_pool = redis.ConnectionPool.from_url(url)
+        self.writer_client = redis.StrictRedis(
+            connection_pool=connection_pool
+        )
+        self.reader_client = self.writer_client
+
+    def get_lock(self):
+        return redis_lock.Lock(
+            redis_client=self.writer_client,
+            name=self.lock_key,
+            expire=self.lock_timeout,
+            strict=True
+        )
+
+    def __enter__(self):
+        acquired = self.lock.acquire(blocking=False)
+        if not acquired:
+            raise ArchiveCacheLock('Failed to create a lock')
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.lock.release()
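GenerationLock acquires non-blocking, so a worker that loses the race fails fast with ArchiveCacheLock instead of queueing behind the holder; the 7200s expiry guarantees a crashed pod cannot hold the key forever. The caller-side pattern, mirroring the repo_files.py change above (d_cache is a FanoutCache instance; build_fn stands in for archive_repo):

```python
from rhodecode.lib.rc_cache.archive_cache import ArchiveCacheLock

def build_or_defer(d_cache, lock_key, build_fn):
    """Return True if we built the archive, False if another worker owns the lock."""
    try:
        with d_cache.get_lock(lock_key):
            build_fn()  # safe: we hold the Redis lock for this key
        return True
    except ArchiveCacheLock:
        return False  # someone else is generating; caller answers 307 and retries
```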
diff --git a/rhodecode/lib/rc_cache/archive_cache/utils.py b/rhodecode/lib/rc_cache/archive_cache/utils.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache/utils.py
@@ -0,0 +1,30 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+
+class ArchiveCacheLock(Exception):
+    pass
+
+
+def archive_iterator(_reader, block_size: int = 4096 * 512):
+    # 4096 * 512 = 2MB chunks
+    while 1:
+        data = _reader.read(block_size)
+        if not data:
+            break
+        yield data

diff --git a/rhodecode/lib/system_info.py b/rhodecode/lib/system_info.py
--- a/rhodecode/lib/system_info.py
+++ b/rhodecode/lib/system_info.py
@@ -401,13 +401,20 @@ def storage_archives():
     from rhodecode.lib.utils import safe_str
     from rhodecode.lib.helpers import format_byte_size_binary
 
-    msg = 'Archive cache storage is controlled by ' \
-          'archive_cache.store_dir=/path/to/cache option in the .ini file'
-    path = safe_str(rhodecode.CONFIG.get('archive_cache.store_dir', msg))
+    storage_type = rhodecode.ConfigGet().get_str('archive_cache.backend.type')
+    storage_key = 'archive_cache.filesystem.store_dir'
+
+    default_msg = 'Archive cache storage is controlled by ' \
+                  f'{storage_key}=/path/to/cache option in the .ini file'
+    path = rhodecode.ConfigGet().get_str(storage_key, missing=default_msg)
 
     value = dict(percent=0, used=0, total=0, items=0, path=path, text='')
     state = STATE_OK_DEFAULT
     try:
+        if storage_type != 'filesystem':
+            # raise Exc to stop reporting on different type
+            raise ValueError('Storage type must be "filesystem"')
+
         items_count = 0
         used = 0
         for root, dirs, files in os.walk(path):
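storage_archives() still sizes the cache by walking the store directory; with the new layout, payloads are the `.archive_cache` files inside the shard directories and each has a sibling `<key>.key` metadata file. A standalone sketch of the same measurement (the path is the example default from the ini files, and the suffix filter is an assumption since the hunk cuts off before the counting logic):

```python
import os

def cache_usage(path='/var/opt/rhodecode_data/archive_cache'):
    """Count archive payload files and their total size, like storage_archives()."""
    items, used = 0, 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            if name.endswith('.archive_cache'):  # payloads only; .key files excluded
                items += 1
                used += os.path.getsize(os.path.join(root, name))
    return items, used
```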