# Copyright (C) 2016-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import hashlib
import functools
import time
import logging

from .. import config_keys
from ..exceptions import FileOverSizeException
from ..backends.base import BaseFileStoreBackend, fsspec, BaseShard, ShardFileReader

from ....lib.ext_json import json

log = logging.getLogger(__name__)


class S3Shard(BaseShard):
    METADATA_VER = 'v2'
    BACKEND_TYPE = config_keys.backend_objectstore
    storage_type: str = 'bucket'

    def __init__(self, index, bucket, bucket_folder, fs, **settings):
        self._index: int = index
        self._bucket_main: str = bucket
        self._bucket_folder: str = bucket_folder

        self.fs = fs

    @property
    def bucket(self) -> str:
        """Cache bucket final path."""
        return os.path.join(self._bucket_main, self._bucket_folder)

    def _write_file(self, full_path, iterator, max_filesize, mode='wb'):
        # ensure dir exists
        destination, _ = os.path.split(full_path)
        if not self.fs.exists(destination):
            self.fs.makedirs(destination)

        writer = self.fs.open(full_path, mode)

        digest = hashlib.sha256()
        oversize_cleanup = False
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

                if max_filesize and size > max_filesize:
                    oversize_cleanup = True
                    # free up the copied file, and raise exc
                    break

        if oversize_cleanup:
            self.fs.rm(full_path)
            raise FileOverSizeException(f'given file is over size limit ({max_filesize}): {full_path}')

        sha256 = digest.hexdigest()
        log.debug('written new artifact under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def _store(self, key: str, uid_key, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        filename = key
        uid_filename = uid_key
        full_path = self.store_path(uid_filename)

        # STORE METADATA
        _metadata = {
            "version": self.METADATA_VER,
            "store_type": self.BACKEND_TYPE,

            "filename": filename,
            "filename_uid_path": full_path,
            "filename_uid": uid_filename,
            "sha256": "",  # NOTE: filled in by reader iteration

            "store_time": time.time(),

            "size": 0
        }

        if metadata:
            if kwargs.pop('import_mode', False):
                # in import mode, we don't need to compute metadata, we just take the old version
                _metadata["import_mode"] = True
            else:
                _metadata.update(metadata)

        # stream the uploaded content in 4MB (2**22 byte) chunks
        read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
        size, sha256 = self._write_file(full_path, read_iterator, max_filesize)
        _metadata['size'] = size
        _metadata['sha256'] = sha256

        # after storing the artifacts, we write the metadata present
        metadata_file, metadata_file_path = self.get_metadata_filename(uid_key)
        with self.fs.open(metadata_file_path, 'wb') as f:
            f.write(json.dumps(_metadata))

        return uid_filename, _metadata

    def store_path(self, uid_filename):
        """
        Returns absolute file path of the uid_filename
        """
        return os.path.join(self._bucket_main, self._bucket_folder, uid_filename)

    def _fetch(self, key, presigned_url_expires: int = 0):
        if key not in self:
            log.exception(f'requested key={key} not found in {self}')
            raise KeyError(key)

        metadata_file, metadata_file_path = self.get_metadata_filename(key)
        with self.fs.open(metadata_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        file_path = metadata['filename_uid_path']
        if presigned_url_expires and presigned_url_expires > 0:
            metadata['url'] = self.fs.url(file_path, expires=presigned_url_expires)

        return ShardFileReader(self.fs.open(file_path, 'rb')), metadata

    def delete(self, key):
        return self._delete(key)


class ObjectStoreBackend(BaseFileStoreBackend):
    shard_name: str = 'shard-{:03d}'
    _shard_cls = S3Shard

    def __init__(self, settings):
        super().__init__(settings)

        self._shard_count = int(self.get_conf(config_keys.objectstore_bucket_shards, pop=True))
        if self._shard_count < 1:
            raise ValueError(f'{config_keys.objectstore_bucket_shards} must be 1 or more')

        self._bucket = settings.pop(config_keys.objectstore_bucket)
        if not self._bucket:
            raise ValueError(f'{config_keys.objectstore_bucket} needs to have a value')

        objectstore_url = self.get_conf(config_keys.objectstore_url)
        key = settings.pop(config_keys.objectstore_key)
        secret = settings.pop(config_keys.objectstore_secret)

        self._storage_path = objectstore_url  # common path for all from BaseCache
        log.debug('Initializing %s file_store instance', self)
        fs = fsspec.filesystem('s3', anon=False, endpoint_url=objectstore_url, key=key, secret=secret)

        # init main bucket
        if not fs.exists(self._bucket):
            fs.mkdir(self._bucket)

        self._shards = tuple(
            self._shard_cls(
                index=num,
                bucket=self._bucket,
                bucket_folder=self.shard_name.format(num),
                fs=fs,
                **settings,
            )
            for num in range(self._shard_count)
        )
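# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the backend): how this module might be
# wired up against an S3-compatible endpoint such as a local MinIO instance.
# The endpoint URL, bucket name and credentials below are hypothetical
# placeholders, and the exact shape of the settings dict is ultimately defined
# by BaseFileStoreBackend and config_keys, so treat this as a sketch rather
# than a reference configuration.
if __name__ == '__main__':
    demo_settings = {
        # hypothetical endpoint, bucket and credentials -- replace with real values
        config_keys.objectstore_url: 'http://127.0.0.1:9000',
        config_keys.objectstore_bucket: 'rhodecode-file-store',
        config_keys.objectstore_bucket_shards: 8,
        config_keys.objectstore_key: 'demo-access-key',
        config_keys.objectstore_secret: 'demo-secret-key',
    }

    # Instantiating the backend connects to the object store, creates the main
    # bucket if it does not exist yet, and builds one S3Shard per shard folder
    # (shard-000 ... shard-007 with the sample shard count above).
    backend = ObjectStoreBackend(demo_settings)
    print(f'initialized {backend} with {len(backend._shards)} shards')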