##// END OF EJS Templates
fix(tests): fixed tests for PR celery hooks deamon
fix(tests): fixed tests for PR celery hooks deamon

File last commit:

r5516:3496180b default
r5589:750c46dc default
Show More
objectstore.py
184 lines | 6.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import hashlib
import functools
import time
import logging
from .. import config_keys
from ..exceptions import FileOverSizeException
from ..backends.base import BaseFileStoreBackend, fsspec, BaseShard, ShardFileReader
from ....lib.ext_json import json
log = logging.getLogger(__name__)
class S3Shard(BaseShard):
    """
    One shard of the object-store backend, stored as a folder inside a bucket.

    Every file operation goes through the ``fs`` object given to the
    constructor (``ObjectStoreBackend`` in this module builds it with
    ``fsspec.filesystem('s3', ...)``). Each stored artifact is accompanied
    by a JSON metadata file describing it.
    """

    # version tag written into the "version" field of every metadata file
    METADATA_VER = 'v2'
    # written into the "store_type" field of every metadata file
    BACKEND_TYPE = config_keys.backend_objectstore
    storage_type: str = 'bucket'

    def __init__(self, index, bucket, bucket_folder, fs, **settings):
        """
        :param index: position of this shard among the backend's shards
        :param bucket: name of the main bucket
        :param bucket_folder: folder inside the bucket that holds this shard
        :param fs: filesystem object used for all reads/writes
        :param settings: extra keyword settings; accepted so the backend can
            forward its settings dict, but not used by this class
        """
        self._index: int = index
        self._bucket_main: str = bucket
        self._bucket_folder: str = bucket_folder

        self.fs = fs

    @property
    def bucket(self) -> str:
        """Final path of this shard: the main bucket joined with the shard folder."""
        return os.path.join(self._bucket_main, self._bucket_folder)

    def _write_file(self, full_path, iterator, max_filesize, mode='wb'):
        """
        Stream chunks from ``iterator`` into ``full_path`` while hashing them.

        :param full_path: destination path of the artifact
        :param iterator: iterable yielding the chunks to write
        :param max_filesize: size limit in bytes; a falsy value disables the check
        :param mode: mode passed to ``fs.open``
        :returns: tuple of ``(size, sha256_hexdigest)`` of the written data
        :raises FileOverSizeException: when more than ``max_filesize`` bytes
            were written; the partially written file is removed first
        """
        # ensure dir exists
        destination, _ = os.path.split(full_path)
        if not self.fs.exists(destination):
            self.fs.makedirs(destination)

        writer = self.fs.open(full_path, mode)

        digest = hashlib.sha256()
        oversize_cleanup = False
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

                # the limit is checked after the chunk is written, so the
                # file may briefly exceed it before being removed below
                if max_filesize and size > max_filesize:
                    oversize_cleanup = True
                    # free up the copied file, and raise exc
                    break

        # removal happens only after the ``with`` block has closed the writer
        if oversize_cleanup:
            self.fs.rm(full_path)
            raise FileOverSizeException(f'given file is over size limit ({max_filesize}): {full_path}')

        sha256 = digest.hexdigest()
        log.debug('written new artifact under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def _store(self, key: str, uid_key, value_reader, max_filesize: int | None = None, metadata: dict | None = None, **kwargs):
        """
        Store an artifact and its metadata file in this shard.

        :param key: filename recorded in the metadata as ``filename``
        :param uid_key: unique filename under which the artifact is stored
        :param value_reader: object with a ``read(size)`` method returning bytes
        :param max_filesize: optional size limit forwarded to ``_write_file``
        :param metadata: optional extra metadata merged into the generated
            metadata, unless ``import_mode=True`` is passed in ``kwargs``
        :returns: tuple of ``(uid_filename, metadata_dict)``
        :raises FileOverSizeException: propagated from ``_write_file``
        """
        filename = key
        uid_filename = uid_key
        full_path = self.store_path(uid_filename)

        # STORE METADATA
        _metadata = {
            "version": self.METADATA_VER,
            "store_type": self.BACKEND_TYPE,

            "filename": filename,
            "filename_uid_path": full_path,
            "filename_uid": uid_filename,
            "sha256": "",  # NOTE: filled in by reader iteration
            "store_time": time.time(),

            "size": 0
        }

        if metadata:
            if kwargs.pop('import_mode', False):
                # in import mode, we don't need to compute metadata, we just take the old version
                _metadata["import_mode"] = True
            else:
                _metadata.update(metadata)

        # read the source in 4 MiB (2**22 bytes) chunks until read() returns b''
        read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
        size, sha256 = self._write_file(full_path, read_iterator, max_filesize)
        _metadata['size'] = size
        _metadata['sha256'] = sha256

        # after storing the artifacts, we write the metadata present
        # NOTE(review): get_metadata_filename is defined outside this class
        # (presumably on BaseShard); only the returned path is used here
        metadata_file, metadata_file_path = self.get_metadata_filename(uid_key)

        with self.fs.open(metadata_file_path, 'wb') as f:
            f.write(json.dumps(_metadata))

        return uid_filename, _metadata

    def store_path(self, uid_filename):
        """
        Returns absolute file path of the uid_filename
        """
        return os.path.join(self._bucket_main, self._bucket_folder, uid_filename)

    def _fetch(self, key, presigned_url_expires: int = 0):
        """
        Open a stored artifact for reading and load its metadata.

        :param key: key of the artifact to fetch
        :param presigned_url_expires: when greater than 0, a URL obtained from
            ``fs.url(..., expires=presigned_url_expires)`` is added to the
            metadata under ``url``
        :returns: tuple of ``(ShardFileReader, metadata_dict)``
        :raises KeyError: when ``key`` is not found in this shard
        """
        if key not in self:
            # log.error rather than log.exception: no exception is being
            # handled here, so log.exception would attach an empty
            # "NoneType: None" traceback to the record
            log.error('requested key=%s not found in %s', key, self)
            raise KeyError(key)

        metadata_file, metadata_file_path = self.get_metadata_filename(key)
        with self.fs.open(metadata_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        file_path = metadata['filename_uid_path']
        if presigned_url_expires and presigned_url_expires > 0:
            metadata['url'] = self.fs.url(file_path, expires=presigned_url_expires)

        return ShardFileReader(self.fs.open(file_path, 'rb')), metadata

    def delete(self, key):
        """Delete the artifact stored under ``key``; delegates to ``_delete``."""
        return self._delete(key)
class ObjectStoreBackend(BaseFileStoreBackend):
    """
    File store backend that keeps artifacts in a bucket reached through
    fsspec's ``'s3'`` filesystem, spread over a fixed number of shard folders.
    """

    # template for the per-shard folder name, e.g. ``shard-000``
    shard_name: str = 'shard-{:03d}'
    _shard_cls = S3Shard

    def __init__(self, settings):
        """
        Read the object-store configuration, make sure the main bucket
        exists and build one shard object per configured shard.

        :param settings: configuration mapping; bucket, key and secret are
            popped from it, and what remains is forwarded to each shard
        :raises ValueError: when the shard count is below 1 or the bucket
            name is empty
        """
        super().__init__(settings)

        # NOTE(review): get_conf is defined outside this class; presumably
        # pop=True also removes the key from the settings
        self._shard_count = int(self.get_conf(config_keys.objectstore_bucket_shards, pop=True))
        if self._shard_count < 1:
            # name the setting that was actually read, consistent with the
            # bucket error below
            raise ValueError(f'{config_keys.objectstore_bucket_shards} must be 1 or more')

        self._bucket = settings.pop(config_keys.objectstore_bucket)
        if not self._bucket:
            raise ValueError(f'{config_keys.objectstore_bucket} needs to have a value')

        objectstore_url = self.get_conf(config_keys.objectstore_url)
        # credentials are popped so they are not part of the settings
        # forwarded to the shards below
        key = settings.pop(config_keys.objectstore_key)
        secret = settings.pop(config_keys.objectstore_secret)

        self._storage_path = objectstore_url  # common path for all from BaseCache
        log.debug('Initializing %s file_store instance', self)
        fs = fsspec.filesystem('s3', anon=False, endpoint_url=objectstore_url, key=key, secret=secret)

        # init main bucket
        if not fs.exists(self._bucket):
            fs.mkdir(self._bucket)

        # every shard shares the same filesystem object and main bucket and
        # differs only in its index and folder name
        self._shards = tuple(
            self._shard_cls(
                index=num,
                bucket=self._bucket,
                bucket_folder=self.shard_name.format(num),
                fs=fs,
                **settings,
            )
            for num in range(self._shard_count)
        )