##// END OF EJS Templates
feat(artifacts): new artifact storage engines allowing an s3 based uploads
feat(artifacts): new artifact storage engines allowing an s3 based uploads

File last commit:

r5516:3496180b default
r5516:3496180b default
Show More
base.py
269 lines | 9.5 KiB | text/x-python | PythonLexer
feat(artifacts): new artifact storage engines allowing an s3 based uploads
r5516 # Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import fsspec # noqa
import logging
from rhodecode.lib.ext_json import json
from rhodecode.apps.file_store.utils import sha256_safe, ShardFileReader, get_uid_filename
from rhodecode.apps.file_store.extensions import resolve_extensions
from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException # noqa: F401
log = logging.getLogger(__name__)
class BaseShard:
metadata_suffix: str = '.metadata'
storage_type: str = ''
fs = None
@property
def storage_medium(self):
if not self.storage_type:
raise ValueError('No storage type set for this shard storage_type=""')
return getattr(self, self.storage_type)
def __contains__(self, key):
full_path = self.store_path(key)
return self.fs.exists(full_path)
def metadata_convert(self, uid_filename, metadata):
return metadata
def get_metadata_filename(self, uid_filename) -> tuple[str, str]:
metadata_file: str = f'{uid_filename}{self.metadata_suffix}'
return metadata_file, self.store_path(metadata_file)
def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
_metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename)
if ignore_missing and not self.fs.exists(metadata_file_path):
return {}
with self.fs.open(metadata_file_path, 'rb') as f:
metadata = json.loads(f.read())
metadata = self.metadata_convert(uid_filename, metadata)
return metadata
def _store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
raise NotImplementedError
def store(self, key: str, uid_key: str, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
return self._store(key, uid_key, value_reader, max_filesize, metadata, **kwargs)
def _fetch(self, key, presigned_url_expires: int = 0):
raise NotImplementedError
def fetch(self, key, **kwargs) -> tuple[ShardFileReader, dict]:
return self._fetch(key)
def _delete(self, key):
if key not in self:
log.exception(f'requested key={key} not found in {self}')
raise KeyError(key)
metadata = self.get_metadata(key)
_metadata_file, metadata_file_path = self.get_metadata_filename(key)
artifact_file_path = metadata['filename_uid_path']
self.fs.rm(artifact_file_path)
self.fs.rm(metadata_file_path)
return 1
def delete(self, key):
raise NotImplementedError
def store_path(self, uid_filename):
raise NotImplementedError
class BaseFileStoreBackend:
_shards = tuple()
_shard_cls = BaseShard
_config: dict | None = None
_storage_path: str = ''
def __init__(self, settings, extension_groups=None):
self._config = settings
extension_groups = extension_groups or ['any']
self.extensions = resolve_extensions([], groups=extension_groups)
def __contains__(self, key):
return self.filename_exists(key)
def __repr__(self):
return f'<{self.__class__.__name__}(storage={self.storage_path})>'
@property
def storage_path(self):
return self._storage_path
@classmethod
def get_shard_index(cls, filename: str, num_shards) -> int:
# Generate a hash value from the filename
hash_value = sha256_safe(filename)
# Convert the hash value to an integer
hash_int = int(hash_value, 16)
# Map the hash integer to a shard number between 1 and num_shards
shard_number = (hash_int % num_shards)
return shard_number
@classmethod
def apply_counter(cls, counter: int, filename: str) -> str:
"""
Apply a counter to the filename.
:param counter: The counter value to apply.
:param filename: The original filename.
:return: The modified filename with the counter.
"""
name_counted = f'{counter:d}-{filename}'
return name_counted
def _get_shard(self, key) -> _shard_cls:
index = self.get_shard_index(key, len(self._shards))
shard = self._shards[index]
return shard
def get_conf(self, key, pop=False):
if key not in self._config:
raise ValueError(
f"No configuration key '{key}', please make sure it exists in filestore config")
val = self._config[key]
if pop:
del self._config[key]
return val
def filename_allowed(self, filename, extensions=None):
"""Checks if a filename has an allowed extension
:param filename: base name of file
:param extensions: iterable of extensions (or self.extensions)
"""
_, ext = os.path.splitext(filename)
return self.extension_allowed(ext, extensions)
def extension_allowed(self, ext, extensions=None):
"""
Checks if an extension is permitted. Both e.g. ".jpg" and
"jpg" can be passed in. Extension lookup is case-insensitive.
:param ext: extension to check
:param extensions: iterable of extensions to validate against (or self.extensions)
"""
def normalize_ext(_ext):
if _ext.startswith('.'):
_ext = _ext[1:]
return _ext.lower()
extensions = extensions or self.extensions
if not extensions:
return True
ext = normalize_ext(ext)
return ext in [normalize_ext(x) for x in extensions]
def filename_exists(self, uid_filename):
shard = self._get_shard(uid_filename)
return uid_filename in shard
def store_path(self, uid_filename):
"""
Returns absolute file path of the uid_filename
"""
shard = self._get_shard(uid_filename)
return shard.store_path(uid_filename)
def store_metadata(self, uid_filename):
shard = self._get_shard(uid_filename)
return shard.get_metadata_filename(uid_filename)
def store(self, filename, value_reader, extensions=None, metadata=None, max_filesize=None, randomized_name=True, **kwargs):
extensions = extensions or self.extensions
if not self.filename_allowed(filename, extensions):
msg = f'filename {filename} does not allow extensions {extensions}'
raise FileNotAllowedException(msg)
# # TODO: check why we need this setting ? it looks stupid...
# no_body_seek is used in stream mode importer somehow
# no_body_seek = kwargs.pop('no_body_seek', False)
# if no_body_seek:
# pass
# else:
# value_reader.seek(0)
uid_filename = kwargs.pop('uid_filename', None)
if uid_filename is None:
uid_filename = get_uid_filename(filename, randomized=randomized_name)
shard = self._get_shard(uid_filename)
return shard.store(filename, uid_filename, value_reader, max_filesize, metadata, **kwargs)
def import_to_store(self, value_reader, org_filename, uid_filename, metadata, **kwargs):
shard = self._get_shard(uid_filename)
max_filesize = None
return shard.store(org_filename, uid_filename, value_reader, max_filesize, metadata, import_mode=True)
def delete(self, uid_filename):
shard = self._get_shard(uid_filename)
return shard.delete(uid_filename)
def fetch(self, uid_filename) -> tuple[ShardFileReader, dict]:
shard = self._get_shard(uid_filename)
return shard.fetch(uid_filename)
def get_metadata(self, uid_filename, ignore_missing=False) -> dict:
shard = self._get_shard(uid_filename)
return shard.get_metadata(uid_filename, ignore_missing=ignore_missing)
def iter_keys(self):
for shard in self._shards:
if shard.fs.exists(shard.storage_medium):
for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
for key_file_path in _files:
if key_file_path.endswith(shard.metadata_suffix):
yield shard, key_file_path
def iter_artifacts(self):
for shard, key_file in self.iter_keys():
json_key = f"{shard.storage_medium}/{key_file}"
with shard.fs.open(json_key, 'rb') as f:
yield shard, json.loads(f.read())['filename_uid']
def get_statistics(self):
total_files = 0
total_size = 0
meta = {}
for shard, key_file in self.iter_keys():
json_key = f"{shard.storage_medium}/{key_file}"
with shard.fs.open(json_key, 'rb') as f:
total_files += 1
metadata = json.loads(f.read())
total_size += metadata['size']
return total_files, total_size, meta