|
|
# Copyright (C) 2016-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
import errno
|
|
|
import os
|
|
|
import hashlib
|
|
|
import functools
|
|
|
import time
|
|
|
import logging
|
|
|
|
|
|
from .. import config_keys
|
|
|
from ..exceptions import FileOverSizeException
|
|
|
from ..backends.base import BaseFileStoreBackend, fsspec, BaseShard, ShardFileReader
|
|
|
|
|
|
from ....lib.ext_json import json
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class LegacyFileSystemShard(BaseShard):
    """
    Single-shard file store using the pre-5.x ("legacy") on-disk layout.

    Artifacts are stored under ``<directory>/<sub_store>/<N-filename>`` where
    ``sub_store`` is the first two characters of the counted filename and
    ``N-`` is a numeric de-duplication prefix (``0-``, ``1-``, ...). Each
    artifact has a sidecar JSON metadata file next to it, named
    ``<artifact><metadata_suffix>``.
    """

    # legacy ver
    METADATA_VER = 'v2'
    BACKEND_TYPE = config_keys.backend_legacy_filesystem
    storage_type: str = 'dir_struct'

    # legacy suffix
    metadata_suffix: str = '.meta'

    @classmethod
    def _sub_store_from_filename(cls, filename):
        # First two chars of the (already counted) filename pick the
        # sub-directory; this spreads files over many dirs to keep
        # per-directory entry counts low.
        return filename[:2]

    @classmethod
    def apply_counter(cls, counter, filename):
        """Return *filename* prefixed with the numeric collision counter,
        e.g. ``apply_counter(1, 'test.jpg') -> '1-test.jpg'``."""
        name_counted = '%d-%s' % (counter, filename)
        return name_counted

    @classmethod
    def safe_make_dirs(cls, dir_path):
        # makedirs that tolerates a concurrent creator: EEXIST raised by a
        # racing process is swallowed, anything else propagates.
        if not os.path.exists(dir_path):
            try:
                os.makedirs(dir_path)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
        return

    @classmethod
    def resolve_name(cls, name, directory):
        """
        Resolves a unique name and the correct path. If a filename
        for that path already exists then a numeric prefix with values > 0 will be
        added, for example test.jpg -> 1-test.jpg etc. initially file would have 0 prefix.

        :param name: base name of file
        :param directory: absolute directory path
        """

        counter = 0
        while True:
            name_counted = cls.apply_counter(counter, name)

            # sub_store prefix to optimize disk usage, e.g some_path/ab/final_file
            sub_store: str = cls._sub_store_from_filename(name_counted)
            sub_store_path: str = os.path.join(directory, sub_store)
            cls.safe_make_dirs(sub_store_path)

            path = os.path.join(sub_store_path, name_counted)
            if not os.path.exists(path):
                return name_counted, path
            counter += 1

    def __init__(self, index, directory, directory_folder, fs, **settings):
        # shard index; legacy layout only ever uses index 0
        self._index: int = index
        # absolute base directory of the store
        self._directory: str = directory
        # per-shard sub-folder, empty for the legacy single-shard layout
        self._directory_folder: str = directory_folder
        # fsspec filesystem handle used for all file operations
        self.fs = fs

    @property
    def dir_struct(self) -> str:
        """Cache directory final path."""
        return os.path.join(self._directory, '0-')

    def _write_file(self, full_path, iterator, max_filesize, mode='wb'):
        """
        Stream *iterator* chunks into *full_path*, enforcing *max_filesize*.

        Returns ``(size, sha256_hexdigest)`` of the written data.
        Raises FileOverSizeException (after removing the partial file) when
        the streamed size exceeds *max_filesize*.
        """

        # ensure dir exists
        destination, _ = os.path.split(full_path)
        if not self.fs.exists(destination):
            self.fs.makedirs(destination)

        writer = self.fs.open(full_path, mode)

        digest = hashlib.sha256()
        oversize_cleanup = False
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

                if max_filesize and size > max_filesize:
                    # free up the copied file, and raise exc
                    oversize_cleanup = True
                    break

            writer.flush()
            # Get the file descriptor
            fd = writer.fileno()

            # Sync the file descriptor to disk, helps with NFS cases...
            os.fsync(fd)

        if oversize_cleanup:
            self.fs.rm(full_path)
            raise FileOverSizeException(f'given file is over size limit ({max_filesize}): {full_path}')

        sha256 = digest.hexdigest()
        log.debug('written new artifact under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def _store(self, key: str, uid_key, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        """
        Store the stream read from *value_reader* under a unique name derived
        from *uid_key*, then write the sidecar metadata file.

        Returns ``(uid_filename, metadata_dict)``.
        """

        filename = key
        uid_filename = uid_key

        # NOTE:, also apply N- Counter...
        uid_filename, full_path = self.resolve_name(uid_filename, self._directory)

        # STORE METADATA
        # TODO: make it compatible, and backward proof
        _metadata = {
            "version": self.METADATA_VER,

            "filename": filename,
            "filename_uid_path": full_path,
            "filename_uid": uid_filename,
            "sha256": "",  # NOTE: filled in by reader iteration

            "store_time": time.time(),

            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        # read in 4MB chunks until the reader is exhausted
        read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
        size, sha256 = self._write_file(full_path, read_iterator, max_filesize)
        _metadata['size'] = size
        _metadata['sha256'] = sha256

        # after storing the artifacts, we write the metadata present
        _metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename)

        with self.fs.open(metadata_file_path, 'wb') as f:
            f.write(json.dumps(_metadata))

        return uid_filename, _metadata

    def store_path(self, uid_filename):
        """
        Returns absolute file path of the uid_filename
        """
        prefix_dir = ''
        if '/' in uid_filename:
            prefix_dir, filename = uid_filename.split('/')
            sub_store = self._sub_store_from_filename(filename)
        else:
            sub_store = self._sub_store_from_filename(uid_filename)

        return os.path.join(self._directory, prefix_dir, sub_store, uid_filename)

    def metadata_convert(self, uid_filename, metadata):
        """
        Convert pre-5.2 metadata dicts (which carry a 'meta_ver' key) into
        the current METADATA_VER schema; newer metadata passes through as-is.
        """
        # NOTE: backward compat mode here... this is for file created PRE 5.2 system
        if 'meta_ver' in metadata:
            full_path = self.store_path(uid_filename)
            metadata = {
                "_converted": True,
                "_org": metadata,
                "version": self.METADATA_VER,
                "store_type": self.BACKEND_TYPE,

                "filename": metadata['filename'],
                "filename_uid_path": full_path,
                "filename_uid": uid_filename,
                "sha256": metadata['sha256'],

                "store_time": metadata['time'],

                "size": metadata['size']
            }
        return metadata

    def _fetch(self, key, presigned_url_expires: int = 0):
        """
        Open the stored artifact for *key*.

        Returns ``(ShardFileReader, metadata)``; raises KeyError when the
        key is not present in this shard.
        """
        if key not in self:
            # BUGFIX: was log.exception() outside an except block, which
            # appends a bogus "NoneType: None" traceback; log.error is correct.
            log.error('requested key=%s not found in %s', key, self)
            raise KeyError(key)

        metadata = self.get_metadata(key)

        file_path = metadata['filename_uid_path']
        if presigned_url_expires and presigned_url_expires > 0:
            metadata['url'] = self.fs.url(file_path, expires=presigned_url_expires)

        return ShardFileReader(self.fs.open(file_path, 'rb')), metadata

    def delete(self, key):
        """Public delete entry point; see :meth:`_delete`."""
        return self._delete(key)

    def _delete(self, key):
        """Remove the artifact and its sidecar metadata file for *key*.
        Raises KeyError when the key is not present in this shard."""
        if key not in self:
            # BUGFIX: was log.exception() outside an except block (see _fetch).
            log.error('requested key=%s not found in %s', key, self)
            raise KeyError(key)

        metadata = self.get_metadata(key)
        metadata_file, metadata_file_path = self.get_metadata_filename(key)
        artifact_file_path = metadata['filename_uid_path']
        self.fs.rm(artifact_file_path)
        self.fs.rm(metadata_file_path)

    def get_metadata_filename(self, uid_filename) -> tuple[str, str]:
        """Return ``(metadata_file_name, absolute_metadata_file_path)`` for
        the given artifact uid."""

        metadata_file: str = f'{uid_filename}{self.metadata_suffix}'
        uid_path_in_store = self.store_path(uid_filename)

        metadata_file_path = f'{uid_path_in_store}{self.metadata_suffix}'
        return metadata_file, metadata_file_path
|
|
|
|
|
|
|
|
|
class LegacyFileSystemBackend(BaseFileStoreBackend):
    """
    File store backend using the single-shard legacy on-disk layout.

    Unlike the sharded backends, everything lives directly under the
    configured storage path; sharding hooks are satisfied with one shard.
    """

    _shard_cls = LegacyFileSystemShard

    def __init__(self, settings):
        super().__init__(settings)

        configured_path = self.get_conf(config_keys.legacy_filesystem_storage_path)
        storage_dir = os.path.expanduser(configured_path)

        self._directory = storage_dir
        self._storage_path = storage_dir  # common path for all from BaseCache

        log.debug('Initializing %s file_store instance', self)
        local_fs = fsspec.filesystem('file')

        if not local_fs.exists(storage_dir):
            local_fs.makedirs(storage_dir, exist_ok=True)

        # legacy system uses single shard
        only_shard = self._shard_cls(
            index=0,
            directory=storage_dir,
            directory_folder='',
            fs=local_fs,
            **settings,
        )
        self._shards = (only_shard,)

    @classmethod
    def get_shard_index(cls, filename: str, num_shards) -> int:
        # legacy filesystem doesn't use shards, and always uses single shard
        return 0
|
|
|
|