# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import hashlib
import logging
import os
import posixpath
import typing

import fsspec

from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool
|
|
|
|
|
|
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class S3Shard(BaseShard):
    """A single archive-cache shard kept as a folder inside an object-store bucket.

    Every path handed to the filesystem object is built with ``posixpath`` so
    that ``/`` is always used as the separator, independent of the platform
    this code runs on.
    """

    def __init__(self, index, bucket, bucket_folder, fs, **settings):
        """
        Initialize a shard.

        :param int index: position of this shard within the cache
        :param str bucket: name of the main bucket
        :param str bucket_folder: folder inside the main bucket used by this shard
        :param fs: filesystem object used for all storage operations
        :param settings: extra settings; accepted but not used by this class
        """
        self._index: int = index
        self._bucket_folder: str = bucket_folder
        self.storage_type: str = 'bucket'
        self._bucket_main: str = bucket

        self.fs = fs

    @property
    def bucket(self) -> str:
        """Cache bucket final path."""
        return posixpath.join(self._bucket_main, self._bucket_folder)

    def _get_keyfile(self, archive_key) -> tuple[str, str]:
        """Return ``(key_file_name, full_path)`` of the key file for *archive_key*."""
        # NOTE(review): key_suffix is not defined in this class; presumably it is
        # provided by BaseShard - confirm.
        key_file: str = f'{archive_key}-{self.key_suffix}'
        return key_file, posixpath.join(self.bucket, key_file)

    def _get_writer(self, path, mode):
        """Open *path* on the shard filesystem for writing.

        *mode* is accepted but not used: the file is always opened with ``'wb'``.
        """
        return self.fs.open(path, 'wb')

    def _write_file(self, full_path, iterator, mode):
        """Write all chunks from *iterator* to *full_path*.

        :param full_path: destination path of the written object
        :param iterator: iterable yielding the chunks to write
        :param mode: forwarded to :meth:`_get_writer`
        :return: tuple of (sum of the chunk lengths, sha256 hex digest of the data)
        """
        # ensure folder in bucket exists
        destination = self.bucket
        if not self.fs.exists(destination):
            self.fs.mkdir(destination, s3_additional_kwargs={})

        writer = self._get_writer(full_path, mode)

        digest = hashlib.sha256()
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

        sha256 = digest.hexdigest()
        log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def store(self, key, value_reader, metadata: dict | None = None):
        """Store *value_reader* under *key*; delegates to ``_store`` with mode ``'wb'``."""
        # NOTE(review): _store is not defined in this class; presumably inherited
        # from BaseShard - confirm.
        return self._store(key, value_reader, metadata, mode='wb')

    def fetch(self, key, retry=NOT_GIVEN,
              retry_attempts=NOT_GIVEN, retry_backoff=1,
              presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
        """Fetch the entry stored under *key*; delegates to ``_fetch``.

        All arguments are forwarded unchanged.
        """
        # NOTE(review): _fetch is not defined in this class; presumably inherited
        # from BaseShard - confirm.
        return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)

    def remove(self, key):
        """Remove the entry stored under *key*; delegates to ``_remove``."""
        # NOTE(review): _remove is not defined in this class; presumably inherited
        # from BaseShard - confirm.
        return self._remove(key)

    def random_filename(self):
        """Return archive name and full-path tuple for file storage.

        A random 32 character hexadecimal string is generated. The archive
        name is its last 28 characters with ".archive_cache" suffixed. The
        object itself is stored flat inside the shard folder (no
        sub-directories): the first four hex characters are put in front of
        the archive name as two 2-character prefixes joined with "-".

        :return: tuple of (archive name without the prefixes, full path of
            the prefixed object inside the shard folder)
        """
        hex_name = os.urandom(16).hex()

        archive_name = hex_name[4:] + '.archive_cache'
        filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

        full_path = posixpath.join(self.bucket, filename)
        return archive_name, full_path

    def __repr__(self):
        return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
|
|
|
|
|
|
|
|
|
class ObjectStoreCache(BaseCache):
    """Archive cache whose shards are folders inside a single object-store bucket."""

    # template for the per-shard folder name, formatted with the shard number
    shard_name: str = 'shard-{:03d}'
    shard_cls = S3Shard

    def __init__(self, locking_url, **settings):
        """
        Initialize objectstore cache instance.

        :param str locking_url: redis url for a lock
        :param settings: settings dict

        :raises ValueError: when the configured number of shards is lower
            than 1, or when the configured bucket name is empty
        """
        self._locking_url = locking_url
        # NOTE: self._config and ``settings`` refer to the same dict object
        self._config = settings

        objectstore_url = self.get_conf('archive_cache.objectstore.url')
        self._storage_path = objectstore_url  # common path for all from BaseCache

        # NOTE(review): get_conf(..., pop=True) presumably removes the key from
        # the config so that it is not forwarded to the shards below - confirm
        # against BaseCache.
        self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
        if self._shard_count < 1:
            raise ValueError('archive_cache.objectstore.bucket_shards must be 1 or more')

        self._bucket = settings.pop('archive_cache.objectstore.bucket')
        if not self._bucket:
            raise ValueError('archive_cache.objectstore.bucket needs to have a value')

        self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
        self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

        self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
        self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
        self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

        # connection details are removed from ``settings`` before the remaining
        # settings are handed over to the shards
        endpoint_url = settings.pop('archive_cache.objectstore.url')
        key = settings.pop('archive_cache.objectstore.key')
        secret = settings.pop('archive_cache.objectstore.secret')
        region = settings.pop('archive_cache.objectstore.region')

        log.debug('Initializing %s archival cache instance', self)

        fs = fsspec.filesystem(
            's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region}
        )

        # init main bucket
        if not fs.exists(self._bucket):
            fs.mkdir(self._bucket)

        # every shard shares the same filesystem object and main bucket, and
        # gets its own folder named after its number
        self._shards = tuple(
            self.shard_cls(
                index=num,
                bucket=self._bucket,
                bucket_folder=self.shard_name.format(num),
                fs=fs,
                **settings,
            )
            for num in range(self._shard_count)
        )
        # NOTE(review): ``hash`` is not defined on S3Shard itself; presumably it
        # comes from BaseShard - confirm.
        self._hash = self._shards[0].hash

    def _get_size(self, shard, archive_path):
        """Return the ``'size'`` entry reported by the shard filesystem for *archive_path*."""
        return shard.fs.info(archive_path)['size']

    def set_presigned_url_expiry(self, val: int) -> None:
        """Set ``presigned_url_expires`` on this cache instance to *val*."""
        self.presigned_url_expires = val
|
|
|
|