# HG changeset patch # User RhodeCode Admin # Date 2024-06-20 08:57:56 # Node ID 6c0bdadc482e59450af2055eac81182bb496984f # Parent df00ef8a7593218a730c552dc1ca097ad2996481 feat(archive-cache): added presigned_url support for objectstore diff --git a/rhodecode/lib/archive_cache/backends/base.py b/rhodecode/lib/archive_cache/backends/base.py --- a/rhodecode/lib/archive_cache/backends/base.py +++ b/rhodecode/lib/archive_cache/backends/base.py @@ -54,6 +54,9 @@ class BaseShard: def random_filename(self): raise NotImplementedError + def store(self, *args, **kwargs): + raise NotImplementedError + def _store(self, key, value_reader, metadata, mode): (filename, # hash-name full_path # full-path/hash-name @@ -91,7 +94,11 @@ class BaseShard: return key, filename, size, _metadata - def _fetch(self, key, retry, retry_attempts, retry_backoff): + def fetch(self, *args, **kwargs): + raise NotImplementedError + + def _fetch(self, key, retry, retry_attempts, retry_backoff, + presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]: if retry is NOT_GIVEN: retry = False if retry_attempts is NOT_GIVEN: @@ -113,6 +120,8 @@ class BaseShard: metadata = json.loads(f.read()) archive_path = metadata['archive_full_path'] + if presigned_url_expires and presigned_url_expires > 0: + metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires) try: return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata @@ -125,6 +134,9 @@ class BaseShard: with self.fs.open(key_file_path, 'wb') as f: f.write(json.dumps(metadata)) + def remove(self, *args, **kwargs): + raise NotImplementedError + def _remove(self, key): if key not in self: log.exception(f'requested key={key} not found in {self}') @@ -161,12 +173,14 @@ class BaseShard: class BaseCache: _locking_url: str = '' _storage_path: str = '' - _config = {} + _config: dict = {} retry = False - retry_attempts = 0 - retry_backoff = 1 + retry_attempts: int = 0 + retry_backoff: int | float = 1 _shards = tuple() shard_cls = BaseShard + # define the presigned url expiration, 0 == disabled + presigned_url_expires: int = 0 def __contains__(self, key): """Return `True` if `key` matching item is found in cache. @@ -221,9 +235,13 @@ class BaseCache: if retry_attempts is NOT_GIVEN: retry_attempts = self.retry_attempts retry_backoff = self.retry_backoff + presigned_url_expires = self.presigned_url_expires shard = self._get_shard(key) - return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff) + return shard.fetch(key, retry=retry, + retry_attempts=retry_attempts, + retry_backoff=retry_backoff, + presigned_url_expires=presigned_url_expires) def remove(self, key): shard = self._get_shard(key) @@ -352,4 +370,3 @@ class BaseCache: total_size += metadata['size'] return total_files, total_size, meta - diff --git a/rhodecode/lib/archive_cache/backends/fanout_cache.py b/rhodecode/lib/archive_cache/backends/fanout_cache.py --- a/rhodecode/lib/archive_cache/backends/fanout_cache.py +++ b/rhodecode/lib/archive_cache/backends/fanout_cache.py @@ -20,6 +20,7 @@ import codecs import hashlib import logging import os +import typing import fsspec @@ -33,20 +34,20 @@ log = logging.getLogger(__name__) class FileSystemShard(BaseShard): def __init__(self, index, directory, directory_folder, fs, **settings): - self._index = index - self._directory = directory - self._directory_folder = directory_folder - self.storage_type = 'directory' + self._index: int = index + self._directory: str = directory + self._directory_folder: str = directory_folder + self.storage_type: str = 'directory' self.fs = fs @property - def directory(self): + def directory(self) -> str: """Cache directory final path.""" return os.path.join(self._directory, self._directory_folder) def _get_keyfile(self, archive_key) -> tuple[str, str]: - key_file = f'{archive_key}.{self.key_suffix}' + key_file: str = f'{archive_key}.{self.key_suffix}' return key_file, os.path.join(self.directory, key_file) def _get_writer(self, path, mode): @@ -62,6 +63,7 @@ class FileSystemShard(BaseShard): continue def _write_file(self, full_path, iterator, mode): + # ensure dir exists destination, _ = os.path.split(full_path) if not self.fs.exists(destination): @@ -89,7 +91,8 @@ class FileSystemShard(BaseShard): def store(self, key, value_reader, metadata: dict | None = None): return self._store(key, value_reader, metadata, mode='xb') - def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: + def fetch(self, key, retry=NOT_GIVEN, + retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]: return self._fetch(key, retry, retry_attempts, retry_backoff) def remove(self, key): @@ -117,7 +120,7 @@ class FileSystemShard(BaseShard): class FileSystemFanoutCache(BaseCache): - shard_name = 'shard_%03d' + shard_name: str = 'shard_{:03d}' shard_cls = FileSystemShard def __init__(self, locking_url, **settings): @@ -162,7 +165,7 @@ class FileSystemFanoutCache(BaseCache): self.shard_cls( index=num, directory=directory, - directory_folder=self.shard_name % num, + directory_folder=self.shard_name.format(num), fs=fs, **settings, ) diff --git a/rhodecode/lib/archive_cache/backends/objectstore_cache.py b/rhodecode/lib/archive_cache/backends/objectstore_cache.py --- a/rhodecode/lib/archive_cache/backends/objectstore_cache.py +++ b/rhodecode/lib/archive_cache/backends/objectstore_cache.py @@ -20,6 +20,7 @@ import codecs import hashlib import logging import os +import typing import fsspec @@ -33,20 +34,20 @@ log = logging.getLogger(__name__) class S3Shard(BaseShard): def __init__(self, index, bucket, bucket_folder, fs, **settings): - self._index = index - self._bucket_folder = bucket_folder - self.storage_type = 'bucket' - self._bucket_main = bucket + self._index: int = index + self._bucket_folder: str = bucket_folder + self.storage_type: str = 'bucket' + self._bucket_main: str = bucket self.fs = fs @property - def bucket(self): + def bucket(self) -> str: """Cache bucket final path.""" return os.path.join(self._bucket_main, self._bucket_folder) def _get_keyfile(self, archive_key) -> tuple[str, str]: - key_file = f'{archive_key}-{self.key_suffix}' + key_file: str = f'{archive_key}-{self.key_suffix}' return key_file, os.path.join(self.bucket, key_file) def _get_writer(self, path, mode): @@ -76,8 +77,10 @@ class S3Shard(BaseShard): def store(self, key, value_reader, metadata: dict | None = None): return self._store(key, value_reader, metadata, mode='wb') - def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: - return self._fetch(key, retry, retry_attempts, retry_backoff) + def fetch(self, key, retry=NOT_GIVEN, + retry_attempts=NOT_GIVEN, retry_backoff=1, + presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]: + return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires) def remove(self, key): return self._remove(key) @@ -104,7 +107,7 @@ class S3Shard(BaseShard): class ObjectStoreCache(BaseCache): - shard_name = 'shard-%03d' + shard_name: str = 'shard-{:03d}' shard_cls = S3Shard def __init__(self, locking_url, **settings): @@ -152,7 +155,7 @@ class ObjectStoreCache(BaseCache): self.shard_cls( index=num, bucket=self._bucket, - bucket_folder=self.shard_name % num, + bucket_folder=self.shard_name.format(num), fs=fs, **settings, ) @@ -162,3 +165,6 @@ class ObjectStoreCache(BaseCache): def _get_size(self, shard, archive_path): return shard.fs.info(archive_path)['size'] + + def set_presigned_url_expiry(self, val: int) -> None: + self.presigned_url_expires = val