# Copyright (C) 2016-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import fsspec  # noqa
import logging

from rhodecode.lib.ext_json import json
from rhodecode.apps.file_store.utils import sha256_safe, ShardFileReader, get_uid_filename
from rhodecode.apps.file_store.extensions import resolve_extensions
from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException  # noqa: F401

log = logging.getLogger(__name__)


class BaseShard:
    """
    One shard of a sharded file store.

    A shard stores artifact files plus a JSON ``.metadata`` side-file on an
    fsspec-compatible filesystem (``self.fs``). Concrete subclasses must set
    ``storage_type`` (naming the attribute that holds the shard's root
    path/bucket) and ``fs``, and implement ``_store``, ``_fetch``, ``delete``
    and ``store_path``.
    """

    metadata_suffix: str = '.metadata'
    storage_type: str = ''
    fs = None  # fsspec-compatible filesystem, set by subclasses

    @property
    def storage_medium(self):
        """Return this shard's storage root, resolved via ``storage_type``."""
        if not self.storage_type:
            raise ValueError('No storage type set for this shard storage_type=""')
        return getattr(self, self.storage_type)

    def __contains__(self, key):
        """True if the file named ``key`` exists in this shard."""
        full_path = self.store_path(key)
        return self.fs.exists(full_path)

    def metadata_convert(self, uid_filename, metadata):
        """Hook for subclasses to post-process loaded metadata; identity by default."""
        return metadata

    def get_metadata_filename(self, uid_filename) -> tuple[str, str]:
        """Return ``(metadata_file_name, full_metadata_path)`` for a uid filename."""
        metadata_file: str = f'{uid_filename}{self.metadata_suffix}'
        return metadata_file, self.store_path(metadata_file)

    def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
        """
        Load the JSON metadata stored alongside ``uid_filename``.

        :param uid_filename: unique store filename to look up.
        :param ignore_missing: when True, return ``{}`` instead of raising if
            the metadata file does not exist.
        """
        _metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename)
        if ignore_missing and not self.fs.exists(metadata_file_path):
            return {}

        with self.fs.open(metadata_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        metadata = self.metadata_convert(uid_filename, metadata)
        return metadata

    def _store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None,
               metadata: dict | None = None, **kwargs):
        raise NotImplementedError

    def store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None,
              metadata: dict | None = None, **kwargs):
        """Store ``value_reader`` content under ``uid_key``; delegates to ``_store``."""
        return self._store(key, uid_key, value_reader, max_filesize, metadata, **kwargs)

    def _fetch(self, key, presigned_url_expires: int = 0):
        raise NotImplementedError

    def fetch(self, key, **kwargs) -> tuple[ShardFileReader, dict]:
        """Return a ``(reader, metadata)`` pair for ``key``; delegates to ``_fetch``."""
        return self._fetch(key)

    def _delete(self, key):
        """
        Remove the artifact file and its metadata side-file for ``key``.

        :raises KeyError: if ``key`` is not present in this shard.
        :return: ``1`` on success (number of entries removed).
        """
        if key not in self:
            # fix: log.error, not log.exception — we are not inside an except
            # handler, so log.exception would emit a bogus "NoneType: None"
            # traceback here.
            log.error(f'requested key={key} not found in {self}')
            raise KeyError(key)

        metadata = self.get_metadata(key)
        _metadata_file, metadata_file_path = self.get_metadata_filename(key)
        artifact_file_path = metadata['filename_uid_path']
        self.fs.rm(artifact_file_path)
        self.fs.rm(metadata_file_path)

        return 1

    def delete(self, key):
        raise NotImplementedError

    def store_path(self, uid_filename):
        raise NotImplementedError


