diff --git a/configs/development.ini b/configs/development.ini --- a/configs/development.ini +++ b/configs/development.ini @@ -290,19 +290,41 @@ file_store.backend = local ; path to store the uploaded binaries and artifacts file_store.storage_path = /var/opt/rhodecode_data/file_store -; Uncomment and set this path to control settings for archive download cache. + +; Redis url to acquire/check generation of archives locks +archive_cache.locking.url = redis://redis:6379/1 + +; Storage backend, only 'filesystem' and 'objectstore' are available now +archive_cache.backend.type = filesystem + +; url for s3 compatible storage that allows to upload artifacts +; e.g. http://minio:9000 +archive_cache.objectstore.url = http://s3-minio:9000 + +; key for s3 auth +archive_cache.objectstore.key = key + +; secret for s3 auth +archive_cache.objectstore.secret = secret + +; number of sharded buckets to create to distribute archives across +; default is 8 shards +archive_cache.objectstore.bucket_shards = 8 + +; if true, a fetch is retried up to retry_attempts times, waiting retry_backoff seconds between attempts +archive_cache.objectstore.retry = false + +; number of seconds to wait for next try using retry +archive_cache.objectstore.retry_backoff = 1 + +; how many times to retry a fetch from this backend +archive_cache.objectstore.retry_attempts = 10 + +; Default is $cache_dir/archive_cache if not set ; Generated repo archives will be cached at this location ; and served from the cache during subsequent requests for the same archive of ; the repository. 
This path is important to be shared across filesystems and with ; RhodeCode and vcsserver - -; Redis url to acquire/check generation of archives locks -archive_cache.locking.url = redis://redis:6379/1 - -; Storage backend, only 'filesystem' is available now -archive_cache.backend.type = filesystem - -; Default is $cache_dir/archive_cache if not set archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb @@ -312,8 +334,18 @@ archive_cache.filesystem.cache_size_gb = archive_cache.filesystem.eviction_policy = least-recently-stored ; By default cache uses sharding technique, this specifies how many shards are there +; default is 8 shards archive_cache.filesystem.cache_shards = 8 +; if true, a fetch is retried up to retry_attempts times, waiting retry_backoff seconds between attempts +archive_cache.filesystem.retry = false + +; number of seconds to wait for next try using retry +archive_cache.filesystem.retry_backoff = 1 + +; how many times to retry a fetch from this backend +archive_cache.filesystem.retry_attempts = 10 + ; ############# ; CELERY CONFIG diff --git a/configs/production.ini b/configs/production.ini --- a/configs/production.ini +++ b/configs/production.ini @@ -258,19 +258,41 @@ file_store.backend = local ; path to store the uploaded binaries and artifacts file_store.storage_path = /var/opt/rhodecode_data/file_store -; Uncomment and set this path to control settings for archive download cache. 
+ +; Redis url to acquire/check generation of archives locks +archive_cache.locking.url = redis://redis:6379/1 + +; Storage backend, only 'filesystem' and 'objectstore' are available now +archive_cache.backend.type = filesystem + +; url for s3 compatible storage that allows to upload artifacts +; e.g. http://minio:9000 +archive_cache.objectstore.url = http://s3-minio:9000 + +; key for s3 auth +archive_cache.objectstore.key = key + +; secret for s3 auth +archive_cache.objectstore.secret = secret + +; number of sharded buckets to create to distribute archives across +; default is 8 shards +archive_cache.objectstore.bucket_shards = 8 + +; if true, a fetch is retried up to retry_attempts times, waiting retry_backoff seconds between attempts +archive_cache.objectstore.retry = false + +; number of seconds to wait for next try using retry +archive_cache.objectstore.retry_backoff = 1 + +; how many times to retry a fetch from this backend +archive_cache.objectstore.retry_attempts = 10 + +; Default is $cache_dir/archive_cache if not set ; Generated repo archives will be cached at this location ; and served from the cache during subsequent requests for the same archive of ; the repository. 
This path is important to be shared across filesystems and with ; RhodeCode and vcsserver - -; Redis url to acquire/check generation of archives locks -archive_cache.locking.url = redis://redis:6379/1 - -; Storage backend, only 'filesystem' is available now -archive_cache.backend.type = filesystem - -; Default is $cache_dir/archive_cache if not set archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb @@ -280,8 +302,18 @@ archive_cache.filesystem.cache_size_gb = archive_cache.filesystem.eviction_policy = least-recently-stored ; By default cache uses sharding technique, this specifies how many shards are there +; default is 8 shards archive_cache.filesystem.cache_shards = 8 +; if true, a fetch is retried up to retry_attempts times, waiting retry_backoff seconds between attempts +archive_cache.filesystem.retry = false + +; number of seconds to wait for next try using retry +archive_cache.filesystem.retry_backoff = 1 + +; how many times to retry a fetch from this backend +archive_cache.filesystem.retry_attempts = 10 + ; ############# ; CELERY CONFIG diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -89,6 +89,7 @@ dogpile.cache==1.3.3 pbr==5.11.1 formencode==2.1.0 six==1.16.0 +fsspec==2024.6.0 gunicorn==21.2.0 packaging==24.0 gevent==24.2.1 @@ -257,6 +258,34 @@ regex==2022.10.31 routes==2.5.1 repoze.lru==0.7 six==1.16.0 +s3fs==2024.6.0 + aiobotocore==2.13.0 + aiohttp==3.9.5 + aiosignal==1.3.1 + frozenlist==1.4.1 + attrs==22.2.0 + frozenlist==1.4.1 + multidict==6.0.5 + yarl==1.9.4 + idna==3.4 + multidict==6.0.5 + aioitertools==0.11.0 + botocore==1.34.106 + jmespath==1.0.1 + python-dateutil==2.8.2 + six==1.16.0 + urllib3==1.26.14 + wrapt==1.16.0 + aiohttp==3.9.5 + aiosignal==1.3.1 + frozenlist==1.4.1 + attrs==22.2.0 + frozenlist==1.4.1 + multidict==6.0.5 + yarl==1.9.4 + idna==3.4 + multidict==6.0.5 + fsspec==2024.6.0 
simplejson==3.19.2 sshpubkeys==3.3.1 cryptography==40.0.2 diff --git a/rhodecode/apps/repository/views/repo_files.py b/rhodecode/apps/repository/views/repo_files.py --- a/rhodecode/apps/repository/views/repo_files.py +++ b/rhodecode/apps/repository/views/repo_files.py @@ -39,7 +39,7 @@ from rhodecode.apps._base import RepoApp from rhodecode.lib import diffs, helpers as h, rc_cache from rhodecode.lib import audit_logger from rhodecode.lib.hash_utils import sha1_safe -from rhodecode.lib.rc_cache.archive_cache import ( +from rhodecode.lib.archive_cache import ( get_archival_cache_store, get_archival_config, ArchiveCacheGenerationLock, archive_iterator) from rhodecode.lib.str_utils import safe_bytes, convert_special_chars from rhodecode.lib.view_utils import parse_path_ref diff --git a/rhodecode/config/config_maker.py b/rhodecode/config/config_maker.py --- a/rhodecode/config/config_maker.py +++ b/rhodecode/config/config_maker.py @@ -193,10 +193,26 @@ def sanitize_settings_and_apply_defaults settings_maker.make_setting('archive_cache.backend.type', 'filesystem') settings_maker.make_setting('archive_cache.filesystem.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,) + settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int') settings_maker.make_setting('archive_cache.filesystem.cache_size_gb', 10, parser='float') - settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int') settings_maker.make_setting('archive_cache.filesystem.eviction_policy', 'least-recently-stored') + settings_maker.make_setting('archive_cache.filesystem.retry', False, parser='bool') + settings_maker.make_setting('archive_cache.filesystem.retry_backoff', 1, parser='int') + settings_maker.make_setting('archive_cache.filesystem.retry_attempts', 10, parser='int') + + settings_maker.make_setting('archive_cache.objectstore.url', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,) + 
settings_maker.make_setting('archive_cache.objectstore.key', '') + settings_maker.make_setting('archive_cache.objectstore.secret', '') + settings_maker.make_setting('archive_cache.objectstore.bucket_shards', 8, parser='int') + + settings_maker.make_setting('archive_cache.objectstore.cache_size_gb', 10, parser='float') + settings_maker.make_setting('archive_cache.objectstore.eviction_policy', 'least-recently-stored') + + settings_maker.make_setting('archive_cache.objectstore.retry', False, parser='bool') + settings_maker.make_setting('archive_cache.objectstore.retry_backoff', 1, parser='int') + settings_maker.make_setting('archive_cache.objectstore.retry_attempts', 10, parser='int') + settings_maker.env_expand() # configure instance id diff --git a/rhodecode/config/middleware.py b/rhodecode/config/middleware.py --- a/rhodecode/config/middleware.py +++ b/rhodecode/config/middleware.py @@ -326,7 +326,7 @@ def includeme(config, auth_resources=Non config.include('pyramid_mako') config.include('rhodecode.lib.rc_beaker') config.include('rhodecode.lib.rc_cache') - config.include('rhodecode.lib.rc_cache.archive_cache') + config.include('rhodecode.lib.archive_cache') config.include('rhodecode.apps._base.navigation') config.include('rhodecode.apps._base.subscribers') diff --git a/rhodecode/lib/rc_cache/archive_cache/__init__.py b/rhodecode/lib/archive_cache/__init__.py rename from rhodecode/lib/rc_cache/archive_cache/__init__.py rename to rhodecode/lib/archive_cache/__init__.py --- a/rhodecode/lib/rc_cache/archive_cache/__init__.py +++ b/rhodecode/lib/archive_cache/__init__.py @@ -16,14 +16,63 @@ # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ -from .fanout_cache import get_archival_cache_store -from .fanout_cache import get_archival_config +import logging + +from .backends.fanout_cache import FileSystemFanoutCache +from .backends.objectstore_cache import 
ObjectStoreCache -from .utils import archive_iterator -from .utils import ArchiveCacheGenerationLock +from .utils import archive_iterator # noqa +from .lock import ArchiveCacheGenerationLock # noqa + +log = logging.getLogger(__name__) + + +cache_meta = None def includeme(config): # init our cache at start settings = config.get_settings() get_archival_cache_store(settings) + + +def get_archival_config(config): + + final_config = { + + } + + for k, v in config.items(): + if k.startswith('archive_cache'): + final_config[k] = v + + return final_config + + +def get_archival_cache_store(config, always_init=False): + + global cache_meta + if cache_meta is not None and not always_init: + return cache_meta + + config = get_archival_config(config) + backend = config['archive_cache.backend.type'] + + archive_cache_locking_url = config['archive_cache.locking.url'] + + match backend: + case 'filesystem': + d_cache = FileSystemFanoutCache( + locking_url=archive_cache_locking_url, + **config + ) + case 'objectstore': + d_cache = ObjectStoreCache( + locking_url=archive_cache_locking_url, + **config + ) + case _: + raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore" got {backend} ') + + cache_meta = d_cache + return cache_meta diff --git a/rhodecode/lib/archive_cache/backends/__init__.py b/rhodecode/lib/archive_cache/backends/__init__.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/archive_cache/backends/__init__.py @@ -0,0 +1,17 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ diff --git a/rhodecode/lib/archive_cache/backends/base.py b/rhodecode/lib/archive_cache/backends/base.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/archive_cache/backends/base.py @@ -0,0 +1,348 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import os +import functools +import logging +import typing +import time +import zlib + +from ...ext_json import json +from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size +from ..lock import GenerationLock + +log = logging.getLogger(__name__) + + +class BaseShard: + storage_type: str = '' + fs = None + + @classmethod + def hash(cls, key): + """Compute portable hash for `key`. 
+ + :param key: key to hash + :return: hash value + + """ + mask = 0xFFFFFFFF + return zlib.adler32(key.encode('utf-8')) & mask # noqa + + def _write_file(self, full_path, read_iterator, mode): + raise NotImplementedError + + def _get_keyfile(self, key): + raise NotImplementedError + + def random_filename(self): + raise NotImplementedError + + def _store(self, key, value_reader, metadata, mode): + (filename, # hash-name + full_path # full-path/hash-name + ) = self.random_filename() + + key_file, key_file_path = self._get_keyfile(key) + + # STORE METADATA + _metadata = { + "version": "v1", + + "key_file": key_file, # this is the .key.json file storing meta + "key_file_path": key_file_path, # full path to key_file + "archive_key": key, # original name we stored archive under, e.g my-archive.zip + "archive_filename": filename, # the actual filename we stored that file under + "archive_full_path": full_path, + + "store_time": time.time(), + "access_count": 0, + "access_time": 0, + + "size": 0 + } + if metadata: + _metadata.update(metadata) + + read_iterator = iter(functools.partial(value_reader.read, 2**22), b'') + size, sha256 = self._write_file(full_path, read_iterator, mode) + _metadata['size'] = size + _metadata['sha256'] = sha256 + + # after archive is finished, we create a key to save the presence of the binary file + with self.fs.open(key_file_path, 'wb') as f: + f.write(json.dumps(_metadata)) + + return key, filename, size, _metadata + + def _fetch(self, key, retry, retry_attempts, retry_backoff): + if retry is NOT_GIVEN: + retry = False + if retry_attempts is NOT_GIVEN: + retry_attempts = 0 + + if retry and retry_attempts > 0: + for attempt in range(1, retry_attempts + 1): + if key in self: + break + # we didn't find the key, wait retry_backoff N seconds, and re-check + time.sleep(retry_backoff) + + if key not in self: + log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}') + raise KeyError(key) + + key_file, 
key_file_path = self._get_keyfile(key) + with self.fs.open(key_file_path, 'rb') as f: + metadata = json.loads(f.read()) + + archive_path = metadata['archive_full_path'] + + try: + return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata + finally: + # update usage stats, count and accessed + metadata["access_count"] = metadata.get("access_count", 0) + 1 + metadata["access_time"] = time.time() + log.debug('Updated %s with access snapshot, access_count=%s access_time=%s', + key_file, metadata['access_count'], metadata['access_time']) + with self.fs.open(key_file_path, 'wb') as f: + f.write(json.dumps(metadata)) + + def _remove(self, key): + if key not in self: + log.exception(f'requested key={key} not found in {self}') + raise KeyError(key) + + key_file, key_file_path = self._get_keyfile(key) + with self.fs.open(key_file_path, 'rb') as f: + metadata = json.loads(f.read()) + + archive_path = metadata['archive_full_path'] + self.fs.rm(archive_path) + self.fs.rm(key_file_path) + return 1 + + @property + def storage_medium(self): + return getattr(self, self.storage_type) + + @property + def key_suffix(self): + return 'key.json' + + def __contains__(self, key): + """Return `True` if `key` matching item is found in cache. + + :param key: key matching item + :return: True if key matching item + + """ + key_file, key_file_path = self._get_keyfile(key) + return self.fs.exists(key_file_path) + + +class BaseCache: + _locking_url: str = '' + _storage_path: str = '' + _config = {} + retry = False + retry_attempts = 0 + retry_backoff = 1 + _shards = tuple() + + def __contains__(self, key): + """Return `True` if `key` matching item is found in cache. 
+ + :param key: key matching item + :return: True if key matching item + + """ + return self.has_key(key) + + def __repr__(self): + return f'<{self.__class__.__name__}(storage={self._storage_path})>' + + @classmethod + def gb_to_bytes(cls, gb): + return gb * (1024 ** 3) + + @property + def storage_path(self): + return self._storage_path + + @classmethod + def get_stats_db(cls): + return StatsDB() + + def get_conf(self, key, pop=False): + if key not in self._config: + raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config") + val = self._config[key] + if pop: + del self._config[key] + return val + + def _get_shard(self, key): + raise NotImplementedError + + def _get_size(self, shard, archive_path): + raise NotImplementedError + + def store(self, key, value_reader, metadata=None): + shard = self._get_shard(key) + return shard.store(key, value_reader, metadata) + + def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]: + """ + Return file handle corresponding to `key` from specific shard cache. + """ + if retry is NOT_GIVEN: + retry = self.retry + if retry_attempts is NOT_GIVEN: + retry_attempts = self.retry_attempts + retry_backoff = self.retry_backoff + + shard = self._get_shard(key) + return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff) + + def remove(self, key): + shard = self._get_shard(key) + return shard.remove(key) + + def has_key(self, archive_key): + """Return `True` if `key` matching item is found in cache. + + :param archive_key: key for item, this is a unique archive name we want to store data under. 
e.g my-archive-svn.zip + :return: True if key is found + + """ + shard = self._get_shard(archive_key) + return archive_key in shard + + def iter_keys(self): + for shard in self._shards: + if shard.fs.exists(shard.storage_medium): + for path, _dirs, _files in shard.fs.walk(shard.storage_medium): + for key_file_path in _files: + if key_file_path.endswith(shard.key_suffix): + yield shard, key_file_path + + def get_lock(self, lock_key): + return GenerationLock(lock_key, self._locking_url) + + def evict(self, policy=None, size_limit=None) -> int: + """ + Remove old items based on the conditions + + + explanation of this algo: + iterate over each shard, then for each shard iterate over the .key files + read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and + access data, time creation, and access counts. + + Store that into a memory DB so we can run different sorting strategies easily. + Summing the size is a sum sql query. + + Then we run a sorting strategy based on eviction policy. + We iterate over sorted keys, and remove each checking if we hit the overall limit. + """ + + policy = policy or self._eviction_policy + size_limit = size_limit or self._cache_size_limit + + select_policy = EVICTION_POLICY[policy]['evict'] + + log.debug('Running eviction policy \'%s\', and checking for size limit: %s', + policy, format_size(size_limit)) + + if select_policy is None: + return 0 + + db = self.get_stats_db() + + data = [] + cnt = 1 + + for shard, key_file in self.iter_keys(): + with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f: + metadata = json.loads(f.read()) + + key_file_path = os.path.join(shard.storage_medium, key_file) + + archive_key = metadata['archive_key'] + archive_path = metadata['archive_full_path'] + + size = metadata.get('size') + if not size: + # in case we don't have size re-calc it... 
+ size = self._get_size(shard, archive_path) + + data.append([ + cnt, + key_file, + key_file_path, + archive_key, + archive_path, + metadata.get('store_time', 0), + metadata.get('access_time', 0), + metadata.get('access_count', 0), + size, + ]) + cnt += 1 + + # Insert bulk data using executemany + db.bulk_insert(data) + + total_size = db.get_total_size() + log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s', + len(data), format_size(total_size), format_size(size_limit)) + + removed_items = 0 + removed_size = 0 + for key_file, archive_key, size in db.get_sorted_keys(select_policy): + # simulate removal impact BEFORE removal + total_size -= size + + if total_size <= size_limit: + # we obtained what we wanted... + break + + self.remove(archive_key) + removed_items += 1 + removed_size += size + + log.debug('Removed %s cache archives, and reduced size by: %s', + removed_items, format_size(removed_size)) + return removed_items + + def get_statistics(self): + total_files = 0 + total_size = 0 + meta = {} + + for shard, key_file in self.iter_keys(): + json_key = f"{shard.storage_medium}/{key_file}" + with shard.fs.open(json_key, 'rb') as f: + total_files += 1 + metadata = json.loads(f.read()) + total_size += metadata['size'] + + return total_files, total_size, meta + diff --git a/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py b/rhodecode/lib/archive_cache/backends/fanout_cache.py rename from rhodecode/lib/rc_cache/archive_cache/fanout_cache.py rename to rhodecode/lib/archive_cache/backends/fanout_cache.py --- a/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py +++ b/rhodecode/lib/archive_cache/backends/fanout_cache.py @@ -17,198 +17,81 @@ # and proprietary license terms, please see https://rhodecode.com/licenses/ import codecs -import contextlib -import functools -import os +import hashlib import logging -import time -import typing -import zlib -import sqlite3 +import os + +import fsspec -from ...ext_json import json -from .lock import 
GenerationLock -from .utils import format_size +from .base import BaseCache, BaseShard +from ..utils import ShardFileReader, NOT_GIVEN +from ...type_utils import str2bool log = logging.getLogger(__name__) -cache_meta = None -UNKNOWN = -241 -NO_VAL = -917 - -MODE_BINARY = 'BINARY' - - -EVICTION_POLICY = { - 'none': { - 'evict': None, - }, - 'least-recently-stored': { - 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time', - }, - 'least-recently-used': { - 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time', - }, - 'least-frequently-used': { - 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count', - }, -} - - -class DB: - - def __init__(self): - self.connection = sqlite3.connect(':memory:') - self._init_db() - - def _init_db(self): - qry = ''' - CREATE TABLE IF NOT EXISTS archive_cache ( - rowid INTEGER PRIMARY KEY, - key_file TEXT, - key_file_path TEXT, - filename TEXT, - full_path TEXT, - store_time REAL, - access_time REAL, - access_count INTEGER DEFAULT 0, - size INTEGER DEFAULT 0 - ) - ''' - - self.sql(qry) - self.connection.commit() - - @property - def sql(self): - return self.connection.execute - - def bulk_insert(self, rows): - qry = ''' - INSERT INTO archive_cache ( - rowid, - key_file, - key_file_path, - filename, - full_path, - store_time, - access_time, - access_count, - size - ) - VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ? 
- ) - ''' - cursor = self.connection.cursor() - cursor.executemany(qry, rows) - self.connection.commit() - - -class FileSystemCache: +class FileSystemShard(BaseShard): def __init__(self, index, directory, **settings): self._index = index self._directory = directory + self.storage_type = 'directory' + self.fs = fsspec.filesystem('file') @property def directory(self): """Cache directory.""" return self._directory - def _write_file(self, full_path, iterator, mode, encoding=None): - full_dir, _ = os.path.split(full_path) + def _get_keyfile(self, archive_key) -> tuple[str, str]: + key_file = f'{archive_key}.{self.key_suffix}' + return key_file, os.path.join(self.directory, key_file) + def _get_writer(self, path, mode): for count in range(1, 11): - with contextlib.suppress(OSError): - os.makedirs(full_dir) - try: # Another cache may have deleted the directory before # the file could be opened. - writer = open(full_path, mode, encoding=encoding) + return self.fs.open(path, mode) except OSError: if count == 10: # Give up after 10 tries to open the file. raise continue - with writer: - size = 0 - for chunk in iterator: - size += len(chunk) - writer.write(chunk) - writer.flush() - # Get the file descriptor - fd = writer.fileno() - - # Sync the file descriptor to disk, helps with NFS cases... 
- os.fsync(fd) - log.debug('written new archive cache under %s', full_path) - return size - - def _get_keyfile(self, key): - return os.path.join(self._directory, f'{key}.key') + def _write_file(self, full_path, iterator, mode): + # ensure dir exists + destination, _ = os.path.split(full_path) + if not self.fs.exists(destination): + self.fs.makedirs(destination) - def store(self, key, value_reader, metadata): - filename, full_path = self.random_filename() - key_file = self._get_keyfile(key) - - # STORE METADATA - _metadata = { - "version": "v1", - "filename": filename, - "full_path": full_path, - "key_file": key_file, - "store_time": time.time(), - "access_count": 1, - "access_time": 0, - "size": 0 - } - if metadata: - _metadata.update(metadata) - - reader = functools.partial(value_reader.read, 2**22) + writer = self._get_writer(full_path, mode) - iterator = iter(reader, b'') - size = self._write_file(full_path, iterator, 'xb') - metadata['size'] = size - - # after archive is finished, we create a key to save the presence of the binary file - with open(key_file, 'wb') as f: - f.write(json.dumps(_metadata)) - - return key, size, MODE_BINARY, filename, _metadata - - def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]: - - if retry: - for attempt in range(retry_attempts): - if key in self: - break - # we dind't find the key, wait 1s, and re-check - time.sleep(1) + digest = hashlib.sha256() + with writer: + size = 0 + for chunk in iterator: + size += len(chunk) + digest.update(chunk) + writer.write(chunk) + writer.flush() + # Get the file descriptor + fd = writer.fileno() - if key not in self: - log.exception('requested {key} not found in {self}', key, self) - raise KeyError(key) - - key_file = self._get_keyfile(key) - with open(key_file, 'rb') as f: - metadata = json.loads(f.read()) - - filename = metadata['filename'] + # Sync the file descriptor to disk, helps with NFS cases... 
+ os.fsync(fd) + sha256 = digest.hexdigest() + log.debug('written new archive cache under %s, sha256: %s', full_path, sha256) + return size, sha256 - try: - return open(os.path.join(self.directory, filename), 'rb'), metadata - finally: - # update usage stats, count and accessed - metadata["access_count"] = metadata.get("access_count", 0) + 1 - metadata["access_time"] = time.time() + def store(self, key, value_reader, metadata: dict | None = None): + return self._store(key, value_reader, metadata, mode='xb') - with open(key_file, 'wb') as f: - f.write(json.dumps(metadata)) + def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: + return self._fetch(key, retry, retry_attempts, retry_backoff) + + def remove(self, key): + return self._remove(key) def random_filename(self): """Return filename and full-path tuple for file storage. @@ -220,64 +103,52 @@ class FileSystemCache: """ hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') - sub_dir = os.path.join(hex_name[:2], hex_name[2:4]) - name = hex_name[4:] + '.archive_cache' - filename = os.path.join(sub_dir, name) - full_path = os.path.join(self.directory, filename) - return filename, full_path - - def hash(self, key): - """Compute portable hash for `key`. - - :param key: key to hash - :return: hash value - """ - mask = 0xFFFFFFFF - return zlib.adler32(key.encode('utf-8')) & mask # noqa - - def __contains__(self, key): - """Return `True` if `key` matching item is found in cache. 
+ archive_name = hex_name[4:] + '.archive_cache' + filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}" - :param key: key matching item - :return: True if key matching item - - """ - key_file = self._get_keyfile(key) - return os.path.exists(key_file) + full_path = os.path.join(self.directory, filename) + return archive_name, full_path def __repr__(self): - return f'FileSystemCache(index={self._index}, dir={self.directory})' + return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})' -class FanoutCache: - """Cache that shards keys and values.""" +class FileSystemFanoutCache(BaseCache): - def __init__( - self, directory=None, **settings - ): - """Initialize cache instance. + def __init__(self, locking_url, **settings): + """ + Initialize file system cache instance. - :param str directory: cache directory + :param str locking_url: redis url for a lock :param settings: settings dict """ - if directory is None: - raise ValueError('directory cannot be None') - - directory = str(directory) + self._locking_url = locking_url + self._config = settings + cache_dir = self.get_conf('archive_cache.filesystem.store_dir') + directory = str(cache_dir) directory = os.path.expanduser(directory) directory = os.path.expandvars(directory) self._directory = directory + self._storage_path = directory - self._count = settings.pop('cache_shards') - self._locking_url = settings.pop('locking_url') + # check if it's ok to write, and re-create the archive cache + if not os.path.isdir(self._directory): + os.makedirs(self._directory, exist_ok=True) + + self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True)) - self._eviction_policy = settings['cache_eviction_policy'] - self._cache_size_limit = settings['cache_size_limit'] + self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True) + self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb'))) + self.retry = 
str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True)) + self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True)) + self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True)) + + log.debug('Initializing archival cache instance under %s', self._directory) self._shards = tuple( - FileSystemCache( + FileSystemShard( index=num, directory=os.path.join(directory, 'shard_%03d' % num), **settings, @@ -286,171 +157,10 @@ class FanoutCache: ) self._hash = self._shards[0].hash - @property - def directory(self): - """Cache directory.""" - return self._directory - - def get_lock(self, lock_key): - return GenerationLock(lock_key, self._locking_url) - - def _get_shard(self, key) -> FileSystemCache: + def _get_shard(self, key) -> FileSystemShard: index = self._hash(key) % self._count shard = self._shards[index] return shard - def store(self, key, value_reader, metadata=None): - shard = self._get_shard(key) - return shard.store(key, value_reader, metadata) - - def fetch(self, key, retry=False, retry_attempts=10): - """Return file handle corresponding to `key` from cache. - """ - shard = self._get_shard(key) - return shard.fetch(key, retry=retry, retry_attempts=retry_attempts) - - def has_key(self, key): - """Return `True` if `key` matching item is found in cache. - - :param key: key for item - :return: True if key is found - - """ - shard = self._get_shard(key) - return key in shard - - def __contains__(self, item): - return self.has_key(item) - - def evict(self, policy=None, size_limit=None): - """ - Remove old items based on the conditions - - - explanation of this algo: - iterate over each shard, then for each shard iterate over the .key files - read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and - access data, time creation, and access counts. - - Store that into a memory DB so we can run different sorting strategies easily. 
- Summing the size is a sum sql query. - - Then we run a sorting strategy based on eviction policy. - We iterate over sorted keys, and remove each checking if we hit the overall limit. - """ - - policy = policy or self._eviction_policy - size_limit = size_limit or self._cache_size_limit - - select_policy = EVICTION_POLICY[policy]['evict'] - - log.debug('Running eviction policy \'%s\', and checking for size limit: %s', - policy, format_size(size_limit)) - - if select_policy is None: - return 0 - - db = DB() - - data = [] - cnt = 1 - for shard in self._shards: - for key_file in os.listdir(shard.directory): - if key_file.endswith('.key'): - key_file_path = os.path.join(shard.directory, key_file) - with open(key_file_path, 'rb') as f: - metadata = json.loads(f.read()) - - size = metadata.get('size') - filename = metadata.get('filename') - full_path = metadata.get('full_path') - - if not size: - # in case we don't have size re-calc it... - size = os.stat(full_path).st_size - - data.append([ - cnt, - key_file, - key_file_path, - filename, - full_path, - metadata.get('store_time', 0), - metadata.get('access_time', 0), - metadata.get('access_count', 0), - size, - ]) - cnt += 1 - - # Insert bulk data using executemany - db.bulk_insert(data) - - ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall() - log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size)) - select_policy_qry = select_policy.format(fields='key_file_path, full_path, size') - sorted_keys = db.sql(select_policy_qry).fetchall() - - removed_items = 0 - removed_size = 0 - for key, cached_file, size in sorted_keys: - # simulate removal impact BEFORE removal - total_size -= size - - if total_size <= size_limit: - # we obtained what we wanted... 
- break - - os.remove(cached_file) - os.remove(key) - removed_items += 1 - removed_size += size - - log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size)) - return removed_items - - -def get_archival_config(config): - - final_config = { - - } - - for k, v in config.items(): - if k.startswith('archive_cache'): - final_config[k] = v - - return final_config - - -def get_archival_cache_store(config): - - global cache_meta - if cache_meta is not None: - return cache_meta - - config = get_archival_config(config) - backend = config['archive_cache.backend.type'] - if backend != 'filesystem': - raise ValueError('archive_cache.backend.type only supports "filesystem"') - - archive_cache_locking_url = config['archive_cache.locking.url'] - archive_cache_dir = config['archive_cache.filesystem.store_dir'] - archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb'] - archive_cache_shards = config['archive_cache.filesystem.cache_shards'] - archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy'] - - log.debug('Initializing archival cache instance under %s', archive_cache_dir) - - # check if it's ok to write, and re-create the archive cache - if not os.path.isdir(archive_cache_dir): - os.makedirs(archive_cache_dir, exist_ok=True) - - d_cache = FanoutCache( - archive_cache_dir, - locking_url=archive_cache_locking_url, - cache_shards=archive_cache_shards, - cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024, - cache_eviction_policy=archive_cache_eviction_policy - ) - cache_meta = d_cache - return cache_meta + def _get_size(self, shard, archive_path): + return os.stat(archive_path).st_size diff --git a/rhodecode/lib/archive_cache/backends/objectstore_cache.py b/rhodecode/lib/archive_cache/backends/objectstore_cache.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/archive_cache/backends/objectstore_cache.py @@ -0,0 +1,150 @@ +# Copyright (C) 2015-2024 RhodeCode GmbH +# +# This 
program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import codecs +import hashlib +import logging +import os + +import fsspec + +from .base import BaseCache, BaseShard +from ..utils import ShardFileReader, NOT_GIVEN +from ...type_utils import str2bool + +log = logging.getLogger(__name__) + + +class S3Shard(BaseShard): + + def __init__(self, index, bucket, **settings): + self._index = index + self._bucket = bucket + self.storage_type = 'bucket' + + endpoint_url = settings.pop('archive_cache.objectstore.url') + key = settings.pop('archive_cache.objectstore.key') + secret = settings.pop('archive_cache.objectstore.secret') + + self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret) + + @property + def bucket(self): + """Cache bucket.""" + return self._bucket + + def _get_keyfile(self, archive_key) -> tuple[str, str]: + key_file = f'{archive_key}-{self.key_suffix}' + return key_file, os.path.join(self.bucket, key_file) + + def _get_writer(self, path, mode): + return self.fs.open(path, 'wb') + + def _write_file(self, full_path, iterator, mode): + # ensure bucket exists + destination = self.bucket + if not self.fs.exists(destination): + self.fs.mkdir(destination, s3_additional_kwargs={}) + + writer = 
self._get_writer(full_path, mode) + + digest = hashlib.sha256() + with writer: + size = 0 + for chunk in iterator: + size += len(chunk) + digest.update(chunk) + writer.write(chunk) + + sha256 = digest.hexdigest() + log.debug('written new archive cache under %s, sha256: %s', full_path, sha256) + return size, sha256 + + def store(self, key, value_reader, metadata: dict | None = None): + return self._store(key, value_reader, metadata, mode='wb') + + def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: + return self._fetch(key, retry, retry_attempts, retry_backoff) + + def remove(self, key): + return self._remove(key) + + def random_filename(self): + """Return filename and full-path tuple for file storage. + + Filename will be a randomly generated 28 character hexadecimal string + with ".archive_cache" suffixed. Two levels of sub-directories will be used to + reduce the size of directories. On older filesystems, lookups in + directories with many files may be slow. + """ + + hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') + + archive_name = hex_name[4:] + '.archive_cache' + filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}" + + full_path = os.path.join(self.bucket, filename) + return archive_name, full_path + + def __repr__(self): + return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})' + + +class ObjectStoreCache(BaseCache): + + def __init__(self, locking_url, **settings): + """ + Initialize objectstore cache instance. 
+ + :param str locking_url: redis url for a lock + :param settings: settings dict + + """ + self._locking_url = locking_url + self._config = settings + + objectstore_url = self.get_conf('archive_cache.objectstore.url') + self._storage_path = objectstore_url + + self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True)) + + self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True) + self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb'))) + + self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True)) + self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True)) + self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True)) + + log.debug('Initializing archival cache instance under %s', objectstore_url) + self._shards = tuple( + S3Shard( + index=num, + bucket='rhodecode-archivecache-%03d' % num, + **settings, + ) + for num in range(self._count) + ) + self._hash = self._shards[0].hash + + def _get_shard(self, key) -> S3Shard: + index = self._hash(key) % self._count + shard = self._shards[index] + return shard + + def _get_size(self, shard, archive_path): + return shard.fs.info(archive_path)['size'] diff --git a/rhodecode/lib/rc_cache/archive_cache/lock.py b/rhodecode/lib/archive_cache/lock.py rename from rhodecode/lib/rc_cache/archive_cache/lock.py rename to rhodecode/lib/archive_cache/lock.py --- a/rhodecode/lib/rc_cache/archive_cache/lock.py +++ b/rhodecode/lib/archive_cache/lock.py @@ -17,8 +17,11 @@ # and proprietary license terms, please see https://rhodecode.com/licenses/ import redis -from ..._vendor import redis_lock -from .utils import ArchiveCacheGenerationLock +from .._vendor import redis_lock + + +class ArchiveCacheGenerationLock(Exception): + pass class GenerationLock: diff --git a/rhodecode/lib/rc_cache/archive_cache/utils.py 
b/rhodecode/lib/archive_cache/utils.py rename from rhodecode/lib/rc_cache/archive_cache/utils.py rename to rhodecode/lib/archive_cache/utils.py --- a/rhodecode/lib/rc_cache/archive_cache/utils.py +++ b/rhodecode/lib/archive_cache/utils.py @@ -16,11 +16,26 @@ # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ -import os +import sqlite3 +import s3fs.core + +NOT_GIVEN = -917 -class ArchiveCacheGenerationLock(Exception): - pass +EVICTION_POLICY = { + 'none': { + 'evict': None, + }, + 'least-recently-stored': { + 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time', + }, + 'least-recently-used': { + 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time', + }, + 'least-frequently-used': { + 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count', + }, +} def archive_iterator(_reader, block_size: int = 4096 * 512): @@ -32,41 +47,88 @@ def archive_iterator(_reader, block_size yield data -def get_directory_statistics(start_path): - """ - total_files, total_size, directory_stats = get_directory_statistics(start_path) - - print(f"Directory statistics for: {start_path}\n") - print(f"Total files: {total_files}") - print(f"Total size: {format_size(total_size)}\n") - - :param start_path: - :return: - """ - - total_files = 0 - total_size = 0 - directory_stats = {} - - for dir_path, dir_names, file_names in os.walk(start_path): - dir_size = 0 - file_count = len(file_names) - - for file in file_names: - filepath = os.path.join(dir_path, file) - file_size = os.path.getsize(filepath) - dir_size += file_size - - directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size} - total_files += file_count - total_size += dir_size - - return total_files, total_size, directory_stats - - def format_size(size): # Convert size in bytes to a human-readable format (e.g., KB, MB, GB) for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size < 1024: return 
f"{size:.2f} {unit}" size /= 1024 + + +class StatsDB: + + def __init__(self): + self.connection = sqlite3.connect(':memory:') + self._init_db() + + def _init_db(self): + qry = ''' + CREATE TABLE IF NOT EXISTS archive_cache ( + rowid INTEGER PRIMARY KEY, + key_file TEXT, + key_file_path TEXT, + archive_key TEXT, + archive_path TEXT, + store_time REAL, + access_time REAL, + access_count INTEGER DEFAULT 0, + size INTEGER DEFAULT 0 + ) + ''' + + self.sql(qry) + self.connection.commit() + + @property + def sql(self): + return self.connection.execute + + def bulk_insert(self, rows): + qry = ''' + INSERT INTO archive_cache ( + rowid, + key_file, + key_file_path, + archive_key, + archive_path, + store_time, + access_time, + access_count, + size + ) + VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ''' + cursor = self.connection.cursor() + cursor.executemany(qry, rows) + self.connection.commit() + + def get_total_size(self): + qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache' + ((total_size,),) = self.sql(qry).fetchall() + return total_size + + def get_sorted_keys(self, select_policy): + select_policy_qry = select_policy.format(fields='key_file, archive_key, size') + return self.sql(select_policy_qry).fetchall() + + +class ShardFileReader: + + def __init__(self, file_like_reader): + self._file_like_reader = file_like_reader + + def __getattr__(self, item): + if isinstance(self._file_like_reader, s3fs.core.S3File): + match item: + case 'name': + # S3 FileWrapper doesn't support name attribute, and we use it + return self._file_like_reader.full_name + case _: + return getattr(self._file_like_reader, item) + else: + return getattr(self._file_like_reader, item) + + def __repr__(self): + return f'<{self.__class__.__name__}={self._file_like_reader}>' diff --git a/rhodecode/lib/base.py b/rhodecode/lib/base.py --- a/rhodecode/lib/base.py +++ b/rhodecode/lib/base.py @@ -578,7 +578,7 @@ def bootstrap_config(request, registry_n config.include('pyramid_mako') 
config.include('rhodecode.lib.rc_beaker') config.include('rhodecode.lib.rc_cache') - config.include('rhodecode.lib.rc_cache.archive_cache') + config.include('rhodecode.lib.archive_cache') add_events_routes(config) return config diff --git a/rhodecode/lib/helpers.py b/rhodecode/lib/helpers.py --- a/rhodecode/lib/helpers.py +++ b/rhodecode/lib/helpers.py @@ -2196,3 +2196,35 @@ class IssuesRegistry(object): @property def issues_unique_count(self): return len(set(i['id'] for i in self.issues)) + + +def get_directory_statistics(start_path): + """ + total_files, total_size, directory_stats = get_directory_statistics(start_path) + + print(f"Directory statistics for: {start_path}\n") + print(f"Total files: {total_files}") + print(f"Total size: {format_size(total_size)}\n") + + :param start_path: + :return: + """ + + total_files = 0 + total_size = 0 + directory_stats = {} + + for dir_path, dir_names, file_names in os.walk(start_path): + dir_size = 0 + file_count = len(file_names) + + for fname in file_names: + filepath = os.path.join(dir_path, fname) + file_size = os.path.getsize(filepath) + dir_size += file_size + + directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size} + total_files += file_count + total_size += dir_size + + return total_files, total_size, directory_stats diff --git a/rhodecode/lib/system_info.py b/rhodecode/lib/system_info.py --- a/rhodecode/lib/system_info.py +++ b/rhodecode/lib/system_info.py @@ -399,29 +399,23 @@ def storage_inodes(): def storage_archives(): import rhodecode from rhodecode.lib.helpers import format_byte_size_binary - from rhodecode.lib.rc_cache.archive_cache.utils import get_directory_statistics + from rhodecode.lib.archive_cache import get_archival_cache_store storage_type = rhodecode.ConfigGet().get_str('archive_cache.backend.type') - storage_key = 'archive_cache.filesystem.store_dir' - default_msg = 'Archive cache storage is controlled by '\ - f'{storage_key}=/path/to/cache option in the .ini file' - path = 
rhodecode.ConfigGet().get_str(storage_key, missing=default_msg) - - value = dict(percent=0, used=0, total=0, items=0, path=path, text='', type=storage_type) + value = dict(percent=0, used=0, total=0, items=0, path='', text='', type=storage_type) state = STATE_OK_DEFAULT try: - if storage_type != 'filesystem': - # raise Exc to stop reporting on different type - raise ValueError('Storage type must be "filesystem"') + d_cache = get_archival_cache_store(config=rhodecode.CONFIG) - total_files, total_size, _directory_stats = get_directory_statistics(path) + total_files, total_size, _directory_stats = d_cache.get_statistics() value.update({ 'percent': 100, 'used': total_size, 'total': total_size, - 'items': total_files + 'items': total_files, + 'path': d_cache.storage_path }) except Exception as e: @@ -441,8 +435,7 @@ def storage_archives(): def storage_gist(): from rhodecode.model.gist import GIST_STORE_LOC from rhodecode.lib.utils import safe_str, get_rhodecode_repo_store_path - from rhodecode.lib.helpers import format_byte_size_binary - from rhodecode.lib.rc_cache.archive_cache.utils import get_directory_statistics + from rhodecode.lib.helpers import format_byte_size_binary, get_directory_statistics path = safe_str(os.path.join( get_rhodecode_repo_store_path(), GIST_STORE_LOC)) diff --git a/rhodecode/tests/fixture_mods/fixture_pyramid.py b/rhodecode/tests/fixture_mods/fixture_pyramid.py --- a/rhodecode/tests/fixture_mods/fixture_pyramid.py +++ b/rhodecode/tests/fixture_mods/fixture_pyramid.py @@ -100,7 +100,7 @@ def ini_config(request, tmpdir_factory, overrides = [ {'server:main': {'port': rcserver_port}}, {'app:main': { - 'cache_dir': '%(here)s/rc_data', + 'cache_dir': '%(here)s/rc-tests/rc_data', 'vcs.server': f'localhost:{vcsserver_port}', # johbo: We will always start the VCSServer on our own based on the # fixtures of the test cases. 
For the test run it must always be diff --git a/rhodecode/tests/lib/test_archive_caches.py b/rhodecode/tests/lib/test_archive_caches.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/lib/test_archive_caches.py @@ -0,0 +1,105 @@ +# Copyright (C) 2016-2023 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import time +import pytest +import rhodecode +import os +import shutil +from tempfile import mkdtemp + +from rhodecode.lib import archive_cache + + +def file_reader(temp_store): + with open(temp_store, 'w') as f: + for cnt in range(10000): + f.write(str(cnt)) + return open(temp_store, 'rb') + + +@pytest.fixture() +def d_cache_instance(ini_settings): + config = ini_settings + d_cache = archive_cache.get_archival_cache_store(config=config, always_init=True) + return d_cache + + +@pytest.mark.usefixtures('app') +class TestArchiveCaches(object): + + def test_archivecache_empty_stats(self, d_cache_instance): + d_cache = d_cache_instance + shutil.rmtree(d_cache._directory) + + stats = d_cache.get_statistics() + assert (0, 0, {}) == stats + + def test_archivecache_store_keys(self, d_cache_instance, tmp_path): + d_cache = d_cache_instance + shutil.rmtree(d_cache._directory) + + for n in range(100): + + 
archive_name = f'my-archive-abc-{n}.zip' + temp_archive_path = os.path.join(tmp_path, archive_name) + d_cache.store(archive_name, file_reader(temp_archive_path ), {'foo': 'bar'}) + reader, meta = d_cache.fetch(archive_name) + content = reader.read() + assert content == open(temp_archive_path, 'rb').read() + + stats = d_cache.get_statistics() + assert (100, 3889000, {}) == stats + + def test_archivecache_remove_keys(self, d_cache_instance, tmp_path): + d_cache = d_cache_instance + shutil.rmtree(d_cache._directory) + + n = 1 + archive_name = f'my-archive-abc-{n}.zip' + temp_archive_path = os.path.join(tmp_path, archive_name) + + d_cache.store(archive_name, file_reader(temp_archive_path ), {'foo': 'bar'}) + stats = d_cache.get_statistics() + assert (1, 38890, {}) == stats + + assert 1 == d_cache.remove(archive_name) + + stats = d_cache.get_statistics() + assert (0, 0, {}) == stats + + def test_archivecache_evict_keys(self, d_cache_instance, tmp_path): + d_cache = d_cache_instance + shutil.rmtree(d_cache._directory) + tries = 500 + for n in range(tries): + + archive_name = f'my-archive-abc-{n}.zip' + temp_archive_path = os.path.join(tmp_path, archive_name) + d_cache.store(archive_name, file_reader(temp_archive_path ), {'foo': 'bar'}) + + stats = d_cache.get_statistics() + assert (tries, 19445000, {}) == stats + evict_to = 0.005 # around (5mb) + evicted_items = d_cache.evict(size_limit=d_cache.gb_to_bytes(evict_to)) + evicted = 361 + assert evicted == evicted_items + + stats = d_cache.get_statistics() + assert (tries - evicted, 5405710, {}) == stats + diff --git a/rhodecode/tests/rhodecode.ini b/rhodecode/tests/rhodecode.ini --- a/rhodecode/tests/rhodecode.ini +++ b/rhodecode/tests/rhodecode.ini @@ -258,20 +258,62 @@ file_store.backend = local ; path to store the uploaded binaries and artifacts file_store.storage_path = /var/opt/rhodecode_data/file_store -; Uncomment and set this path to control settings for archive download cache. 
+ +; Redis url to acquire/check generation of archives locks +archive_cache.locking.url = redis://redis:6379/1 + +; Storage backend, only 'filesystem' and 'objectstore' are available now +archive_cache.backend.type = filesystem + +; url for s3 compatible storage that allows to upload artifacts +; e.g. http://minio:9000 +archive_cache.objectstore.url = http://s3-minio:9000 + +; key for s3 auth +archive_cache.objectstore.key = key + +; secret for s3 auth +archive_cache.objectstore.secret = secret + +; number of sharded buckets to create to distribute archives across +; default is 8 shards +archive_cache.objectstore.bucket_shards = 8 + +; if true, this cache will try to retry with retry_attempts=N times waiting retry_backoff time +archive_cache.objectstore.retry = false + +; number of seconds to wait for next try using retry +archive_cache.objectstore.retry_backoff = 1 + +; how many tries to do a retry fetch from this backend +archive_cache.objectstore.retry_attempts = 10 + +; Default is $cache_dir/archive_cache if not set ; Generated repo archives will be cached at this location ; and served from the cache during subsequent requests for the same archive of ; the repository. 
This path is important to be shared across filesystems and with ; RhodeCode and vcsserver - -; Default is $cache_dir/archive_cache if not set -archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache +archive_cache.filesystem.store_dir = %(here)s/rc-tests/archive_cache ; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb -archive_cache.cache_size_gb = 10 +archive_cache.filesystem.cache_size_gb = 2 + +; Eviction policy used to clear out after cache_size_gb limit is reached +archive_cache.filesystem.eviction_policy = least-recently-stored ; By default cache uses sharding technique, this specifies how many shards are there -archive_cache.cache_shards = 10 +; default is 8 shards +archive_cache.filesystem.cache_shards = 8 + +; if true, this cache will try to retry with retry_attempts=N times waiting retry_backoff time +archive_cache.filesystem.retry = false + +; number of seconds to wait for next try using retry +archive_cache.filesystem.retry_backoff = 1 + +; how many tries to do a retry fetch from this backend +archive_cache.filesystem.retry_attempts = 10 + ; ############# ; CELERY CONFIG @@ -335,7 +377,7 @@ rc_cache.cache_repo_longterm.max_size = rc_cache.cache_general.backend = dogpile.cache.rc.file_namespace rc_cache.cache_general.expiration_time = 43200 ; file cache store path. 
Defaults to `cache_dir =` value or tempdir if both values are not set -rc_cache.cache_perms.arguments.filename = %(here)s/cache-backend/cache_perms_db +rc_cache.cache_perms.arguments.filename = %(here)s/rc-tests/cache-backend/cache_perms_db ; alternative `cache_perms` redis backend with distributed lock #rc_cache.cache_perms.backend = dogpile.cache.rc.redis @@ -389,7 +431,7 @@ rc_cache.cache_perms.arguments.filename rc_cache.cache_repo.backend = dogpile.cache.rc.file_namespace rc_cache.cache_repo.expiration_time = 2592000 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set -rc_cache.cache_repo.arguments.filename = %(here)s/cache-backend/cache_repo_db +rc_cache.cache_repo.arguments.filename = %(here)s/rc-tests/cache-backend/cache_repo_db ; alternative `cache_repo` redis backend with distributed lock #rc_cache.cache_repo.backend = dogpile.cache.rc.redis @@ -432,7 +474,7 @@ beaker.session.data_dir = %(here)s/rc-te beaker.session.key = rhodecode beaker.session.secret = test-rc-uytcxaz -beaker.session.lock_dir = %(here)s/data/sessions/lock +beaker.session.lock_dir = %(here)s/rc-tests/data/sessions/lock ; Secure encrypted cookie. 
Requires AES and AES python libraries ; you must disable beaker.session.secret to use this @@ -464,7 +506,7 @@ beaker.session.secure = false ; WHOOSH Backend, doesn't require additional services to run ; it works good with few dozen repos search.module = rhodecode.lib.index.whoosh -search.location = %(here)s/data/index +search.location = %(here)s/rc-tests/data/index ; #################### ; CHANNELSTREAM CONFIG @@ -484,7 +526,7 @@ channelstream.server = channelstream:980 ; see Nginx/Apache configuration examples in our docs channelstream.ws_url = ws://rhodecode.yourserver.com/_channelstream channelstream.secret = ENV_GENERATED -channelstream.history.location = %(here)s/channelstream_history +channelstream.history.location = %(here)s/rc-tests/channelstream_history ; Internal application path that Javascript uses to connect into. ; If you use proxy-prefix the prefix should be added before /_channelstream @@ -501,7 +543,7 @@ channelstream.proxy_path = /_channelstre ; pymysql is an alternative driver for MySQL, use in case of problems with default one #sqlalchemy.db1.url = mysql+pymysql://root:qweqwe@localhost/rhodecode -sqlalchemy.db1.url = sqlite:///%(here)s/rhodecode_test.db?timeout=30 +sqlalchemy.db1.url = sqlite:///%(here)s/rc-tests/rhodecode_test.db?timeout=30 ; see sqlalchemy docs for other advanced settings ; print the sql statements to output @@ -590,7 +632,7 @@ svn.proxy.generate_config = false svn.proxy.list_parent_path = true ; Set location and file name of generated config file. -svn.proxy.config_file_path = %(here)s/mod_dav_svn.conf +svn.proxy.config_file_path = %(here)s/rc-tests/mod_dav_svn.conf ; alternative mod_dav config template. This needs to be a valid mako template ; Example template can be found in the source code: @@ -626,7 +668,7 @@ ssh.generate_authorized_keyfile = true ; Path to the authorized_keys file where the generate entries are placed. ; It is possible to have multiple key files specified in `sshd_config` e.g. 
; AuthorizedKeysFile %h/.ssh/authorized_keys %h/.ssh/authorized_keys_rhodecode -ssh.authorized_keys_file_path = %(here)s/rc/authorized_keys_rhodecode +ssh.authorized_keys_file_path = %(here)s/rc-tests/authorized_keys_rhodecode ; Command to execute the SSH wrapper. The binary is available in the ; RhodeCode installation directory. diff --git a/rhodecode/tests/vcs/test_archives.py b/rhodecode/tests/vcs/test_archives.py --- a/rhodecode/tests/vcs/test_archives.py +++ b/rhodecode/tests/vcs/test_archives.py @@ -28,7 +28,7 @@ import mock import pytest import rhodecode -from rhodecode.lib.rc_cache.archive_cache import get_archival_config +from rhodecode.lib.archive_cache import get_archival_config from rhodecode.lib.str_utils import ascii_bytes from rhodecode.lib.vcs.backends import base from rhodecode.lib.vcs.exceptions import ImproperArchiveTypeError, VCSError