##// END OF EJS Templates
feat(archive-cache): implemented s3 based backend for filecaches
feat(archive-cache): implemented s3 based backend for filecaches

File last commit:

r5433:d96689c8 default
r5433:d96689c8 default
Show More
objectstore_cache.py
150 lines | 5.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import codecs
import hashlib
import logging
import os
import fsspec
from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool
log = logging.getLogger(__name__)
class S3Shard(BaseShard):
def __init__(self, index, bucket, **settings):
self._index = index
self._bucket = bucket
self.storage_type = 'bucket'
endpoint_url = settings.pop('archive_cache.objectstore.url')
key = settings.pop('archive_cache.objectstore.key')
secret = settings.pop('archive_cache.objectstore.secret')
self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
@property
def bucket(self):
"""Cache bucket."""
return self._bucket
def _get_keyfile(self, archive_key) -> tuple[str, str]:
key_file = f'{archive_key}-{self.key_suffix}'
return key_file, os.path.join(self.bucket, key_file)
def _get_writer(self, path, mode):
return self.fs.open(path, 'wb')
def _write_file(self, full_path, iterator, mode):
# ensure bucket exists
destination = self.bucket
if not self.fs.exists(destination):
self.fs.mkdir(destination, s3_additional_kwargs={})
writer = self._get_writer(full_path, mode)
digest = hashlib.sha256()
with writer:
size = 0
for chunk in iterator:
size += len(chunk)
digest.update(chunk)
writer.write(chunk)
sha256 = digest.hexdigest()
log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
return size, sha256
def store(self, key, value_reader, metadata: dict | None = None):
return self._store(key, value_reader, metadata, mode='wb')
def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
return self._fetch(key, retry, retry_attempts, retry_backoff)
def remove(self, key):
return self._remove(key)
def random_filename(self):
"""Return filename and full-path tuple for file storage.
Filename will be a randomly generated 28 character hexadecimal string
with ".archive_cache" suffixed. Two levels of sub-directories will be used to
reduce the size of directories. On older filesystems, lookups in
directories with many files may be slow.
"""
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
archive_name = hex_name[4:] + '.archive_cache'
filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
full_path = os.path.join(self.bucket, filename)
return archive_name, full_path
def __repr__(self):
return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
class ObjectStoreCache(BaseCache):
def __init__(self, locking_url, **settings):
"""
Initialize objectstore cache instance.
:param str locking_url: redis url for a lock
:param settings: settings dict
"""
self._locking_url = locking_url
self._config = settings
objectstore_url = self.get_conf('archive_cache.objectstore.url')
self._storage_path = objectstore_url
self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
log.debug('Initializing archival cache instance under %s', objectstore_url)
self._shards = tuple(
S3Shard(
index=num,
bucket='rhodecode-archivecache-%03d' % num,
**settings,
)
for num in range(self._count)
)
self._hash = self._shards[0].hash
def _get_shard(self, key) -> S3Shard:
index = self._hash(key) % self._count
shard = self._shards[index]
return shard
def _get_size(self, shard, archive_path):
return shard.fs.info(archive_path)['size']