# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import io
import uuid
import pathlib

import s3fs

from rhodecode.lib.hash_utils import sha256_safe
from rhodecode.apps.file_store import config_keys


# module-level cache for the initialized backend instance
file_store_meta = None


def get_filestore_config(config) -> dict:

    final_config = {}

    for k, v in config.items():
        if k.startswith('file_store'):
            final_config[k] = v

    return final_config


def get_filestore_backend(config, always_init=False):
    """
    usage::
        from rhodecode.apps.file_store import get_filestore_backend
        f_store = get_filestore_backend(config=CONFIG)

    :param config: settings dict; only the ``file_store.*`` keys are used
    :param always_init: if True, re-create the backend even if one is cached
    :return: initialized file store backend
    """

    global file_store_meta
    if file_store_meta is not None and not always_init:
        return file_store_meta

    config = get_filestore_config(config)
    backend = config[config_keys.backend_type]

    match backend:
        case config_keys.backend_legacy_filesystem:
            # Legacy backward compatible storage
            from rhodecode.apps.file_store.backends.filesystem_legacy import LegacyFileSystemBackend
            d_cache = LegacyFileSystemBackend(
                settings=config
            )
        case config_keys.backend_filesystem:
            from rhodecode.apps.file_store.backends.filesystem import FileSystemBackend
            d_cache = FileSystemBackend(
                settings=config
            )
        case config_keys.backend_objectstore:
            from rhodecode.apps.file_store.backends.objectstore import ObjectStoreBackend
            d_cache = ObjectStoreBackend(
                settings=config
            )
        case _:
            raise ValueError(
                f'file_store.backend.type only supports "{config_keys.backend_types}" got {backend}'
            )

    cache_meta = d_cache
    return cache_meta


def splitext(filename):
    # collect only ascii suffixes, with any whitespace stripped out
    final_ext = []
    for suffix in pathlib.Path(filename).suffixes:
        if not suffix.isascii():
            continue

        suffix = " ".join(suffix.split()).replace(" ", "")
        final_ext.append(suffix)
    ext = ''.join(final_ext)
    return filename, ext


def get_uid_filename(filename, randomized=True):
    """
    Generates a randomized or stable (uuid) filename,
    preserving the original extension.
    :param filename: the original filename
    :param randomized: if True, generate a random (uuid4 based) name;
        otherwise derive a stable (uuid5, sha1 based) name from the filename
    """
    _, ext = splitext(filename)
    if randomized:
        uid = uuid.uuid4()
    else:
        store_suffix = "store"
        hash_key = f'{filename}.{store_suffix}'
        uid = uuid.uuid5(uuid.NAMESPACE_URL, hash_key)
    return str(uid) + ext.lower()


def bytes_to_file_obj(bytes_data):
    return io.BytesIO(bytes_data)


class ShardFileReader:
    """
    Thin proxy over a backend file object that normalizes attribute access.
    """

    def __init__(self, file_like_reader):
        self._file_like_reader = file_like_reader

    def __getattr__(self, item):
        if isinstance(self._file_like_reader, s3fs.core.S3File):
            match item:
                case 'name':
                    # s3fs S3File doesn't expose a ``name`` attribute, and we use it
                    return self._file_like_reader.full_name
                case _:
                    return getattr(self._file_like_reader, item)
        else:
            return getattr(self._file_like_reader, item)


def archive_iterator(_reader, block_size: int = 4096 * 512):  # 4096 * 512 = 2MB blocks
    while True:
        data = _reader.read(block_size)
        if not data:
            break
        yield data
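

if __name__ == '__main__':
    # Hedged usage sketch, not part of the module API: exercises only the
    # pure helpers above with made-up inputs (the filename and payload are
    # illustrative assumptions). Backend selection via get_filestore_backend()
    # needs a real ``file_store.*`` config and is not shown here.
    demo_name = 'backup.tar.gz'
    name, ext = splitext(demo_name)  # ('backup.tar.gz', '.tar.gz')
    print(f'splitext : {name!r} -> ext {ext!r}')

    # stable name: the same input always yields the same uuid5-based filename
    print('stable   :', get_uid_filename(demo_name, randomized=False))
    # randomized name: a fresh uuid4-based filename on every call
    print('random   :', get_uid_filename(demo_name))

    # stream an in-memory payload in 2MB blocks, as an archive download would
    reader = bytes_to_file_obj(b'x' * (5 * 1024 * 1024))
    total = sum(len(chunk) for chunk in archive_iterator(reader))
    print('streamed :', total, 'bytes')  # 5242880 bytes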