# Copyright (C) 2016-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import logging

import fsspec  # noqa

from rhodecode.lib.ext_json import json
from rhodecode.apps.file_store.utils import sha256_safe, ShardFileReader, get_uid_filename
from rhodecode.apps.file_store.extensions import resolve_extensions
from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException  # noqa: F401

log = logging.getLogger(__name__)


class BaseShard:
    metadata_suffix: str = '.metadata'
    storage_type: str = ''
    fs = None

    @property
    def storage_medium(self):
        if not self.storage_type:
            raise ValueError('No storage_type set for this shard')
        return getattr(self, self.storage_type)

    def __contains__(self, key):
        full_path = self.store_path(key)
        return self.fs.exists(full_path)

    def metadata_convert(self, uid_filename, metadata):
        return metadata
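
    # A minimal override sketch (hypothetical subclass): metadata_convert is a
    # hook that lets a backend migrate or normalize metadata as it is read.
    # The 'filename_uid' key is the one iter_artifacts() relies on below.
    #
    #   class LegacyShard(BaseShard):
    #       def metadata_convert(self, uid_filename, metadata):
    #           metadata.setdefault('filename_uid', uid_filename)
    #           return metadata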

    def get_metadata_filename(self, uid_filename) -> tuple[str, str]:
        metadata_file: str = f'{uid_filename}{self.metadata_suffix}'
        return metadata_file, self.store_path(metadata_file)

    def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
        _metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename)
        if ignore_missing and not self.fs.exists(metadata_file_path):
            return {}

        with self.fs.open(metadata_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        metadata = self.metadata_convert(uid_filename, metadata)
        return metadata
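
    # The JSON metadata document lives next to the artifact under the
    # '.metadata' suffix. Keys referenced in this module are 'filename_uid',
    # 'filename_uid_path' and 'size'; an illustrative (not exhaustive) example:
    #
    #   {
    #       "filename_uid": "018e-abc123-report.pdf",
    #       "filename_uid_path": "/var/file_store/shard_3/018e-abc123-report.pdf",
    #       "size": 10240
    #   }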

    def _store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        raise NotImplementedError

    def store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        return self._store(key, uid_key, value_reader, max_filesize, metadata, **kwargs)

    def _fetch(self, key, presigned_url_expires: int = 0):
        raise NotImplementedError

    def fetch(self, key, **kwargs) -> tuple[ShardFileReader, dict]:
        # forward kwargs (e.g. presigned_url_expires) to the concrete _fetch
        return self._fetch(key, **kwargs)

    def _delete(self, key):
        if key not in self:
            log.error(f'requested key={key} not found in {self}')
            raise KeyError(key)

        metadata = self.get_metadata(key)
        _metadata_file, metadata_file_path = self.get_metadata_filename(key)
        artifact_file_path = metadata['filename_uid_path']
        self.fs.rm(artifact_file_path)
        self.fs.rm(metadata_file_path)

        return 1

    def delete(self, key):
        raise NotImplementedError

    def store_path(self, uid_filename):
        raise NotImplementedError


class BaseFileStoreBackend:
    _shards = tuple()
    _shard_cls = BaseShard
    _config: dict | None = None
    _storage_path: str = ''

    def __init__(self, settings, extension_groups=None):
        self._config = settings
        extension_groups = extension_groups or ['any']
        self.extensions = resolve_extensions([], groups=extension_groups)

    def __contains__(self, key):
        return self.filename_exists(key)

    def __repr__(self):
        return f'<{self.__class__.__name__}(storage={self.storage_path})>'

    @property
    def storage_path(self):
        return self._storage_path

    @classmethod
    def get_shard_index(cls, filename: str, num_shards) -> int:
        # Generate a hash value from the filename
        hash_value = sha256_safe(filename)

        # Convert the hash value to an integer
        hash_int = int(hash_value, 16)

        # Map the hash integer to a shard index between 0 and num_shards - 1
        shard_number = hash_int % num_shards

        return shard_number
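
    # Shard selection is deterministic: the same filename always hashes to the
    # same index, so a file written earlier can be located again without any
    # lookup table. For example, with num_shards=8:
    #
    #   idx = BaseFileStoreBackend.get_shard_index('abc-archive.tar.gz', 8)
    #   assert 0 <= idx < 8   # always the same idx for this filename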

    @classmethod
    def apply_counter(cls, counter: int, filename: str) -> str:
        """
        Apply a counter to the filename.

        :param counter: The counter value to apply.
        :param filename: The original filename.
        :return: The modified filename with the counter.
        """
        name_counted = f'{counter:d}-{filename}'
        return name_counted
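
    # For example, apply_counter(2, 'report.pdf') returns '2-report.pdf',
    # presumably to disambiguate repeated stores of the same filename.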

    def _get_shard(self, key) -> _shard_cls:
        index = self.get_shard_index(key, len(self._shards))
        shard = self._shards[index]
        return shard

    def get_conf(self, key, pop=False):
        if key not in self._config:
            raise ValueError(
                f"No configuration key '{key}', please make sure it exists in filestore config")
        val = self._config[key]
        if pop:
            del self._config[key]
        return val

    def filename_allowed(self, filename, extensions=None):
        """
        Checks if a filename has an allowed extension.

        :param filename: base name of file
        :param extensions: iterable of extensions (or self.extensions)
        """
        _, ext = os.path.splitext(filename)
        return self.extension_allowed(ext, extensions)

    def extension_allowed(self, ext, extensions=None):
        """
        Checks if an extension is permitted. Both e.g. ".jpg" and
        "jpg" can be passed in. Extension lookup is case-insensitive.

        :param ext: extension to check
        :param extensions: iterable of extensions to validate against (or self.extensions)
        """
        def normalize_ext(_ext):
            if _ext.startswith('.'):
                _ext = _ext[1:]
            return _ext.lower()

        extensions = extensions or self.extensions
        if not extensions:
            return True

        ext = normalize_ext(ext)

        return ext in [normalize_ext(x) for x in extensions]
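
    # Both of the following return True, since normalize_ext() strips the
    # leading dot and lower-cases both sides ('backend' being any
    # BaseFileStoreBackend instance):
    #
    #   backend.extension_allowed('.JPG', ['jpg', 'png'])
    #   backend.extension_allowed('jpg', ['.JPG'])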

    def filename_exists(self, uid_filename):
        shard = self._get_shard(uid_filename)
        return uid_filename in shard

    def store_path(self, uid_filename):
        """
        Returns absolute file path of the uid_filename
        """
        shard = self._get_shard(uid_filename)
        return shard.store_path(uid_filename)

    def store_metadata(self, uid_filename):
        shard = self._get_shard(uid_filename)
        return shard.get_metadata_filename(uid_filename)

    def store(self, filename, value_reader, extensions=None, metadata=None, max_filesize=None, randomized_name=True, **kwargs):
        extensions = extensions or self.extensions

        if not self.filename_allowed(filename, extensions):
            msg = f'filename {filename} does not have an allowed extension: {extensions}'
            raise FileNotAllowedException(msg)

        # TODO: verify whether this seek handling is still needed;
        # no_body_seek is used by the stream-mode importer to skip rewinding
        # the reader before storing:
        # no_body_seek = kwargs.pop('no_body_seek', False)
        # if no_body_seek:
        #     pass
        # else:
        #     value_reader.seek(0)

        uid_filename = kwargs.pop('uid_filename', None)
        if uid_filename is None:
            uid_filename = get_uid_filename(filename, randomized=randomized_name)

        shard = self._get_shard(uid_filename)

        return shard.store(filename, uid_filename, value_reader, max_filesize, metadata, **kwargs)
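
    # Usage sketch (hypothetical: the concrete subclass and the shape of the
    # return value are backend-defined, this base class only routes the call):
    #
    #   import io
    #   backend = SomeConcreteBackend(settings)   # hypothetical subclass
    #   reader = io.BytesIO(b'hello world')
    #   result = backend.store('hello.txt', reader, extensions=['txt'])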

    def import_to_store(self, value_reader, org_filename, uid_filename, metadata, **kwargs):
        shard = self._get_shard(uid_filename)
        max_filesize = None
        return shard.store(org_filename, uid_filename, value_reader, max_filesize, metadata, import_mode=True)

    def delete(self, uid_filename):
        shard = self._get_shard(uid_filename)
        return shard.delete(uid_filename)

    def fetch(self, uid_filename) -> tuple[ShardFileReader, dict]:
        shard = self._get_shard(uid_filename)
        return shard.fetch(uid_filename)

    def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
        shard = self._get_shard(uid_filename)
        return shard.get_metadata(uid_filename, ignore_missing=ignore_missing)

    def iter_keys(self):
        # yields (shard, metadata file name) for every artifact in every shard
        for shard in self._shards:
            if shard.fs.exists(shard.storage_medium):
                for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                    for key_file_path in _files:
                        if key_file_path.endswith(shard.metadata_suffix):
                            yield shard, key_file_path

    def iter_artifacts(self):
        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                yield shard, json.loads(f.read())['filename_uid']

    def get_statistics(self):
        total_files = 0
        total_size = 0
        meta = {}

        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                total_files += 1
                metadata = json.loads(f.read())
                total_size += metadata['size']

        return total_files, total_size, meta
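
    # Usage sketch: walks every shard's metadata files, so this is I/O-bound
    # on large stores ('backend' being any concrete subclass instance):
    #
    #   total_files, total_size, _meta = backend.get_statistics()
    #   log.debug('file_store: %d files, %d bytes', total_files, total_size)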