class BaseFileStoreBackend:
    """
    Base class for sharded file-store backends.

    Each uid filename is deterministically mapped to one of ``_shards`` by
    hashing the name (see :meth:`get_shard_index`); store/fetch/delete and
    metadata operations are delegated to the selected shard.
    """

    _shards = tuple()
    _shard_cls = BaseShard
    _config: dict | None = None
    _storage_path: str = ''

    def __init__(self, settings, extension_groups=None):
        self._config = settings
        extension_groups = extension_groups or ['any']
        self.extensions = resolve_extensions([], groups=extension_groups)

    def __contains__(self, key):
        return self.filename_exists(key)

    def __repr__(self):
        return f'<{self.__class__.__name__}(storage={self.storage_path})>'

    @property
    def storage_path(self):
        return self._storage_path

    @classmethod
    def get_shard_index(cls, filename: str, num_shards) -> int:
        """
        Deterministically map ``filename`` to a shard index in ``[0, num_shards)``.
        """
        # Generate a hash value from the filename
        hash_value = sha256_safe(filename)

        # Convert the hash value to an integer
        hash_int = int(hash_value, 16)

        # Map the hash integer to a shard number between 0 and num_shards - 1
        shard_number = hash_int % num_shards

        return shard_number

    @classmethod
    def apply_counter(cls, counter: int, filename: str) -> str:
        """
        Apply a counter to the filename.

        :param counter: The counter value to apply.
        :param filename: The original filename.
        :return: The modified filename with the counter.
        """
        # fix: the counter must be prefixed to the actual filename; the
        # filename argument was previously ignored in the formatted result.
        name_counted = f'{counter:d}-{filename}'
        return name_counted

    def _get_shard(self, key) -> _shard_cls:
        """Select the shard responsible for ``key``."""
        index = self.get_shard_index(key, len(self._shards))
        shard = self._shards[index]
        return shard

    def get_conf(self, key, pop=False):
        """
        Read a value from the backend configuration.

        :param key: configuration key to read.
        :param pop: when True, also remove the key from the config.
        :raises ValueError: if ``key`` is not present in the config.
        """
        if key not in self._config:
            raise ValueError(
                f"No configuration key '{key}', please make sure it exists in filestore config")
        val = self._config[key]
        if pop:
            del self._config[key]
        return val

    def filename_allowed(self, filename, extensions=None):
        """Checks if a filename has an allowed extension

        :param filename: base name of file
        :param extensions: iterable of extensions (or self.extensions)
        """
        _, ext = os.path.splitext(filename)
        return self.extension_allowed(ext, extensions)

    def extension_allowed(self, ext, extensions=None):
        """
        Checks if an extension is permitted. Both e.g. ".jpg" and
        "jpg" can be passed in. Extension lookup is case-insensitive.

        :param ext: extension to check
        :param extensions: iterable of extensions to validate against (or self.extensions)
        """
        def normalize_ext(_ext):
            # accept both ".jpg" and "jpg", compare case-insensitively
            if _ext.startswith('.'):
                _ext = _ext[1:]
            return _ext.lower()

        extensions = extensions or self.extensions
        if not extensions:
            # no restriction configured -> everything allowed
            return True

        ext = normalize_ext(ext)

        return ext in [normalize_ext(x) for x in extensions]

    def filename_exists(self, uid_filename):
        shard = self._get_shard(uid_filename)
        return uid_filename in shard

    def store_path(self, uid_filename):
        """
        Returns absolute file path of the uid_filename
        """
        shard = self._get_shard(uid_filename)
        return shard.store_path(uid_filename)

    def store_metadata(self, uid_filename):
        """Return ``(metadata_file_name, full_metadata_path)`` for a uid filename."""
        shard = self._get_shard(uid_filename)
        return shard.get_metadata_filename(uid_filename)

    def store(self, filename, value_reader, extensions=None, metadata=None,
              max_filesize=None, randomized_name=True, **kwargs):
        """
        Validate the filename's extension, compute (or accept) a uid filename,
        and store the content in the responsible shard.

        :raises FileNotAllowedException: if the extension is not permitted.
        """
        extensions = extensions or self.extensions

        if not self.filename_allowed(filename, extensions):
            # fix: include the offending filename in the error message
            msg = f'filename {filename} does not allow extensions {extensions}'
            raise FileNotAllowedException(msg)

        # # TODO: check why we need this setting ? it looks stupid...
        # no_body_seek is used in stream mode importer somehow
        # no_body_seek = kwargs.pop('no_body_seek', False)
        # if no_body_seek:
        #     pass
        # else:
        #     value_reader.seek(0)

        uid_filename = kwargs.pop('uid_filename', None)
        if uid_filename is None:
            uid_filename = get_uid_filename(filename, randomized=randomized_name)

        shard = self._get_shard(uid_filename)

        return shard.store(filename, uid_filename, value_reader, max_filesize, metadata, **kwargs)

    def import_to_store(self, value_reader, org_filename, uid_filename, metadata, **kwargs):
        """Store pre-named content (e.g. from a stream importer) without size checks."""
        shard = self._get_shard(uid_filename)
        max_filesize = None
        return shard.store(org_filename, uid_filename, value_reader, max_filesize, metadata,
                           import_mode=True)

    def delete(self, uid_filename):
        shard = self._get_shard(uid_filename)
        return shard.delete(uid_filename)

    def fetch(self, uid_filename) -> tuple[ShardFileReader, dict]:
        shard = self._get_shard(uid_filename)
        return shard.fetch(uid_filename)

    def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
        shard = self._get_shard(uid_filename)
        return shard.get_metadata(uid_filename, ignore_missing=ignore_missing)

    def iter_keys(self):
        """Yield ``(shard, metadata_file_name)`` for every metadata file across shards."""
        for shard in self._shards:
            if shard.fs.exists(shard.storage_medium):
                for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                    for key_file_path in _files:
                        if key_file_path.endswith(shard.metadata_suffix):
                            yield shard, key_file_path

    def iter_artifacts(self):
        """Yield ``(shard, filename_uid)`` for every stored artifact."""
        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                yield shard, json.loads(f.read())['filename_uid']

    def get_statistics(self):
        """
        Walk all shards and return ``(total_files, total_size, meta)``.

        NOTE(review): ``meta`` is currently always an empty dict; kept for
        interface compatibility with callers expecting a 3-tuple.
        """
        total_files = 0
        total_size = 0
        meta = {}

        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                total_files += 1
                metadata = json.loads(f.read())
                total_size += metadata['size']

        return total_files, total_size, meta