# Copyright (C) 2016-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import errno import os import hashlib import functools import time import logging from .. import config_keys from ..exceptions import FileOverSizeException from ..backends.base import BaseFileStoreBackend, fsspec, BaseShard, ShardFileReader from ....lib.ext_json import json log = logging.getLogger(__name__) class LegacyFileSystemShard(BaseShard): # legacy ver METADATA_VER = 'v2' BACKEND_TYPE = config_keys.backend_legacy_filesystem storage_type: str = 'dir_struct' # legacy suffix metadata_suffix: str = '.meta' @classmethod def _sub_store_from_filename(cls, filename): return filename[:2] @classmethod def apply_counter(cls, counter, filename): name_counted = '%d-%s' % (counter, filename) return name_counted @classmethod def safe_make_dirs(cls, dir_path): if not os.path.exists(dir_path): try: os.makedirs(dir_path) except OSError as e: if e.errno != errno.EEXIST: raise return @classmethod def resolve_name(cls, name, directory): """ Resolves a unique name and the correct path. If a filename for that path already exists then a numeric prefix with values > 0 will be added, for example test.jpg -> 1-test.jpg etc. initially file would have 0 prefix. :param name: base name of file :param directory: absolute directory path """ counter = 0 while True: name_counted = cls.apply_counter(counter, name) # sub_store prefix to optimize disk usage, e.g some_path/ab/final_file sub_store: str = cls._sub_store_from_filename(name_counted) sub_store_path: str = os.path.join(directory, sub_store) cls.safe_make_dirs(sub_store_path) path = os.path.join(sub_store_path, name_counted) if not os.path.exists(path): return name_counted, path counter += 1 def __init__(self, index, directory, directory_folder, fs, **settings): self._index: int = index self._directory: str = directory self._directory_folder: str = directory_folder self.fs = fs @property def dir_struct(self) -> str: """Cache directory final path.""" return os.path.join(self._directory, '0-') def _write_file(self, full_path, iterator, max_filesize, mode='wb'): # ensure dir exists destination, _ = os.path.split(full_path) if not self.fs.exists(destination): self.fs.makedirs(destination) writer = self.fs.open(full_path, mode) digest = hashlib.sha256() oversize_cleanup = False with writer: size = 0 for chunk in iterator: size += len(chunk) digest.update(chunk) writer.write(chunk) if max_filesize and size > max_filesize: # free up the copied file, and raise exc oversize_cleanup = True break writer.flush() # Get the file descriptor fd = writer.fileno() # Sync the file descriptor to disk, helps with NFS cases... os.fsync(fd) if oversize_cleanup: self.fs.rm(full_path) raise FileOverSizeException(f'given file is over size limit ({max_filesize}): {full_path}') sha256 = digest.hexdigest() log.debug('written new artifact under %s, sha256: %s', full_path, sha256) return size, sha256 def _store(self, key: str, uid_key, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs): filename = key uid_filename = uid_key # NOTE:, also apply N- Counter... uid_filename, full_path = self.resolve_name(uid_filename, self._directory) # STORE METADATA # TODO: make it compatible, and backward proof _metadata = { "version": self.METADATA_VER, "filename": filename, "filename_uid_path": full_path, "filename_uid": uid_filename, "sha256": "", # NOTE: filled in by reader iteration "store_time": time.time(), "size": 0 } if metadata: _metadata.update(metadata) read_iterator = iter(functools.partial(value_reader.read, 2**22), b'') size, sha256 = self._write_file(full_path, read_iterator, max_filesize) _metadata['size'] = size _metadata['sha256'] = sha256 # after storing the artifacts, we write the metadata present _metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename) with self.fs.open(metadata_file_path, 'wb') as f: f.write(json.dumps(_metadata)) return uid_filename, _metadata def store_path(self, uid_filename): """ Returns absolute file path of the uid_filename """ prefix_dir = '' if '/' in uid_filename: prefix_dir, filename = uid_filename.split('/') sub_store = self._sub_store_from_filename(filename) else: sub_store = self._sub_store_from_filename(uid_filename) return os.path.join(self._directory, prefix_dir, sub_store, uid_filename) def metadata_convert(self, uid_filename, metadata): # NOTE: backward compat mode here... this is for file created PRE 5.2 system if 'meta_ver' in metadata: full_path = self.store_path(uid_filename) metadata = { "_converted": True, "_org": metadata, "version": self.METADATA_VER, "store_type": self.BACKEND_TYPE, "filename": metadata['filename'], "filename_uid_path": full_path, "filename_uid": uid_filename, "sha256": metadata['sha256'], "store_time": metadata['time'], "size": metadata['size'] } return metadata def _fetch(self, key, presigned_url_expires: int = 0): if key not in self: log.exception(f'requested key={key} not found in {self}') raise KeyError(key) metadata = self.get_metadata(key) file_path = metadata['filename_uid_path'] if presigned_url_expires and presigned_url_expires > 0: metadata['url'] = self.fs.url(file_path, expires=presigned_url_expires) return ShardFileReader(self.fs.open(file_path, 'rb')), metadata def delete(self, key): return self._delete(key) def _delete(self, key): if key not in self: log.exception(f'requested key={key} not found in {self}') raise KeyError(key) metadata = self.get_metadata(key) metadata_file, metadata_file_path = self.get_metadata_filename(key) artifact_file_path = metadata['filename_uid_path'] self.fs.rm(artifact_file_path) self.fs.rm(metadata_file_path) def get_metadata_filename(self, uid_filename) -> tuple[str, str]: metadata_file: str = f'{uid_filename}{self.metadata_suffix}' uid_path_in_store = self.store_path(uid_filename) metadata_file_path = f'{uid_path_in_store}{self.metadata_suffix}' return metadata_file, metadata_file_path class LegacyFileSystemBackend(BaseFileStoreBackend): _shard_cls = LegacyFileSystemShard def __init__(self, settings): super().__init__(settings) store_dir = self.get_conf(config_keys.legacy_filesystem_storage_path) directory = os.path.expanduser(store_dir) self._directory = directory self._storage_path = directory # common path for all from BaseCache log.debug('Initializing %s file_store instance', self) fs = fsspec.filesystem('file') if not fs.exists(self._directory): fs.makedirs(self._directory, exist_ok=True) # legacy system uses single shard self._shards = tuple( [ self._shard_cls( index=0, directory=directory, directory_folder='', fs=fs, **settings, ) ] ) @classmethod def get_shard_index(cls, filename: str, num_shards) -> int: # legacy filesystem doesn't use shards, and always uses single shard return 0