# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import hashlib
import logging
import os
import posixpath
import typing

import fsspec

from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool

log = logging.getLogger(__name__)


class S3Shard(BaseShard):
    """A single shard of the archive cache, stored as a folder inside a bucket.

    Object keys are always joined with ``/`` (``posixpath``), independent of
    the host operating system, because they are object-store paths and not
    local filesystem paths.
    """

    def __init__(self, index, bucket, bucket_folder, fs, **settings):
        """
        :param int index: position of this shard within the cache
        :param str bucket: name of the main bucket
        :param str bucket_folder: folder inside the bucket holding this shard
        :param fs: fsspec filesystem object used for all storage operations
        :param settings: extra settings; accepted but not used by this class
        """
        self._index: int = index
        self._bucket_folder: str = bucket_folder
        self.storage_type: str = 'bucket'
        self._bucket_main: str = bucket

        self.fs = fs

    @property
    def bucket(self) -> str:
        """Cache bucket final path."""
        return posixpath.join(self._bucket_main, self._bucket_folder)

    def _get_keyfile(self, archive_key) -> tuple[str, str]:
        """Return ``(key_file_name, full_path)`` of the key file for `archive_key`.

        NOTE(review): ``self.key_suffix`` is not defined in this class;
        presumably it is provided by ``BaseShard``.
        """
        key_file: str = f'{archive_key}-{self.key_suffix}'
        return key_file, posixpath.join(self.bucket, key_file)

    def _get_writer(self, path, mode):
        """Open `path` on the object store for writing.

        `mode` is accepted for interface compatibility but ignored: the
        object is always opened in ``'wb'`` mode.
        """
        return self.fs.open(path, 'wb')

    def _write_file(self, full_path, iterator, mode):
        """Stream chunks from `iterator` into `full_path`.

        :param full_path: destination path of the object
        :param iterator: iterable yielding ``bytes`` chunks
        :param mode: forwarded to :meth:`_get_writer`
        :return: tuple of ``(size_in_bytes, sha256_hexdigest)`` of written data
        """
        # ensure folder in bucket exists
        destination = self.bucket
        if not self.fs.exists(destination):
            self.fs.mkdir(destination, s3_additional_kwargs={})

        writer = self._get_writer(full_path, mode)

        digest = hashlib.sha256()
        size = 0
        with writer:
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

        sha256 = digest.hexdigest()
        log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def store(self, key, value_reader, metadata: dict | None = None):
        """Store `value_reader` content under `key`; delegates to ``self._store``."""
        return self._store(key, value_reader, metadata, mode='wb')

    def fetch(self, key, retry=NOT_GIVEN,
              retry_attempts=NOT_GIVEN, retry_backoff=1,
              presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
        """Fetch the entry stored under `key`; delegates to ``self._fetch``."""
        return self._fetch(key, retry, retry_attempts, retry_backoff,
                           presigned_url_expires=presigned_url_expires)

    def remove(self, key):
        """Remove the entry stored under `key`; delegates to ``self._remove``."""
        return self._remove(key)

    def random_filename(self):
        """Return archive name and full-path tuple for object storage.

        32 random hexadecimal characters are generated. The returned archive
        name is the last 28 of them with ".archive_cache" suffixed. The
        object stored in the shard bucket is named
        ``<2 hex chars>-<2 hex chars>-<archive name>``, i.e. the first four
        characters are used as two dash-separated prefixes of a flat key; no
        sub-directories are created.

        :return: tuple of ``(archive_name, full_path)``; note that
            `archive_name` does not include the two prefixes that are part
            of `full_path`.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

        archive_name = hex_name[4:] + '.archive_cache'
        filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

        full_path = posixpath.join(self.bucket, filename)
        return archive_name, full_path

    def __repr__(self):
        return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'


class ObjectStoreCache(BaseCache):
    """Archive cache backed by an S3-compatible object store, split into shards."""

    # template of the per-shard folder name inside the main bucket
    shard_name: str = 'shard-{:03d}'
    shard_cls = S3Shard

    def __init__(self, locking_url, **settings):
        """
        Initialize objectstore cache instance.

        :param str locking_url: redis url for a lock
        :param settings: settings dict

        :raises ValueError: if the configured shard count is below 1 or the
            bucket name is empty
        :raises KeyError: if one of the bucket/url/key/secret/region settings
            is missing from `settings`
        """
        self._locking_url = locking_url
        # NOTE: this is the very same dict object as `settings`, so every
        # `settings.pop(...)` below also removes the key from `self._config`
        self._config = settings

        objectstore_url = self.get_conf('archive_cache.objectstore.url')
        self._storage_path = objectstore_url  # common path for all from BaseCache

        self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
        if self._shard_count < 1:
            raise ValueError('archive_cache.objectstore.bucket_shards must be 1 or more')

        self._bucket = settings.pop('archive_cache.objectstore.bucket')
        if not self._bucket:
            raise ValueError('archive_cache.objectstore.bucket needs to have a value')

        self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
        self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

        self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
        self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
        self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

        endpoint_url = settings.pop('archive_cache.objectstore.url')
        key = settings.pop('archive_cache.objectstore.key')
        secret = settings.pop('archive_cache.objectstore.secret')
        region = settings.pop('archive_cache.objectstore.region')

        log.debug('Initializing %s archival cache instance', self)

        fs = fsspec.filesystem(
            's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region}
        )

        # init main bucket
        if not fs.exists(self._bucket):
            fs.mkdir(self._bucket)

        # all shards share one filesystem object; whatever is left in
        # `settings` at this point is forwarded to each shard
        self._shards = tuple(
            self.shard_cls(
                index=num,
                bucket=self._bucket,
                bucket_folder=self.shard_name.format(num),
                fs=fs,
                **settings,
            )
            for num in range(self._shard_count)
        )
        self._hash = self._shards[0].hash

    def _get_size(self, shard, archive_path):
        """Return the ``'size'`` reported by the shard's filesystem for `archive_path`."""
        return shard.fs.info(archive_path)['size']

    def set_presigned_url_expiry(self, val: int) -> None:
        """Set ``self.presigned_url_expires`` to `val`."""
        self.presigned_url_expires = val