filesystem_legacy.py
278 lines
| 9.2 KiB
| text/x-python
|
PythonLexer
r5516 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import errno | ||||
import os | ||||
import hashlib | ||||
import functools | ||||
import time | ||||
import logging | ||||
from .. import config_keys | ||||
from ..exceptions import FileOverSizeException | ||||
from ..backends.base import BaseFileStoreBackend, fsspec, BaseShard, ShardFileReader | ||||
from ....lib.ext_json import json | ||||
log = logging.getLogger(__name__) | ||||
class LegacyFileSystemShard(BaseShard): | ||||
# legacy ver | ||||
METADATA_VER = 'v2' | ||||
BACKEND_TYPE = config_keys.backend_legacy_filesystem | ||||
storage_type: str = 'dir_struct' | ||||
# legacy suffix | ||||
metadata_suffix: str = '.meta' | ||||
@classmethod | ||||
def _sub_store_from_filename(cls, filename): | ||||
return filename[:2] | ||||
@classmethod | ||||
def apply_counter(cls, counter, filename): | ||||
name_counted = '%d-%s' % (counter, filename) | ||||
return name_counted | ||||
@classmethod | ||||
def safe_make_dirs(cls, dir_path): | ||||
if not os.path.exists(dir_path): | ||||
try: | ||||
os.makedirs(dir_path) | ||||
except OSError as e: | ||||
if e.errno != errno.EEXIST: | ||||
raise | ||||
return | ||||
@classmethod | ||||
def resolve_name(cls, name, directory): | ||||
""" | ||||
Resolves a unique name and the correct path. If a filename | ||||
for that path already exists then a numeric prefix with values > 0 will be | ||||
added, for example test.jpg -> 1-test.jpg etc. initially file would have 0 prefix. | ||||
:param name: base name of file | ||||
:param directory: absolute directory path | ||||
""" | ||||
counter = 0 | ||||
while True: | ||||
name_counted = cls.apply_counter(counter, name) | ||||
# sub_store prefix to optimize disk usage, e.g some_path/ab/final_file | ||||
sub_store: str = cls._sub_store_from_filename(name_counted) | ||||
sub_store_path: str = os.path.join(directory, sub_store) | ||||
cls.safe_make_dirs(sub_store_path) | ||||
path = os.path.join(sub_store_path, name_counted) | ||||
if not os.path.exists(path): | ||||
return name_counted, path | ||||
counter += 1 | ||||
def __init__(self, index, directory, directory_folder, fs, **settings): | ||||
self._index: int = index | ||||
self._directory: str = directory | ||||
self._directory_folder: str = directory_folder | ||||
self.fs = fs | ||||
@property | ||||
def dir_struct(self) -> str: | ||||
"""Cache directory final path.""" | ||||
return os.path.join(self._directory, '0-') | ||||
def _write_file(self, full_path, iterator, max_filesize, mode='wb'): | ||||
# ensure dir exists | ||||
destination, _ = os.path.split(full_path) | ||||
if not self.fs.exists(destination): | ||||
self.fs.makedirs(destination) | ||||
writer = self.fs.open(full_path, mode) | ||||
digest = hashlib.sha256() | ||||
oversize_cleanup = False | ||||
with writer: | ||||
size = 0 | ||||
for chunk in iterator: | ||||
size += len(chunk) | ||||
digest.update(chunk) | ||||
writer.write(chunk) | ||||
if max_filesize and size > max_filesize: | ||||
# free up the copied file, and raise exc | ||||
oversize_cleanup = True | ||||
break | ||||
writer.flush() | ||||
# Get the file descriptor | ||||
fd = writer.fileno() | ||||
# Sync the file descriptor to disk, helps with NFS cases... | ||||
os.fsync(fd) | ||||
if oversize_cleanup: | ||||
self.fs.rm(full_path) | ||||
raise FileOverSizeException(f'given file is over size limit ({max_filesize}): {full_path}') | ||||
sha256 = digest.hexdigest() | ||||
log.debug('written new artifact under %s, sha256: %s', full_path, sha256) | ||||
return size, sha256 | ||||
def _store(self, key: str, uid_key, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs): | ||||
filename = key | ||||
uid_filename = uid_key | ||||
# NOTE:, also apply N- Counter... | ||||
uid_filename, full_path = self.resolve_name(uid_filename, self._directory) | ||||
# STORE METADATA | ||||
# TODO: make it compatible, and backward proof | ||||
_metadata = { | ||||
"version": self.METADATA_VER, | ||||
"filename": filename, | ||||
"filename_uid_path": full_path, | ||||
"filename_uid": uid_filename, | ||||
"sha256": "", # NOTE: filled in by reader iteration | ||||
"store_time": time.time(), | ||||
"size": 0 | ||||
} | ||||
if metadata: | ||||
_metadata.update(metadata) | ||||
read_iterator = iter(functools.partial(value_reader.read, 2**22), b'') | ||||
size, sha256 = self._write_file(full_path, read_iterator, max_filesize) | ||||
_metadata['size'] = size | ||||
_metadata['sha256'] = sha256 | ||||
# after storing the artifacts, we write the metadata present | ||||
_metadata_file, metadata_file_path = self.get_metadata_filename(uid_filename) | ||||
with self.fs.open(metadata_file_path, 'wb') as f: | ||||
f.write(json.dumps(_metadata)) | ||||
return uid_filename, _metadata | ||||
def store_path(self, uid_filename): | ||||
""" | ||||
Returns absolute file path of the uid_filename | ||||
""" | ||||
prefix_dir = '' | ||||
if '/' in uid_filename: | ||||
prefix_dir, filename = uid_filename.split('/') | ||||
sub_store = self._sub_store_from_filename(filename) | ||||
else: | ||||
sub_store = self._sub_store_from_filename(uid_filename) | ||||
return os.path.join(self._directory, prefix_dir, sub_store, uid_filename) | ||||
def metadata_convert(self, uid_filename, metadata): | ||||
# NOTE: backward compat mode here... this is for file created PRE 5.2 system | ||||
if 'meta_ver' in metadata: | ||||
full_path = self.store_path(uid_filename) | ||||
metadata = { | ||||
"_converted": True, | ||||
"_org": metadata, | ||||
"version": self.METADATA_VER, | ||||
"store_type": self.BACKEND_TYPE, | ||||
"filename": metadata['filename'], | ||||
"filename_uid_path": full_path, | ||||
"filename_uid": uid_filename, | ||||
"sha256": metadata['sha256'], | ||||
"store_time": metadata['time'], | ||||
"size": metadata['size'] | ||||
} | ||||
return metadata | ||||
def _fetch(self, key, presigned_url_expires: int = 0): | ||||
if key not in self: | ||||
log.exception(f'requested key={key} not found in {self}') | ||||
raise KeyError(key) | ||||
metadata = self.get_metadata(key) | ||||
file_path = metadata['filename_uid_path'] | ||||
if presigned_url_expires and presigned_url_expires > 0: | ||||
metadata['url'] = self.fs.url(file_path, expires=presigned_url_expires) | ||||
return ShardFileReader(self.fs.open(file_path, 'rb')), metadata | ||||
def delete(self, key): | ||||
return self._delete(key) | ||||
def _delete(self, key): | ||||
if key not in self: | ||||
log.exception(f'requested key={key} not found in {self}') | ||||
raise KeyError(key) | ||||
metadata = self.get_metadata(key) | ||||
metadata_file, metadata_file_path = self.get_metadata_filename(key) | ||||
artifact_file_path = metadata['filename_uid_path'] | ||||
self.fs.rm(artifact_file_path) | ||||
self.fs.rm(metadata_file_path) | ||||
def get_metadata_filename(self, uid_filename) -> tuple[str, str]: | ||||
metadata_file: str = f'{uid_filename}{self.metadata_suffix}' | ||||
uid_path_in_store = self.store_path(uid_filename) | ||||
metadata_file_path = f'{uid_path_in_store}{self.metadata_suffix}' | ||||
return metadata_file, metadata_file_path | ||||
class LegacyFileSystemBackend(BaseFileStoreBackend): | ||||
_shard_cls = LegacyFileSystemShard | ||||
def __init__(self, settings): | ||||
super().__init__(settings) | ||||
store_dir = self.get_conf(config_keys.legacy_filesystem_storage_path) | ||||
directory = os.path.expanduser(store_dir) | ||||
self._directory = directory | ||||
self._storage_path = directory # common path for all from BaseCache | ||||
log.debug('Initializing %s file_store instance', self) | ||||
fs = fsspec.filesystem('file') | ||||
if not fs.exists(self._directory): | ||||
fs.makedirs(self._directory, exist_ok=True) | ||||
# legacy system uses single shard | ||||
self._shards = tuple( | ||||
[ | ||||
self._shard_cls( | ||||
index=0, | ||||
directory=directory, | ||||
directory_folder='', | ||||
fs=fs, | ||||
**settings, | ||||
) | ||||
] | ||||
) | ||||
@classmethod | ||||
def get_shard_index(cls, filename: str, num_shards) -> int: | ||||
# legacy filesystem doesn't use shards, and always uses single shard | ||||
return 0 | ||||