objectstore_cache.py
173 lines
| 6.2 KiB
| text/x-python
|
PythonLexer
r5433 | # Copyright (C) 2015-2024 RhodeCode GmbH | |||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import codecs | ||||
import hashlib | ||||
import logging | ||||
import os | ||||
r5450 | import typing | |||
r5433 | ||||
import fsspec | ||||
from .base import BaseCache, BaseShard | ||||
from ..utils import ShardFileReader, NOT_GIVEN | ||||
from ...type_utils import str2bool | ||||
log = logging.getLogger(__name__) | ||||
class S3Shard(BaseShard): | ||||
r5447 | def __init__(self, index, bucket, bucket_folder, fs, **settings): | |||
r5450 | self._index: int = index | |||
self._bucket_folder: str = bucket_folder | ||||
self.storage_type: str = 'bucket' | ||||
self._bucket_main: str = bucket | ||||
r5433 | ||||
r5447 | self.fs = fs | |||
r5433 | ||||
@property | ||||
r5450 | def bucket(self) -> str: | |||
r5447 | """Cache bucket final path.""" | |||
return os.path.join(self._bucket_main, self._bucket_folder) | ||||
r5433 | ||||
def _get_keyfile(self, archive_key) -> tuple[str, str]: | ||||
r5450 | key_file: str = f'{archive_key}-{self.key_suffix}' | |||
r5433 | return key_file, os.path.join(self.bucket, key_file) | |||
def _get_writer(self, path, mode): | ||||
return self.fs.open(path, 'wb') | ||||
def _write_file(self, full_path, iterator, mode): | ||||
r5445 | ||||
r5447 | # ensure folder in bucket exists | |||
r5433 | destination = self.bucket | |||
if not self.fs.exists(destination): | ||||
self.fs.mkdir(destination, s3_additional_kwargs={}) | ||||
writer = self._get_writer(full_path, mode) | ||||
digest = hashlib.sha256() | ||||
with writer: | ||||
size = 0 | ||||
for chunk in iterator: | ||||
size += len(chunk) | ||||
digest.update(chunk) | ||||
writer.write(chunk) | ||||
sha256 = digest.hexdigest() | ||||
log.debug('written new archive cache under %s, sha256: %s', full_path, sha256) | ||||
return size, sha256 | ||||
def store(self, key, value_reader, metadata: dict | None = None): | ||||
return self._store(key, value_reader, metadata, mode='wb') | ||||
r5450 | def fetch(self, key, retry=NOT_GIVEN, | |||
retry_attempts=NOT_GIVEN, retry_backoff=1, | ||||
presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]: | ||||
return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires) | ||||
r5433 | ||||
def remove(self, key): | ||||
return self._remove(key) | ||||
def random_filename(self): | ||||
"""Return filename and full-path tuple for file storage. | ||||
Filename will be a randomly generated 28 character hexadecimal string | ||||
with ".archive_cache" suffixed. Two levels of sub-directories will be used to | ||||
reduce the size of directories. On older filesystems, lookups in | ||||
directories with many files may be slow. | ||||
""" | ||||
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') | ||||
archive_name = hex_name[4:] + '.archive_cache' | ||||
filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}" | ||||
full_path = os.path.join(self.bucket, filename) | ||||
return archive_name, full_path | ||||
def __repr__(self): | ||||
return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})' | ||||
class ObjectStoreCache(BaseCache): | ||||
r5450 | shard_name: str = 'shard-{:03d}' | |||
r5447 | shard_cls = S3Shard | |||
r5433 | ||||
def __init__(self, locking_url, **settings): | ||||
""" | ||||
Initialize objectstore cache instance. | ||||
:param str locking_url: redis url for a lock | ||||
:param settings: settings dict | ||||
""" | ||||
self._locking_url = locking_url | ||||
self._config = settings | ||||
objectstore_url = self.get_conf('archive_cache.objectstore.url') | ||||
r5447 | self._storage_path = objectstore_url # common path for all from BaseCache | |||
r5433 | ||||
r5447 | self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True)) | |||
if self._shard_count < 1: | ||||
raise ValueError('cache_shards must be 1 or more') | ||||
self._bucket = settings.pop('archive_cache.objectstore.bucket') | ||||
if not self._bucket: | ||||
raise ValueError('archive_cache.objectstore.bucket needs to have a value') | ||||
r5433 | ||||
self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True) | ||||
self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb'))) | ||||
self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True)) | ||||
self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True)) | ||||
self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True)) | ||||
r5447 | endpoint_url = settings.pop('archive_cache.objectstore.url') | |||
key = settings.pop('archive_cache.objectstore.key') | ||||
secret = settings.pop('archive_cache.objectstore.secret') | ||||
r5456 | region = settings.pop('archive_cache.objectstore.region') | |||
r5447 | ||||
r5448 | log.debug('Initializing %s archival cache instance', self) | |||
r5447 | ||||
r5457 | fs = fsspec.filesystem( | |||
's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region} | ||||
) | ||||
r5447 | ||||
# init main bucket | ||||
if not fs.exists(self._bucket): | ||||
fs.mkdir(self._bucket) | ||||
r5433 | self._shards = tuple( | |||
r5447 | self.shard_cls( | |||
r5433 | index=num, | |||
r5447 | bucket=self._bucket, | |||
r5450 | bucket_folder=self.shard_name.format(num), | |||
r5447 | fs=fs, | |||
r5433 | **settings, | |||
) | ||||
r5447 | for num in range(self._shard_count) | |||
r5433 | ) | |||
self._hash = self._shards[0].hash | ||||
def _get_size(self, shard, archive_path): | ||||
return shard.fs.info(archive_path)['size'] | ||||
r5450 | ||||
def set_presigned_url_expiry(self, val: int) -> None: | ||||
self.presigned_url_expires = val | ||||