diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,7 @@ dogpile.cache==1.3.3 pbr==5.11.1 dulwich==0.21.6 urllib3==1.26.14 +fsspec==2024.6.0 gunicorn==21.2.0 packaging==24.0 hg-evolve==11.1.3 @@ -59,6 +60,34 @@ pyramid==2.0.2 redis==5.0.4 async-timeout==4.0.3 repoze.lru==0.7 +s3fs==2024.6.0 + aiobotocore==2.13.0 + aiohttp==3.9.5 + aiosignal==1.3.1 + frozenlist==1.4.1 + attrs==22.2.0 + frozenlist==1.4.1 + multidict==6.0.5 + yarl==1.9.4 + idna==3.4 + multidict==6.0.5 + aioitertools==0.11.0 + botocore==1.34.106 + jmespath==1.0.1 + python-dateutil==2.8.2 + six==1.16.0 + urllib3==1.26.14 + wrapt==1.16.0 + aiohttp==3.9.5 + aiosignal==1.3.1 + frozenlist==1.4.1 + attrs==22.2.0 + frozenlist==1.4.1 + multidict==6.0.5 + yarl==1.9.4 + idna==3.4 + multidict==6.0.5 + fsspec==2024.6.0 scandir==1.10.0 setproctitle==1.3.3 subvertpy==0.11.0 diff --git a/vcsserver/base.py b/vcsserver/base.py --- a/vcsserver/base.py +++ b/vcsserver/base.py @@ -20,7 +20,7 @@ import tempfile import logging import urllib.parse -from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store +from vcsserver.lib.archive_cache import get_archival_cache_store from vcsserver import exceptions from vcsserver.exceptions import NoContentException diff --git a/vcsserver/http_main.py b/vcsserver/http_main.py --- a/vcsserver/http_main.py +++ b/vcsserver/http_main.py @@ -233,7 +233,7 @@ class HTTPApplication: self.global_config = global_config self.config.include('vcsserver.lib.rc_cache') - self.config.include('vcsserver.lib.rc_cache.archive_cache') + self.config.include('vcsserver.lib.archive_cache') settings_locale = settings.get('locale', '') or 'en_US.UTF-8' vcs = VCS(locale_conf=settings_locale, cache_config=settings) diff --git a/vcsserver/lib/archive_cache/__init__.py b/vcsserver/lib/archive_cache/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/__init__.py @@ -0,0 +1,79 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import logging + +from .backends.fanout_cache import FileSystemFanoutCache +from .backends.objectstore_cache import ObjectStoreCache + +from .utils import archive_iterator # noqa +from .lock import ArchiveCacheGenerationLock # noqa + +log = logging.getLogger(__name__) + + +cache_meta = None + + +def includeme(config): + return # vcsserver gets its config from rhodecode on a remote call + # init our cache at start + settings = config.get_settings() + get_archival_cache_store(settings) + + +def get_archival_config(config): + + final_config = { + + } + + for k, v in config.items(): + if k.startswith('archive_cache'): + final_config[k] = v + + return final_config + + +def get_archival_cache_store(config, always_init=False): + + global cache_meta + if cache_meta is not None and not always_init: + return cache_meta + + config = get_archival_config(config) + backend = config['archive_cache.backend.type'] + + archive_cache_locking_url = config['archive_cache.locking.url'] + + match backend: + case 'filesystem': + d_cache = FileSystemFanoutCache( + locking_url=archive_cache_locking_url, + **config + ) + case 'objectstore': + d_cache = ObjectStoreCache( + locking_url=archive_cache_locking_url, + **config + ) + case _: + raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore" got {backend} ') + + cache_meta = d_cache + return cache_meta diff --git a/vcsserver/lib/archive_cache/backends/__init__.py b/vcsserver/lib/archive_cache/backends/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/backends/__init__.py @@ -0,0 +1,17 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ diff --git a/vcsserver/lib/archive_cache/backends/base.py b/vcsserver/lib/archive_cache/backends/base.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/backends/base.py @@ -0,0 +1,348 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import os +import functools +import logging +import typing +import time +import zlib + +from ...ext_json import json +from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size +from ..lock import GenerationLock + +log = logging.getLogger(__name__) + + +class BaseShard: + storage_type: str = '' + fs = None + + @classmethod + def hash(cls, key): + """Compute portable hash for `key`. + + :param key: key to hash + :return: hash value + + """ + mask = 0xFFFFFFFF + return zlib.adler32(key.encode('utf-8')) & mask # noqa + + def _write_file(self, full_path, read_iterator, mode): + raise NotImplementedError + + def _get_keyfile(self, key): + raise NotImplementedError + + def random_filename(self): + raise NotImplementedError + + def _store(self, key, value_reader, metadata, mode): + (filename, # hash-name + full_path # full-path/hash-name + ) = self.random_filename() + + key_file, key_file_path = self._get_keyfile(key) + + # STORE METADATA + _metadata = { + "version": "v1", + + "key_file": key_file, # this is the .key.json file storing meta + "key_file_path": key_file_path, # full path to key_file + "archive_key": key, # original name we stored archive under, e.g my-archive.zip + "archive_filename": filename, # the actual filename we stored that file under + "archive_full_path": full_path, + + "store_time": time.time(), + "access_count": 0, + "access_time": 0, + + "size": 0 + } + if metadata: + _metadata.update(metadata) + + read_iterator = iter(functools.partial(value_reader.read, 2**22), b'') + size, sha256 = self._write_file(full_path, read_iterator, mode) + _metadata['size'] = size + _metadata['sha256'] = sha256 + + # after archive is finished, we create a key to save the presence of the binary file + with self.fs.open(key_file_path, 'wb') as f: + f.write(json.dumps(_metadata)) + + return key, filename, size, _metadata + + def _fetch(self, key, retry, retry_attempts, retry_backoff): + if retry is NOT_GIVEN: + retry = False + if retry_attempts is NOT_GIVEN: + retry_attempts = 0 + + if retry and retry_attempts > 0: + for attempt in range(1, retry_attempts + 1): + if key in self: + break + # we didn't find the key, wait retry_backoff N seconds, and re-check + time.sleep(retry_backoff) + + if key not in self: + log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}') + raise KeyError(key) + + key_file, key_file_path = self._get_keyfile(key) + with self.fs.open(key_file_path, 'rb') as f: + metadata = json.loads(f.read()) + + archive_path = metadata['archive_full_path'] + + try: + return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata + finally: + # update usage stats, count and accessed + metadata["access_count"] = metadata.get("access_count", 0) + 1 + metadata["access_time"] = time.time() + log.debug('Updated %s with access snapshot, access_count=%s access_time=%s', + key_file, metadata['access_count'], metadata['access_time']) + with self.fs.open(key_file_path, 'wb') as f: + f.write(json.dumps(metadata)) + + def _remove(self, key): + if key not in self: + log.exception(f'requested key={key} not found in {self}') + raise KeyError(key) + + key_file, key_file_path = self._get_keyfile(key) + with self.fs.open(key_file_path, 'rb') as f: + metadata = json.loads(f.read()) + + archive_path = metadata['archive_full_path'] + 
self.fs.rm(archive_path) + self.fs.rm(key_file_path) + return 1 + + @property + def storage_medium(self): + return getattr(self, self.storage_type) + + @property + def key_suffix(self): + return 'key.json' + + def __contains__(self, key): + """Return `True` if `key` matching item is found in cache. + + :param key: key matching item + :return: True if key matching item + + """ + key_file, key_file_path = self._get_keyfile(key) + return self.fs.exists(key_file_path) + + +class BaseCache: + _locking_url: str = '' + _storage_path: str = '' + _config = {} + retry = False + retry_attempts = 0 + retry_backoff = 1 + _shards = tuple() + + def __contains__(self, key): + """Return `True` if `key` matching item is found in cache. + + :param key: key matching item + :return: True if key matching item + + """ + return self.has_key(key) + + def __repr__(self): + return f'<{self.__class__.__name__}(storage={self._storage_path})>' + + @classmethod + def gb_to_bytes(cls, gb): + return gb * (1024 ** 3) + + @property + def storage_path(self): + return self._storage_path + + @classmethod + def get_stats_db(cls): + return StatsDB() + + def get_conf(self, key, pop=False): + if key not in self._config: + raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config") + val = self._config[key] + if pop: + del self._config[key] + return val + + def _get_shard(self, key): + raise NotImplementedError + + def _get_size(self, shard, archive_path): + raise NotImplementedError + + def store(self, key, value_reader, metadata=None): + shard = self._get_shard(key) + return shard.store(key, value_reader, metadata) + + def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]: + """ + Return file handle corresponding to `key` from specific shard cache. + """ + if retry is NOT_GIVEN: + retry = self.retry + if retry_attempts is NOT_GIVEN: + retry_attempts = self.retry_attempts + retry_backoff = self.retry_backoff + + shard = self._get_shard(key) + return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff) + + def remove(self, key): + shard = self._get_shard(key) + return shard.remove(key) + + def has_key(self, archive_key): + """Return `True` if `key` matching item is found in cache. + + :param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip + :return: True if key is found + + """ + shard = self._get_shard(archive_key) + return archive_key in shard + + def iter_keys(self): + for shard in self._shards: + if shard.fs.exists(shard.storage_medium): + for path, _dirs, _files in shard.fs.walk(shard.storage_medium): + for key_file_path in _files: + if key_file_path.endswith(shard.key_suffix): + yield shard, key_file_path + + def get_lock(self, lock_key): + return GenerationLock(lock_key, self._locking_url) + + def evict(self, policy=None, size_limit=None) -> int: + """ + Remove old items based on the conditions + + + explanation of this algo: + iterate over each shard, then for each shard iterate over the .key files + read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and + access data, time creation, and access counts. + + Store that into a memory DB so we can run different sorting strategies easily. + Summing the size is a sum sql query. + + Then we run a sorting strategy based on eviction policy. + We iterate over sorted keys, and remove each checking if we hit the overall limit. 
+ """ + + policy = policy or self._eviction_policy + size_limit = size_limit or self._cache_size_limit + + select_policy = EVICTION_POLICY[policy]['evict'] + + log.debug('Running eviction policy \'%s\', and checking for size limit: %s', + policy, format_size(size_limit)) + + if select_policy is None: + return 0 + + db = self.get_stats_db() + + data = [] + cnt = 1 + + for shard, key_file in self.iter_keys(): + with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f: + metadata = json.loads(f.read()) + + key_file_path = os.path.join(shard.storage_medium, key_file) + + archive_key = metadata['archive_key'] + archive_path = metadata['archive_full_path'] + + size = metadata.get('size') + if not size: + # in case we don't have size re-calc it... + size = self._get_size(shard, archive_path) + + data.append([ + cnt, + key_file, + key_file_path, + archive_key, + archive_path, + metadata.get('store_time', 0), + metadata.get('access_time', 0), + metadata.get('access_count', 0), + size, + ]) + cnt += 1 + + # Insert bulk data using executemany + db.bulk_insert(data) + + total_size = db.get_total_size() + log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s', + len(data), format_size(total_size), format_size(size_limit)) + + removed_items = 0 + removed_size = 0 + for key_file, archive_key, size in db.get_sorted_keys(select_policy): + # simulate removal impact BEFORE removal + total_size -= size + + if total_size <= size_limit: + # we obtained what we wanted... + break + + self.remove(archive_key) + removed_items += 1 + removed_size += size + + log.debug('Removed %s cache archives, and reduced size by: %s', + removed_items, format_size(removed_size)) + return removed_items + + def get_statistics(self): + total_files = 0 + total_size = 0 + meta = {} + + for shard, key_file in self.iter_keys(): + json_key = f"{shard.storage_medium}/{key_file}" + with shard.fs.open(json_key, 'rb') as f: + total_files += 1 + metadata = json.loads(f.read()) + total_size += metadata['size'] + + return total_files, total_size, meta + diff --git a/vcsserver/lib/archive_cache/backends/fanout_cache.py b/vcsserver/lib/archive_cache/backends/fanout_cache.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/backends/fanout_cache.py @@ -0,0 +1,166 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import codecs +import hashlib +import logging +import os + +import fsspec + +from .base import BaseCache, BaseShard +from ..utils import ShardFileReader, NOT_GIVEN +from ...type_utils import str2bool + +log = logging.getLogger(__name__) + + +class FileSystemShard(BaseShard): + + def __init__(self, index, directory, **settings): + self._index = index + self._directory = directory + self.storage_type = 'directory' + self.fs = fsspec.filesystem('file') + + @property + def directory(self): + """Cache directory.""" + return self._directory + + def _get_keyfile(self, archive_key) -> tuple[str, str]: + key_file = f'{archive_key}.{self.key_suffix}' + return key_file, os.path.join(self.directory, key_file) + + def _get_writer(self, path, mode): + for count in range(1, 11): + try: + # Another cache may have deleted the directory before + # the file could be opened. + return self.fs.open(path, mode) + except OSError: + if count == 10: + # Give up after 10 tries to open the file. + raise + continue + + def _write_file(self, full_path, iterator, mode): + # ensure dir exists + destination, _ = os.path.split(full_path) + if not self.fs.exists(destination): + self.fs.makedirs(destination) + + writer = self._get_writer(full_path, mode) + + digest = hashlib.sha256() + with writer: + size = 0 + for chunk in iterator: + size += len(chunk) + digest.update(chunk) + writer.write(chunk) + writer.flush() + # Get the file descriptor + fd = writer.fileno() + + # Sync the file descriptor to disk, helps with NFS cases... + os.fsync(fd) + sha256 = digest.hexdigest() + log.debug('written new archive cache under %s, sha256: %s', full_path, sha256) + return size, sha256 + + def store(self, key, value_reader, metadata: dict | None = None): + return self._store(key, value_reader, metadata, mode='xb') + + def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: + return self._fetch(key, retry, retry_attempts, retry_backoff) + + def remove(self, key): + return self._remove(key) + + def random_filename(self): + """Return filename and full-path tuple for file storage. + + Filename will be a randomly generated 28 character hexadecimal string + with ".archive_cache" suffixed. Two levels of sub-directories will be used to + reduce the size of directories. On older filesystems, lookups in + directories with many files may be slow. + """ + + hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') + + archive_name = hex_name[4:] + '.archive_cache' + filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}" + + full_path = os.path.join(self.directory, filename) + return archive_name, full_path + + def __repr__(self): + return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})' + + +class FileSystemFanoutCache(BaseCache): + + def __init__(self, locking_url, **settings): + """ + Initialize file system cache instance. 
+ + :param str locking_url: redis url for a lock + :param settings: settings dict + + """ + self._locking_url = locking_url + self._config = settings + cache_dir = self.get_conf('archive_cache.filesystem.store_dir') + directory = str(cache_dir) + directory = os.path.expanduser(directory) + directory = os.path.expandvars(directory) + self._directory = directory + self._storage_path = directory + + # check if it's ok to write, and re-create the archive cache + if not os.path.isdir(self._directory): + os.makedirs(self._directory, exist_ok=True) + + self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True)) + + self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True) + self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb'))) + + self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True)) + self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True)) + self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True)) + + log.debug('Initializing archival cache instance under %s', self._directory) + self._shards = tuple( + FileSystemShard( + index=num, + directory=os.path.join(directory, 'shard_%03d' % num), + **settings, + ) + for num in range(self._count) + ) + self._hash = self._shards[0].hash + + def _get_shard(self, key) -> FileSystemShard: + index = self._hash(key) % self._count + shard = self._shards[index] + return shard + + def _get_size(self, shard, archive_path): + return os.stat(archive_path).st_size diff --git a/vcsserver/lib/archive_cache/backends/objectstore_cache.py b/vcsserver/lib/archive_cache/backends/objectstore_cache.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/backends/objectstore_cache.py @@ -0,0 +1,150 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import codecs +import hashlib +import logging +import os + +import fsspec + +from .base import BaseCache, BaseShard +from ..utils import ShardFileReader, NOT_GIVEN +from ...type_utils import str2bool + +log = logging.getLogger(__name__) + + +class S3Shard(BaseShard): + + def __init__(self, index, bucket, **settings): + self._index = index + self._bucket = bucket + self.storage_type = 'bucket' + + endpoint_url = settings.pop('archive_cache.objectstore.url') + key = settings.pop('archive_cache.objectstore.key') + secret = settings.pop('archive_cache.objectstore.secret') + + self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret) + + @property + def bucket(self): + """Cache bucket.""" + return self._bucket + + def _get_keyfile(self, archive_key) -> tuple[str, str]: + key_file = f'{archive_key}-{self.key_suffix}' + return key_file, os.path.join(self.bucket, key_file) + + def _get_writer(self, path, mode): + return self.fs.open(path, 'wb') + + def _write_file(self, full_path, iterator, mode): + # ensure bucket exists + destination = self.bucket + if not self.fs.exists(destination): + self.fs.mkdir(destination, s3_additional_kwargs={}) + + writer = self._get_writer(full_path, mode) + + digest = hashlib.sha256() + with writer: + size = 0 + for chunk in iterator: + size += len(chunk) + digest.update(chunk) + writer.write(chunk) + + sha256 = digest.hexdigest() + log.debug('written new archive cache under %s, sha256: %s', full_path, sha256) + return size, sha256 + + def store(self, key, value_reader, metadata: dict | None = None): + return self._store(key, value_reader, metadata, mode='wb') + + def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: + return self._fetch(key, retry, retry_attempts, retry_backoff) + + def remove(self, key): + return self._remove(key) + + def random_filename(self): + """Return filename and full-path tuple for file storage. + + Filename will be a randomly generated 28 character hexadecimal string + with ".archive_cache" suffixed. Two levels of sub-directories will be used to + reduce the size of directories. On older filesystems, lookups in + directories with many files may be slow. + """ + + hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') + + archive_name = hex_name[4:] + '.archive_cache' + filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}" + + full_path = os.path.join(self.bucket, filename) + return archive_name, full_path + + def __repr__(self): + return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})' + + +class ObjectStoreCache(BaseCache): + + def __init__(self, locking_url, **settings): + """ + Initialize objectstore cache instance. 
+ + :param str locking_url: redis url for a lock + :param settings: settings dict + + """ + self._locking_url = locking_url + self._config = settings + + objectstore_url = self.get_conf('archive_cache.objectstore.url') + self._storage_path = objectstore_url + + self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True)) + + self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True) + self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb'))) + + self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True)) + self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True)) + self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True)) + + log.debug('Initializing archival cache instance under %s', objectstore_url) + self._shards = tuple( + S3Shard( + index=num, + bucket='rhodecode-archivecache-%03d' % num, + **settings, + ) + for num in range(self._count) + ) + self._hash = self._shards[0].hash + + def _get_shard(self, key) -> S3Shard: + index = self._hash(key) % self._count + shard = self._shards[index] + return shard + + def _get_size(self, shard, archive_path): + return shard.fs.info(archive_path)['size'] diff --git a/vcsserver/lib/archive_cache/lock.py b/vcsserver/lib/archive_cache/lock.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/lock.py @@ -0,0 +1,62 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import redis +from .._vendor import redis_lock + + +class ArchiveCacheGenerationLock(Exception): + pass + + +class GenerationLock: + """ + Locking mechanism that detects if a lock is acquired + + with GenerationLock(lock_key): + compute_archive() + """ + lock_timeout = 7200 + + def __init__(self, lock_key, url): + self.lock_key = lock_key + self._create_client(url) + self.lock = self.get_lock() + + def _create_client(self, url): + connection_pool = redis.ConnectionPool.from_url(url) + self.writer_client = redis.StrictRedis( + connection_pool=connection_pool + ) + self.reader_client = self.writer_client + + def get_lock(self): + return redis_lock.Lock( + redis_client=self.writer_client, + name=self.lock_key, + expire=self.lock_timeout, + strict=True + ) + + def __enter__(self): + acquired = self.lock.acquire(blocking=False) + if not acquired: + raise ArchiveCacheGenerationLock('Failed to create a lock') + + def __exit__(self, exc_type, exc_val, exc_tb): + self.lock.release() diff --git a/vcsserver/lib/archive_cache/utils.py b/vcsserver/lib/archive_cache/utils.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/archive_cache/utils.py @@ -0,0 +1,134 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import sqlite3 +import s3fs.core + +NOT_GIVEN = -917 + + +EVICTION_POLICY = { + 'none': { + 'evict': None, + }, + 'least-recently-stored': { + 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time', + }, + 'least-recently-used': { + 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time', + }, + 'least-frequently-used': { + 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count', + }, +} + + +def archive_iterator(_reader, block_size: int = 4096 * 512): + # 4096 * 64 = 64KB + while 1: + data = _reader.read(block_size) + if not data: + break + yield data + + +def format_size(size): + # Convert size in bytes to a human-readable format (e.g., KB, MB, GB) + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if size < 1024: + return f"{size:.2f} {unit}" + size /= 1024 + + +class StatsDB: + + def __init__(self): + self.connection = sqlite3.connect(':memory:') + self._init_db() + + def _init_db(self): + qry = ''' + CREATE TABLE IF NOT EXISTS archive_cache ( + rowid INTEGER PRIMARY KEY, + key_file TEXT, + key_file_path TEXT, + archive_key TEXT, + archive_path TEXT, + store_time REAL, + access_time REAL, + access_count INTEGER DEFAULT 0, + size INTEGER DEFAULT 0 + ) + ''' + + self.sql(qry) + self.connection.commit() + + @property + def sql(self): + return self.connection.execute + + def bulk_insert(self, rows): + qry = ''' + INSERT INTO archive_cache ( + rowid, + key_file, + key_file_path, + archive_key, + archive_path, + store_time, + access_time, + access_count, + size + ) + VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ''' + cursor = self.connection.cursor() + cursor.executemany(qry, rows) + self.connection.commit() + + def get_total_size(self): + qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache' + ((total_size,),) = self.sql(qry).fetchall() + return total_size + + def get_sorted_keys(self, select_policy): + select_policy_qry = select_policy.format(fields='key_file, archive_key, size') + return self.sql(select_policy_qry).fetchall() + + +class ShardFileReader: + + def __init__(self, file_like_reader): + self._file_like_reader = file_like_reader + + def __getattr__(self, item): + if isinstance(self._file_like_reader, s3fs.core.S3File): + match item: + case 'name': + # S3 FileWrapper doesn't support name attribute, and we use it + return self._file_like_reader.full_name + case _: + return getattr(self._file_like_reader, item) + else: + return getattr(self._file_like_reader, item) + + def __repr__(self): + return f'<{self.__class__.__name__}={self._file_like_reader}>' diff --git a/vcsserver/lib/rc_cache/archive_cache/__init__.py b/vcsserver/lib/rc_cache/archive_cache/__init__.py deleted file mode 100644 --- a/vcsserver/lib/rc_cache/archive_cache/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# RhodeCode VCSServer provides access to different vcs backends via network. -# Copyright (C) 2014-2024 RhodeCode GmbH -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -from .fanout_cache import get_archival_cache_store -from .fanout_cache import get_archival_config - -from .utils import archive_iterator -from .utils import ArchiveCacheGenerationLock - - -def includeme(config): - # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode - return - - # init our cache at start - settings = config.get_settings() - get_archival_cache_store(settings) diff --git a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py b/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py deleted file mode 100644 --- a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py +++ /dev/null @@ -1,455 +0,0 @@ -# RhodeCode VCSServer provides access to different vcs backends via network. -# Copyright (C) 2014-2024 RhodeCode GmbH -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -import codecs -import contextlib -import functools -import os -import logging -import time -import typing -import zlib -import sqlite3 - -from ...ext_json import json -from .lock import GenerationLock -from .utils import format_size - -log = logging.getLogger(__name__) - -cache_meta = None - -UNKNOWN = -241 -NO_VAL = -917 - -MODE_BINARY = 'BINARY' - - -EVICTION_POLICY = { - 'none': { - 'evict': None, - }, - 'least-recently-stored': { - 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time', - }, - 'least-recently-used': { - 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time', - }, - 'least-frequently-used': { - 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count', - }, -} - - -class DB: - - def __init__(self): - self.connection = sqlite3.connect(':memory:') - self._init_db() - - def _init_db(self): - qry = ''' - CREATE TABLE IF NOT EXISTS archive_cache ( - rowid INTEGER PRIMARY KEY, - key_file TEXT, - key_file_path TEXT, - filename TEXT, - full_path TEXT, - store_time REAL, - access_time REAL, - access_count INTEGER DEFAULT 0, - size INTEGER DEFAULT 0 - ) - ''' - - self.sql(qry) - self.connection.commit() - - @property - def sql(self): - return self.connection.execute - - def bulk_insert(self, rows): - qry = ''' - INSERT INTO archive_cache ( - rowid, - key_file, - key_file_path, - filename, - full_path, - store_time, - access_time, - access_count, - size - ) - VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ? 
- ) - ''' - cursor = self.connection.cursor() - cursor.executemany(qry, rows) - self.connection.commit() - - -class FileSystemCache: - - def __init__(self, index, directory, **settings): - self._index = index - self._directory = directory - - @property - def directory(self): - """Cache directory.""" - return self._directory - - def _write_file(self, full_path, iterator, mode, encoding=None): - full_dir, _ = os.path.split(full_path) - - for count in range(1, 11): - with contextlib.suppress(OSError): - os.makedirs(full_dir) - - try: - # Another cache may have deleted the directory before - # the file could be opened. - writer = open(full_path, mode, encoding=encoding) - except OSError: - if count == 10: - # Give up after 10 tries to open the file. - raise - continue - - with writer: - size = 0 - for chunk in iterator: - size += len(chunk) - writer.write(chunk) - writer.flush() - # Get the file descriptor - fd = writer.fileno() - - # Sync the file descriptor to disk, helps with NFS cases... - os.fsync(fd) - log.debug('written new archive cache under %s', full_path) - return size - - def _get_keyfile(self, key): - return os.path.join(self._directory, f'{key}.key') - - def store(self, key, value_reader, metadata): - filename, full_path = self.random_filename() - key_file = self._get_keyfile(key) - - # STORE METADATA - _metadata = { - "version": "v1", - "filename": filename, - "full_path": full_path, - "key_file": key_file, - "store_time": time.time(), - "access_count": 1, - "access_time": 0, - "size": 0 - } - if metadata: - _metadata.update(metadata) - - reader = functools.partial(value_reader.read, 2**22) - - iterator = iter(reader, b'') - size = self._write_file(full_path, iterator, 'xb') - metadata['size'] = size - - # after archive is finished, we create a key to save the presence of the binary file - with open(key_file, 'wb') as f: - f.write(json.dumps(_metadata)) - - return key, size, MODE_BINARY, filename, _metadata - - def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]: - - if retry: - for attempt in range(retry_attempts): - if key in self: - break - # we dind't find the key, wait 1s, and re-check - time.sleep(1) - - if key not in self: - log.exception('requested {key} not found in {self}', key, self) - raise KeyError(key) - - key_file = self._get_keyfile(key) - with open(key_file, 'rb') as f: - metadata = json.loads(f.read()) - - filename = metadata['filename'] - - try: - return open(os.path.join(self.directory, filename), 'rb'), metadata - finally: - # update usage stats, count and accessed - metadata["access_count"] = metadata.get("access_count", 0) + 1 - metadata["access_time"] = time.time() - - with open(key_file, 'wb') as f: - f.write(json.dumps(metadata)) - - def random_filename(self): - """Return filename and full-path tuple for file storage. - - Filename will be a randomly generated 28 character hexadecimal string - with ".archive_cache" suffixed. Two levels of sub-directories will be used to - reduce the size of directories. On older filesystems, lookups in - directories with many files may be slow. - """ - - hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') - sub_dir = os.path.join(hex_name[:2], hex_name[2:4]) - name = hex_name[4:] + '.archive_cache' - filename = os.path.join(sub_dir, name) - full_path = os.path.join(self.directory, filename) - return filename, full_path - - def hash(self, key): - """Compute portable hash for `key`. 
- - :param key: key to hash - :return: hash value - - """ - mask = 0xFFFFFFFF - return zlib.adler32(key.encode('utf-8')) & mask # noqa - - def __contains__(self, key): - """Return `True` if `key` matching item is found in cache. - - :param key: key matching item - :return: True if key matching item - - """ - key_file = self._get_keyfile(key) - return os.path.exists(key_file) - - def __repr__(self): - return f'FileSystemCache(index={self._index}, dir={self.directory})' - - -class FanoutCache: - """Cache that shards keys and values.""" - - def __init__( - self, directory=None, **settings - ): - """Initialize cache instance. - - :param str directory: cache directory - :param settings: settings dict - - """ - if directory is None: - raise ValueError('directory cannot be None') - - directory = str(directory) - directory = os.path.expanduser(directory) - directory = os.path.expandvars(directory) - self._directory = directory - - self._count = settings.pop('cache_shards') - self._locking_url = settings.pop('locking_url') - - self._eviction_policy = settings['cache_eviction_policy'] - self._cache_size_limit = settings['cache_size_limit'] - - self._shards = tuple( - FileSystemCache( - index=num, - directory=os.path.join(directory, 'shard_%03d' % num), - **settings, - ) - for num in range(self._count) - ) - self._hash = self._shards[0].hash - - @property - def directory(self): - """Cache directory.""" - return self._directory - - def get_lock(self, lock_key): - return GenerationLock(lock_key, self._locking_url) - - def _get_shard(self, key) -> FileSystemCache: - index = self._hash(key) % self._count - shard = self._shards[index] - return shard - - def store(self, key, value_reader, metadata=None): - shard = self._get_shard(key) - return shard.store(key, value_reader, metadata) - - def fetch(self, key, retry=False, retry_attempts=10): - """Return file handle corresponding to `key` from cache. - """ - shard = self._get_shard(key) - return shard.fetch(key, retry=retry, retry_attempts=retry_attempts) - - def has_key(self, key): - """Return `True` if `key` matching item is found in cache. - - :param key: key for item - :return: True if key is found - - """ - shard = self._get_shard(key) - return key in shard - - def __contains__(self, item): - return self.has_key(item) - - def evict(self, policy=None, size_limit=None): - """ - Remove old items based on the conditions - - - explanation of this algo: - iterate over each shard, then for each shard iterate over the .key files - read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and - access data, time creation, and access counts. - - Store that into a memory DB so we can run different sorting strategies easily. - Summing the size is a sum sql query. - - Then we run a sorting strategy based on eviction policy. - We iterate over sorted keys, and remove each checking if we hit the overall limit. 
- """ - - policy = policy or self._eviction_policy - size_limit = size_limit or self._cache_size_limit - - select_policy = EVICTION_POLICY[policy]['evict'] - - log.debug('Running eviction policy \'%s\', and checking for size limit: %s', - policy, format_size(size_limit)) - - if select_policy is None: - return 0 - - db = DB() - - data = [] - cnt = 1 - for shard in self._shards: - for key_file in os.listdir(shard.directory): - if key_file.endswith('.key'): - key_file_path = os.path.join(shard.directory, key_file) - with open(key_file_path, 'rb') as f: - metadata = json.loads(f.read()) - - size = metadata.get('size') - filename = metadata.get('filename') - full_path = metadata.get('full_path') - - if not size: - # in case we don't have size re-calc it... - size = os.stat(full_path).st_size - - data.append([ - cnt, - key_file, - key_file_path, - filename, - full_path, - metadata.get('store_time', 0), - metadata.get('access_time', 0), - metadata.get('access_count', 0), - size, - ]) - cnt += 1 - - # Insert bulk data using executemany - db.bulk_insert(data) - - ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall() - log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size)) - select_policy_qry = select_policy.format(fields='key_file_path, full_path, size') - sorted_keys = db.sql(select_policy_qry).fetchall() - - removed_items = 0 - removed_size = 0 - for key, cached_file, size in sorted_keys: - # simulate removal impact BEFORE removal - total_size -= size - - if total_size <= size_limit: - # we obtained what we wanted... - break - - os.remove(cached_file) - os.remove(key) - removed_items += 1 - removed_size += size - - log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size)) - return removed_items - - -def get_archival_config(config): - - final_config = { - - } - - for k, v in config.items(): - if k.startswith('archive_cache'): - final_config[k] = v - - return final_config - - -def get_archival_cache_store(config): - - global cache_meta - if cache_meta is not None: - return cache_meta - - config = get_archival_config(config) - backend = config['archive_cache.backend.type'] - if backend != 'filesystem': - raise ValueError('archive_cache.backend.type only supports "filesystem"') - - archive_cache_locking_url = config['archive_cache.locking.url'] - archive_cache_dir = config['archive_cache.filesystem.store_dir'] - archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb'] - archive_cache_shards = config['archive_cache.filesystem.cache_shards'] - archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy'] - - log.debug('Initializing archival cache instance under %s', archive_cache_dir) - - # check if it's ok to write, and re-create the archive cache - if not os.path.isdir(archive_cache_dir): - os.makedirs(archive_cache_dir, exist_ok=True) - - d_cache = FanoutCache( - archive_cache_dir, - locking_url=archive_cache_locking_url, - cache_shards=archive_cache_shards, - cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024, - cache_eviction_policy=archive_cache_eviction_policy - ) - cache_meta = d_cache - return cache_meta diff --git a/vcsserver/lib/rc_cache/archive_cache/lock.py b/vcsserver/lib/rc_cache/archive_cache/lock.py deleted file mode 100644 --- a/vcsserver/lib/rc_cache/archive_cache/lock.py +++ /dev/null @@ -1,58 +0,0 @@ -# RhodeCode VCSServer provides access to different vcs backends via network. 
-# Copyright (C) 2014-2024 RhodeCode GmbH -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -import redis -from ..._vendor import redis_lock -from .utils import ArchiveCacheGenerationLock - - -class GenerationLock: - """ - Locking mechanism that detects if a lock is acquired - - with GenerationLock(lock_key): - compute_archive() - """ - lock_timeout = 7200 - - def __init__(self, lock_key, url): - self.lock_key = lock_key - self._create_client(url) - self.lock = self.get_lock() - - def _create_client(self, url): - connection_pool = redis.ConnectionPool.from_url(url) - self.writer_client = redis.StrictRedis( - connection_pool=connection_pool - ) - self.reader_client = self.writer_client - - def get_lock(self): - return redis_lock.Lock( - redis_client=self.writer_client, - name=self.lock_key, - expire=self.lock_timeout, - strict=True - ) - - def __enter__(self): - acquired = self.lock.acquire(blocking=False) - if not acquired: - raise ArchiveCacheGenerationLock('Failed to create a lock') - - def __exit__(self, exc_type, exc_val, exc_tb): - self.lock.release() diff --git a/vcsserver/lib/rc_cache/archive_cache/utils.py b/vcsserver/lib/rc_cache/archive_cache/utils.py deleted file mode 100644 --- a/vcsserver/lib/rc_cache/archive_cache/utils.py +++ /dev/null @@ -1,71 +0,0 @@ -# RhodeCode VCSServer provides access to different vcs backends via network. -# Copyright (C) 2014-2024 RhodeCode GmbH -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. 
-# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -import os - - -class ArchiveCacheGenerationLock(Exception): - pass - - -def archive_iterator(_reader, block_size: int = 4096 * 512): - # 4096 * 64 = 64KB - while 1: - data = _reader.read(block_size) - if not data: - break - yield data - - -def get_directory_statistics(start_path): - """ - total_files, total_size, directory_stats = get_directory_statistics(start_path) - - print(f"Directory statistics for: {start_path}\n") - print(f"Total files: {total_files}") - print(f"Total size: {format_size(total_size)}\n") - - :param start_path: - :return: - """ - - total_files = 0 - total_size = 0 - directory_stats = {} - - for dir_path, dir_names, file_names in os.walk(start_path): - dir_size = 0 - file_count = len(file_names) - - for file in file_names: - filepath = os.path.join(dir_path, file) - file_size = os.path.getsize(filepath) - dir_size += file_size - - directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size} - total_files += file_count - total_size += dir_size - - return total_files, total_size, directory_stats - - -def format_size(size): - # Convert size in bytes to a human-readable format (e.g., KB, MB, GB) - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: - if size < 1024: - return f"{size:.2f} {unit}" - size /= 1024
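
Usage sketch (not part of the diff): the snippet below illustrates, under assumed placeholder settings, how the new vcsserver.lib.archive_cache API introduced above is expected to be wired up with the filesystem backend. The store_dir, the Redis locking URL and the shard/size values are illustrative only, not values required by the code.

    # Minimal sketch of the archive_cache API added in this change, assuming
    # the filesystem backend. Paths, the Redis URL and sizes are placeholders.
    import io

    from vcsserver.lib.archive_cache import get_archival_cache_store, archive_iterator

    settings = {
        'archive_cache.backend.type': 'filesystem',
        'archive_cache.locking.url': 'redis://redis:6379/1',
        'archive_cache.filesystem.store_dir': '/tmp/archive_cache',
        'archive_cache.filesystem.cache_shards': '8',
        'archive_cache.filesystem.cache_size_gb': '10',
        'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
        'archive_cache.filesystem.retry': 'false',
        'archive_cache.filesystem.retry_attempts': '10',
        'archive_cache.filesystem.retry_backoff': '1',
    }

    # builds (and memoizes) a FileSystemFanoutCache from the archive_cache.* keys
    d_cache = get_archival_cache_store(settings)

    # store an archive under its cache key; value_reader is anything with .read()
    archive_key = 'my-archive-svn.zip'
    d_cache.store(archive_key, io.BytesIO(b'archive bytes'), {'created_by': 'example'})

    # fetch it back and stream it out in chunks
    reader, metadata = d_cache.fetch(archive_key)
    for chunk in archive_iterator(reader):
        pass  # e.g. write the chunk to the HTTP response

    # trim the cache according to the configured eviction policy and size limit
    d_cache.evict()

The objectstore backend is configured the same way, using the analogous archive_cache.objectstore.* keys plus the S3-compatible endpoint URL, key and secret consumed in objectstore_cache.py.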