# HG changeset patch # User RhodeCode Admin # Date 2024-06-19 10:38:59 # Node ID 60bf3236da7d06135dd1184d3004a0a8910153bf # Parent c72dd577671fbd30fef2402593eb1982a505ea7f archive-cache: synced with CE codebase diff --git a/vcsserver/lib/archive_cache/backends/base.py b/vcsserver/lib/archive_cache/backends/base.py --- a/vcsserver/lib/archive_cache/backends/base.py +++ b/vcsserver/lib/archive_cache/backends/base.py @@ -166,6 +166,7 @@ class BaseCache: retry_attempts = 0 retry_backoff = 1 _shards = tuple() + shard_cls = BaseShard def __contains__(self, key): """Return `True` if `key` matching item is found in cache. @@ -199,8 +200,10 @@ class BaseCache: del self._config[key] return val - def _get_shard(self, key): - raise NotImplementedError + def _get_shard(self, key) -> shard_cls: + index = self._hash(key) % self._shard_count + shard = self._shards[index] + return shard def _get_size(self, shard, archive_path): raise NotImplementedError diff --git a/vcsserver/lib/archive_cache/backends/fanout_cache.py b/vcsserver/lib/archive_cache/backends/fanout_cache.py --- a/vcsserver/lib/archive_cache/backends/fanout_cache.py +++ b/vcsserver/lib/archive_cache/backends/fanout_cache.py @@ -32,16 +32,18 @@ log = logging.getLogger(__name__) class FileSystemShard(BaseShard): - def __init__(self, index, directory, **settings): + def __init__(self, index, directory, directory_folder, fs, **settings): self._index = index self._directory = directory + self._directory_folder = directory_folder self.storage_type = 'directory' - self.fs = fsspec.filesystem('file') + + self.fs = fs @property def directory(self): - """Cache directory.""" - return self._directory + """Cache directory final path.""" + return os.path.join(self._directory, self._directory_folder) def _get_keyfile(self, archive_key) -> tuple[str, str]: key_file = f'{archive_key}.{self.key_suffix}' @@ -116,6 +118,7 @@ class FileSystemShard(BaseShard): class FileSystemFanoutCache(BaseCache): shard_name = 'shard_%03d' + shard_cls = FileSystemShard def __init__(self, locking_url, **settings): """ @@ -132,13 +135,11 @@ class FileSystemFanoutCache(BaseCache): directory = os.path.expanduser(directory) directory = os.path.expandvars(directory) self._directory = directory - self._storage_path = directory + self._storage_path = directory # common path for all from BaseCache - # check if it's ok to write, and re-create the archive cache - if not os.path.isdir(self._directory): - os.makedirs(self._directory, exist_ok=True) - - self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True)) + self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True)) + if self._shard_count < 1: + raise ValueError('cache_shards must be 1 or more') self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True) self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb'))) @@ -147,21 +148,27 @@ class FileSystemFanoutCache(BaseCache): self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True)) self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True)) - log.debug('Initializing archival cache instance under %s', self._directory) + log.debug('Initializing %s archival cache instance under %s', self) + fs = fsspec.filesystem('file') + # check if it's ok to write, and re-create the archive cache main dir + # A directory is the virtual equivalent of a physical file cabinet. + # In other words, it's a container for organizing digital data. + # Unlike a folder, which can only store files, a directory can store files, + # subdirectories, and other directories. + if not fs.exists(self._directory): + fs.makedirs(self._directory, exist_ok=True) + self._shards = tuple( - FileSystemShard( + self.shard_cls( index=num, - directory=os.path.join(directory, self.shard_name % num), + directory=directory, + directory_folder=self.shard_name % num, + fs=fs, **settings, ) - for num in range(self._count) + for num in range(self._shard_count) ) self._hash = self._shards[0].hash - def _get_shard(self, key) -> FileSystemShard: - index = self._hash(key) % self._count - shard = self._shards[index] - return shard - def _get_size(self, shard, archive_path): return os.stat(archive_path).st_size diff --git a/vcsserver/lib/archive_cache/backends/objectstore_cache.py b/vcsserver/lib/archive_cache/backends/objectstore_cache.py --- a/vcsserver/lib/archive_cache/backends/objectstore_cache.py +++ b/vcsserver/lib/archive_cache/backends/objectstore_cache.py @@ -32,24 +32,18 @@ log = logging.getLogger(__name__) class S3Shard(BaseShard): - def __init__(self, index, bucket, **settings): + def __init__(self, index, bucket, bucket_folder, fs, **settings): self._index = index - self._bucket = bucket + self._bucket_folder = bucket_folder self.storage_type = 'bucket' + self._bucket_main = bucket - endpoint_url = settings.pop('archive_cache.objectstore.url') - key = settings.pop('archive_cache.objectstore.key') - secret = settings.pop('archive_cache.objectstore.secret') - - # TODO: Add it all over the place... - self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root') - - self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret) + self.fs = fs @property def bucket(self): - """Cache bucket.""" - return os.path.join(self._bucket_root, self._bucket) + """Cache bucket final path.""" + return os.path.join(self._bucket_main, self._bucket_folder) def _get_keyfile(self, archive_key) -> tuple[str, str]: key_file = f'{archive_key}-{self.key_suffix}' @@ -59,11 +53,8 @@ class S3Shard(BaseShard): return self.fs.open(path, 'wb') def _write_file(self, full_path, iterator, mode): - if self._bucket_root: - if not self.fs.exists(self._bucket_root): - self.fs.mkdir(self._bucket_root) - # ensure bucket exists + # ensure folder in bucket exists destination = self.bucket if not self.fs.exists(destination): self.fs.mkdir(destination, s3_additional_kwargs={}) @@ -113,7 +104,8 @@ class S3Shard(BaseShard): class ObjectStoreCache(BaseCache): - shard_name = 'shard-bucket-%03d' + shard_name = 'shard-%03d' + shard_cls = S3Shard def __init__(self, locking_url, **settings): """ @@ -127,9 +119,15 @@ class ObjectStoreCache(BaseCache): self._config = settings objectstore_url = self.get_conf('archive_cache.objectstore.url') - self._storage_path = objectstore_url + self._storage_path = objectstore_url # common path for all from BaseCache - self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True)) + self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True)) + if self._shard_count < 1: + raise ValueError('cache_shards must be 1 or more') + + self._bucket = settings.pop('archive_cache.objectstore.bucket') + if not self._bucket: + raise ValueError('archive_cache.objectstore.bucket needs to have a value') self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True) self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb'))) @@ -138,21 +136,29 @@ class ObjectStoreCache(BaseCache): self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True)) self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True)) - log.debug('Initializing archival cache instance under %s', objectstore_url) + endpoint_url = settings.pop('archive_cache.objectstore.url') + key = settings.pop('archive_cache.objectstore.key') + secret = settings.pop('archive_cache.objectstore.secret') + + log.debug('Initializing %s archival cache instance under %s', self) + + fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret) + + # init main bucket + if not fs.exists(self._bucket): + fs.mkdir(self._bucket) + self._shards = tuple( - S3Shard( + self.shard_cls( index=num, - bucket=self.shard_name % num, + bucket=self._bucket, + bucket_folder=self.shard_name % num, + fs=fs, **settings, ) - for num in range(self._count) + for num in range(self._shard_count) ) self._hash = self._shards[0].hash - def _get_shard(self, key) -> S3Shard: - index = self._hash(key) % self._count - shard = self._shards[index] - return shard - def _get_size(self, shard, archive_path): return shard.fs.info(archive_path)['size']