base.py
269 lines
| 9.5 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os | ||||
import fsspec # noqa | ||||
import logging | ||||
from rhodecode.lib.ext_json import json | ||||
from rhodecode.apps.file_store.utils import sha256_safe, ShardFileReader, get_uid_filename | ||||
from rhodecode.apps.file_store.extensions import resolve_extensions | ||||
from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException # noqa: F401 | ||||
log = logging.getLogger(__name__) | ||||
class BaseShard:
    """
    Base class for one shard of a sharded file store.

    Concrete shards are expected to set ``storage_type`` and ``fs`` and to
    implement ``store_path``, ``_store``, ``_fetch`` and ``delete``; the base
    versions of those raise ``NotImplementedError``.
    """

    # suffix appended to an artifact uid filename to build its metadata filename
    metadata_suffix: str = '.metadata'
    # name of the attribute on this shard that holds the storage location
    storage_type: str = ''
    # filesystem object used for all I/O (only ``exists``/``open``/``rm`` are
    # used here); presumably an fsspec filesystem -- confirm in subclasses
    fs = None

    @property
    def storage_medium(self):
        """
        Return the value of the attribute named by ``storage_type``.

        :raises ValueError: when ``storage_type`` is empty
        """
        if not self.storage_type:
            raise ValueError('No storage type set for this shard storage_type=""')
        return getattr(self, self.storage_type)

    def __contains__(self, key):
        """Return True when the path ``store_path(key)`` exists on ``fs``."""
        full_path = self.store_path(key)
        return self.fs.exists(full_path)

    def metadata_convert(self, uid_filename, metadata):
        """
        Hook applied to every metadata dict loaded by ``get_metadata``.

        The base implementation returns ``metadata`` unchanged.
        """
        return metadata

    def get_metadata_filename(self, uid_filename) -> tuple[str, str]:
        """
        Build the metadata file name for ``uid_filename``.

        :return: tuple of (metadata file name, ``store_path`` of that name)
        """
        metadata_file: str = f'{uid_filename}{self.metadata_suffix}'
        return metadata_file, self.store_path(metadata_file)

    def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
        """
        Load and JSON-decode the metadata file of ``uid_filename``.

        :param uid_filename: artifact uid filename
        :param ignore_missing: when True, return ``{}`` instead of failing if
            the metadata file does not exist
        :return: decoded metadata passed through ``metadata_convert``
        """
        _metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename)
        if ignore_missing and not self.fs.exists(metadata_file_path):
            return {}

        with self.fs.open(metadata_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        metadata = self.metadata_convert(uid_filename, metadata)
        return metadata

    def _store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        """Backend-specific write; must be implemented by subclasses."""
        raise NotImplementedError

    def store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        """Store an artifact by delegating all arguments to ``_store``."""
        return self._store(key, uid_key, value_reader, max_filesize, metadata, **kwargs)

    def _fetch(self, key, presigned_url_expires: int = 0):
        """Backend-specific read; must be implemented by subclasses."""
        raise NotImplementedError

    def fetch(self, key, **kwargs) -> tuple[ShardFileReader, dict]:
        """
        Fetch an artifact by delegating to ``_fetch``.

        :param key: artifact uid filename
        :param kwargs: forwarded to ``_fetch`` (e.g. ``presigned_url_expires``)
        """
        # forward the options; previously they were accepted but silently dropped
        return self._fetch(key, **kwargs)

    def _delete(self, key):
        """
        Remove the artifact file and its metadata file for ``key``.

        The artifact path is read from the ``filename_uid_path`` entry of the
        stored metadata.

        :return: 1 on success
        :raises KeyError: when ``key`` is not present in this shard
        """
        if key not in self:
            # not inside an ``except`` block, so ``log.exception`` would only
            # append a meaningless "NoneType: None" traceback
            log.error('requested key=%s not found in %s', key, self)
            raise KeyError(key)

        metadata = self.get_metadata(key)
        _metadata_file, metadata_file_path = self.get_metadata_filename(key)
        artifact_file_path = metadata['filename_uid_path']
        self.fs.rm(artifact_file_path)
        self.fs.rm(metadata_file_path)

        return 1

    def delete(self, key):
        """Delete an artifact; must be implemented by subclasses."""
        raise NotImplementedError

    def store_path(self, uid_filename):
        """Return the storage path of ``uid_filename``; must be implemented by subclasses."""
        raise NotImplementedError
class BaseFileStoreBackend:
    """
    Base class for a file store that spreads artifacts over several shards.

    The shard responsible for an artifact is picked from ``_shards`` by
    hashing its uid filename; every operation is then delegated to that shard.
    """

    # shard instances; empty here, expected to be filled in by concrete backends
    _shards = ()
    _shard_cls = BaseShard
    _config: dict | None = None
    _storage_path: str = ''

    def __init__(self, settings, extension_groups=None):
        """
        :param settings: configuration mapping, read through ``get_conf``
        :param extension_groups: extension group names to resolve; ``['any']``
            is used when nothing (or an empty value) is given
        """
        self._config = settings
        groups = extension_groups if extension_groups else ['any']
        self.extensions = resolve_extensions([], groups=groups)

    def __contains__(self, key):
        """Support ``uid_filename in backend`` via ``filename_exists``."""
        return self.filename_exists(key)

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f'<{cls_name}(storage={self.storage_path})>'

    @property
    def storage_path(self):
        """Storage path configured for this backend."""
        return self._storage_path

    @classmethod
    def get_shard_index(cls, filename: str, num_shards) -> int:
        """
        Map a filename onto a shard index.

        The hex digest from ``sha256_safe`` is read as a base-16 integer and
        reduced modulo ``num_shards``, so the result is a 0-based index in
        ``range(num_shards)``.

        :param filename: name to hash
        :param num_shards: number of available shards
        """
        digest = sha256_safe(filename)
        return int(digest, 16) % num_shards

    @classmethod
    def apply_counter(cls, counter: int, filename: str) -> str:
        """
        Prefix the filename with a numeric counter.

        :param counter: integer counter placed in front of the name
        :param filename: original filename
        :return: ``"<counter>-<filename>"``
        """
        return f'{counter:d}-{filename}'

    def _get_shard(self, key) -> _shard_cls:
        """Return the shard from ``_shards`` that is responsible for ``key``."""
        shard_count = len(self._shards)
        return self._shards[self.get_shard_index(key, shard_count)]

    def get_conf(self, key, pop=False):
        """
        Read a value from the backend configuration.

        :param key: configuration key
        :param pop: when True, the key is removed from the configuration
            after it was read
        :raises ValueError: when the key is not configured
        """
        config = self._config
        if key not in config:
            raise ValueError(
                f"No configuration key '{key}', please make sure it exists in filestore config")

        value = config[key]
        if pop:
            del config[key]
        return value

    def filename_allowed(self, filename, extensions=None):
        """
        Check whether the extension of a filename is allowed.

        :param filename: base name of file
        :param extensions: iterable of extensions (or self.extensions)
        """
        ext = os.path.splitext(filename)[1]
        return self.extension_allowed(ext, extensions)

    def extension_allowed(self, ext, extensions=None):
        """
        Check whether an extension is permitted. The comparison ignores case
        and a single leading dot, so ".jpg", "jpg" and "JPG" are equivalent.
        An empty set of allowed extensions permits everything.

        :param ext: extension to check
        :param extensions: iterable of extensions to validate against (or self.extensions)
        """
        def _canonical(value):
            return value.removeprefix('.').lower()

        allowed = extensions or self.extensions
        if not allowed:
            return True

        return _canonical(ext) in {_canonical(item) for item in allowed}

    def filename_exists(self, uid_filename):
        """Return True when the responsible shard contains ``uid_filename``."""
        return uid_filename in self._get_shard(uid_filename)

    def store_path(self, uid_filename):
        """
        Return the path of ``uid_filename`` as reported by its shard.
        """
        return self._get_shard(uid_filename).store_path(uid_filename)

    def store_metadata(self, uid_filename):
        """Return the shard's ``get_metadata_filename`` result for ``uid_filename``."""
        return self._get_shard(uid_filename).get_metadata_filename(uid_filename)

    def store(self, filename, value_reader, extensions=None, metadata=None, max_filesize=None, randomized_name=True, **kwargs):
        """
        Validate the filename extension and store the content in a shard.

        :param filename: original filename, used for the extension check
        :param value_reader: reader object handed to the shard
        :param extensions: allowed extensions (or self.extensions)
        :param metadata: optional metadata handed to the shard
        :param max_filesize: optional size limit handed to the shard
        :param randomized_name: passed to ``get_uid_filename`` when no
            ``uid_filename`` keyword is supplied
        :param kwargs: ``uid_filename`` is consumed here; everything else is
            forwarded to the shard
        :raises FileNotAllowedException: when the extension is not allowed
        """
        allowed = extensions or self.extensions
        if not self.filename_allowed(filename, allowed):
            raise FileNotAllowedException(
                f'filename {filename} does not allow extensions {allowed}')

        # NOTE: a ``no_body_seek`` option (used by the stream-mode importer,
        # per an earlier TODO) is not handled here: the reader is never
        # rewound, and such a keyword would be forwarded to the shard as-is.

        uid_filename = kwargs.pop('uid_filename', None)
        if uid_filename is None:
            uid_filename = get_uid_filename(filename, randomized=randomized_name)

        target = self._get_shard(uid_filename)
        return target.store(filename, uid_filename, value_reader, max_filesize, metadata, **kwargs)

    def import_to_store(self, value_reader, org_filename, uid_filename, metadata, **kwargs):
        """
        Store content under an already known ``uid_filename``.

        No extension check and no size limit are applied; the shard is called
        with ``import_mode=True``. Extra ``kwargs`` are accepted but not used.
        """
        target = self._get_shard(uid_filename)
        return target.store(org_filename, uid_filename, value_reader, None, metadata, import_mode=True)

    def delete(self, uid_filename):
        """Delete ``uid_filename`` through its shard."""
        return self._get_shard(uid_filename).delete(uid_filename)

    def fetch(self, uid_filename) -> tuple[ShardFileReader, dict]:
        """Fetch ``uid_filename`` through its shard."""
        return self._get_shard(uid_filename).fetch(uid_filename)

    def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
        """Return the metadata of ``uid_filename`` as loaded by its shard."""
        target = self._get_shard(uid_filename)
        return target.get_metadata(uid_filename, ignore_missing=ignore_missing)

    def iter_keys(self):
        """
        Yield ``(shard, metadata_file_name)`` for every metadata file found.

        Shards whose storage medium does not exist are skipped. Only the bare
        file name is yielded, not the directory it was found in.
        """
        for shard in self._shards:
            if not shard.fs.exists(shard.storage_medium):
                continue
            for _path, _dirs, file_names in shard.fs.walk(shard.storage_medium):
                yield from (
                    (shard, name)
                    for name in file_names
                    if name.endswith(shard.metadata_suffix)
                )

    def iter_artifacts(self):
        """Yield ``(shard, filename_uid)`` read from every metadata file."""
        for shard, key_file in self.iter_keys():
            metadata_path = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(metadata_path, 'rb') as fh:
                stored = json.loads(fh.read())
                yield shard, stored['filename_uid']

    def get_statistics(self):
        """
        Summarize the store from its metadata files.

        :return: tuple of (number of metadata files, sum of their ``size``
            entries, meta dict which is currently always empty)
        """
        file_count = 0
        byte_count = 0

        for shard, key_file in self.iter_keys():
            with shard.fs.open(f"{shard.storage_medium}/{key_file}", 'rb') as fh:
                payload = fh.read()
            byte_count += json.loads(payload)['size']
            file_count += 1

        return file_count, byte_count, {}