# Copyright (C) 2016-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import hashlib import functools import time import logging from .. import config_keys from ..exceptions import FileOverSizeException from ..backends.base import BaseFileStoreBackend, fsspec, BaseShard, ShardFileReader from ....lib.ext_json import json log = logging.getLogger(__name__) class FileSystemShard(BaseShard): METADATA_VER = 'v2' BACKEND_TYPE = config_keys.backend_filesystem storage_type: str = 'directory' def __init__(self, index, directory, directory_folder, fs, **settings): self._index: int = index self._directory: str = directory self._directory_folder: str = directory_folder self.fs = fs @property def directory(self) -> str: """Cache directory final path.""" return os.path.join(self._directory, self._directory_folder) def _write_file(self, full_path, iterator, max_filesize, mode='wb'): # ensure dir exists destination, _ = os.path.split(full_path) if not self.fs.exists(destination): self.fs.makedirs(destination) writer = self.fs.open(full_path, mode) digest = hashlib.sha256() oversize_cleanup = False with writer: size = 0 for chunk in iterator: size += len(chunk) digest.update(chunk) writer.write(chunk) if max_filesize and size > max_filesize: oversize_cleanup = True # free up the copied file, and raise exc break writer.flush() # Get the file descriptor fd = writer.fileno() # Sync the file descriptor to disk, helps with NFS cases... os.fsync(fd) if oversize_cleanup: self.fs.rm(full_path) raise FileOverSizeException(f'given file is over size limit ({max_filesize}): {full_path}') sha256 = digest.hexdigest() log.debug('written new artifact under %s, sha256: %s', full_path, sha256) return size, sha256 def _store(self, key: str, uid_key, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs): filename = key uid_filename = uid_key full_path = self.store_path(uid_filename) # STORE METADATA _metadata = { "version": self.METADATA_VER, "store_type": self.BACKEND_TYPE, "filename": filename, "filename_uid_path": full_path, "filename_uid": uid_filename, "sha256": "", # NOTE: filled in by reader iteration "store_time": time.time(), "size": 0 } if metadata: if kwargs.pop('import_mode', False): # in import mode, we don't need to compute metadata, we just take the old version _metadata["import_mode"] = True else: _metadata.update(metadata) read_iterator = iter(functools.partial(value_reader.read, 2**22), b'') size, sha256 = self._write_file(full_path, read_iterator, max_filesize) _metadata['size'] = size _metadata['sha256'] = sha256 # after storing the artifacts, we write the metadata present _metadata_file, metadata_file_path = self.get_metadata_filename(uid_key) with self.fs.open(metadata_file_path, 'wb') as f: f.write(json.dumps(_metadata)) return uid_filename, _metadata def store_path(self, uid_filename): """ Returns absolute file path of the uid_filename """ return os.path.join(self._directory, self._directory_folder, uid_filename) def _fetch(self, key, presigned_url_expires: int = 0): if key not in self: log.exception(f'requested key={key} not found in {self}') raise KeyError(key) metadata = self.get_metadata(key) file_path = metadata['filename_uid_path'] if presigned_url_expires and presigned_url_expires > 0: metadata['url'] = self.fs.url(file_path, expires=presigned_url_expires) return ShardFileReader(self.fs.open(file_path, 'rb')), metadata def delete(self, key): return self._delete(key) class FileSystemBackend(BaseFileStoreBackend): shard_name: str = 'shard_{:03d}' _shard_cls = FileSystemShard def __init__(self, settings): super().__init__(settings) store_dir = self.get_conf(config_keys.filesystem_storage_path) directory = os.path.expanduser(store_dir) self._directory = directory self._storage_path = directory # common path for all from BaseCache self._shard_count = int(self.get_conf(config_keys.filesystem_shards, pop=True)) if self._shard_count < 1: raise ValueError(f'{config_keys.filesystem_shards} must be 1 or more') log.debug('Initializing %s file_store instance', self) fs = fsspec.filesystem('file') if not fs.exists(self._directory): fs.makedirs(self._directory, exist_ok=True) self._shards = tuple( self._shard_cls( index=num, directory=directory, directory_folder=self.shard_name.format(num), fs=fs, **settings, ) for num in range(self._shard_count) )