diff --git a/configs/development.ini b/configs/development.ini
--- a/configs/development.ini
+++ b/configs/development.ini
@@ -290,19 +290,41 @@ file_store.backend = local
; path to store the uploaded binaries and artifacts
file_store.storage_path = /var/opt/rhodecode_data/file_store
-; Uncomment and set this path to control settings for archive download cache.
+
+; Redis url used to acquire/check locks for archive generation
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' and 'objectstore' are available now
+archive_cache.backend.type = filesystem
+
+; url for s3 compatible storage that allows uploading artifacts
+; e.g. http://minio:9000
+archive_cache.objectstore.url = http://s3-minio:9000
+
+; key for s3 auth
+archive_cache.objectstore.key = key
+
+; secret for s3 auth
+archive_cache.objectstore.secret = secret
+
+; number of sharded buckets to create to distribute archives across
+; default is 8 shards
+archive_cache.objectstore.bucket_shards = 8
+
+; if true, this cache will retry fetching archives up to retry_attempts=N times, waiting retry_backoff seconds between tries
+archive_cache.objectstore.retry = false
+
+; number of seconds to wait before the next try when retry is enabled
+archive_cache.objectstore.retry_backoff = 1
+
+; how many times to retry a fetch from this backend
+archive_cache.objectstore.retry_attempts = 10
+
+; Default is $cache_dir/archive_cache if not set
; Generated repo archives will be cached at this location
; and served from the cache during subsequent requests for the same archive of
; the repository. This path is important to be shared across filesystems and with
; RhodeCode and vcsserver
-
-; Redis url to acquire/check generation of archives locks
-archive_cache.locking.url = redis://redis:6379/1
-
-; Storage backend, only 'filesystem' is available now
-archive_cache.backend.type = filesystem
-
-; Default is $cache_dir/archive_cache if not set
archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
@@ -312,8 +334,18 @@ archive_cache.filesystem.cache_size_gb =
archive_cache.filesystem.eviction_policy = least-recently-stored
; By default cache uses sharding technique, this specifies how many shards are there
+; default is 8 shards
archive_cache.filesystem.cache_shards = 8
+; if true, this cache will retry fetching archives up to retry_attempts=N times, waiting retry_backoff seconds between tries
+archive_cache.filesystem.retry = false
+
+; number of seconds to wait before the next try when retry is enabled
+archive_cache.filesystem.retry_backoff = 1
+
+; how many times to retry a fetch from this backend
+archive_cache.filesystem.retry_attempts = 10
+
; #############
; CELERY CONFIG
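To point the archive cache at an S3-compatible store instead of the local filesystem, only the backend type and the objectstore.* keys shown above need to change; a minimal sketch reusing the placeholder endpoint and credentials from this file:

    archive_cache.backend.type = objectstore
    archive_cache.objectstore.url = http://s3-minio:9000
    archive_cache.objectstore.key = key
    archive_cache.objectstore.secret = secret
    archive_cache.objectstore.bucket_shards = 8

With retry enabled, a missing archive is re-checked up to retry_attempts times with retry_backoff seconds between checks, i.e. roughly 10 seconds of waiting with the defaults above.
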
diff --git a/configs/production.ini b/configs/production.ini
--- a/configs/production.ini
+++ b/configs/production.ini
@@ -258,19 +258,41 @@ file_store.backend = local
; path to store the uploaded binaries and artifacts
file_store.storage_path = /var/opt/rhodecode_data/file_store
-; Uncomment and set this path to control settings for archive download cache.
+
+; Redis url used to acquire/check locks for archive generation
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' and 'objectstore' are available now
+archive_cache.backend.type = filesystem
+
+; url for s3 compatible storage that allows uploading artifacts
+; e.g. http://minio:9000
+archive_cache.objectstore.url = http://s3-minio:9000
+
+; key for s3 auth
+archive_cache.objectstore.key = key
+
+; secret for s3 auth
+archive_cache.objectstore.secret = secret
+
+; number of sharded buckets to create to distribute archives across
+; default is 8 shards
+archive_cache.objectstore.bucket_shards = 8
+
+; if true, this cache will retry fetching archives up to retry_attempts=N times, waiting retry_backoff seconds between tries
+archive_cache.objectstore.retry = false
+
+; number of seconds to wait before the next try when retry is enabled
+archive_cache.objectstore.retry_backoff = 1
+
+; how many times to retry a fetch from this backend
+archive_cache.objectstore.retry_attempts = 10
+
+; Default is $cache_dir/archive_cache if not set
; Generated repo archives will be cached at this location
; and served from the cache during subsequent requests for the same archive of
; the repository. This path is important to be shared across filesystems and with
; RhodeCode and vcsserver
-
-; Redis url to acquire/check generation of archives locks
-archive_cache.locking.url = redis://redis:6379/1
-
-; Storage backend, only 'filesystem' is available now
-archive_cache.backend.type = filesystem
-
-; Default is $cache_dir/archive_cache if not set
archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
@@ -280,8 +302,18 @@ archive_cache.filesystem.cache_size_gb =
archive_cache.filesystem.eviction_policy = least-recently-stored
; By default cache uses sharding technique, this specifies how many shards are there
+; default is 8 shards
archive_cache.filesystem.cache_shards = 8
+; if true, this cache will retry fetching archives up to retry_attempts=N times, waiting retry_backoff seconds between tries
+archive_cache.filesystem.retry = false
+
+; number of seconds to wait before the next try when retry is enabled
+archive_cache.filesystem.retry_backoff = 1
+
+; how many times to retry a fetch from this backend
+archive_cache.filesystem.retry_attempts = 10
+
; #############
; CELERY CONFIG
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -89,6 +89,7 @@ dogpile.cache==1.3.3
pbr==5.11.1
formencode==2.1.0
six==1.16.0
+fsspec==2024.6.0
gunicorn==21.2.0
packaging==24.0
gevent==24.2.1
@@ -257,6 +258,34 @@ regex==2022.10.31
routes==2.5.1
repoze.lru==0.7
six==1.16.0
+s3fs==2024.6.0
+ aiobotocore==2.13.0
+ aiohttp==3.9.5
+ aiosignal==1.3.1
+ frozenlist==1.4.1
+ attrs==22.2.0
+ frozenlist==1.4.1
+ multidict==6.0.5
+ yarl==1.9.4
+ idna==3.4
+ multidict==6.0.5
+ aioitertools==0.11.0
+ botocore==1.34.106
+ jmespath==1.0.1
+ python-dateutil==2.8.2
+ six==1.16.0
+ urllib3==1.26.14
+ wrapt==1.16.0
+ aiohttp==3.9.5
+ aiosignal==1.3.1
+ frozenlist==1.4.1
+ attrs==22.2.0
+ frozenlist==1.4.1
+ multidict==6.0.5
+ yarl==1.9.4
+ idna==3.4
+ multidict==6.0.5
+ fsspec==2024.6.0
simplejson==3.19.2
sshpubkeys==3.3.1
cryptography==40.0.2
diff --git a/rhodecode/apps/repository/views/repo_files.py b/rhodecode/apps/repository/views/repo_files.py
--- a/rhodecode/apps/repository/views/repo_files.py
+++ b/rhodecode/apps/repository/views/repo_files.py
@@ -39,7 +39,7 @@ from rhodecode.apps._base import RepoApp
from rhodecode.lib import diffs, helpers as h, rc_cache
from rhodecode.lib import audit_logger
from rhodecode.lib.hash_utils import sha1_safe
-from rhodecode.lib.rc_cache.archive_cache import (
+from rhodecode.lib.archive_cache import (
get_archival_cache_store, get_archival_config, ArchiveCacheGenerationLock, archive_iterator)
from rhodecode.lib.str_utils import safe_bytes, convert_special_chars
from rhodecode.lib.view_utils import parse_path_ref
diff --git a/rhodecode/config/config_maker.py b/rhodecode/config/config_maker.py
--- a/rhodecode/config/config_maker.py
+++ b/rhodecode/config/config_maker.py
@@ -193,10 +193,26 @@ def sanitize_settings_and_apply_defaults
settings_maker.make_setting('archive_cache.backend.type', 'filesystem')
settings_maker.make_setting('archive_cache.filesystem.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
+ settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
settings_maker.make_setting('archive_cache.filesystem.cache_size_gb', 10, parser='float')
- settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
settings_maker.make_setting('archive_cache.filesystem.eviction_policy', 'least-recently-stored')
+ settings_maker.make_setting('archive_cache.filesystem.retry', False, parser='bool')
+ settings_maker.make_setting('archive_cache.filesystem.retry_backoff', 1, parser='int')
+ settings_maker.make_setting('archive_cache.filesystem.retry_attempts', 10, parser='int')
+
+ settings_maker.make_setting('archive_cache.objectstore.url', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
+ settings_maker.make_setting('archive_cache.objectstore.key', '')
+ settings_maker.make_setting('archive_cache.objectstore.secret', '')
+ settings_maker.make_setting('archive_cache.objectstore.bucket_shards', 8, parser='int')
+
+ settings_maker.make_setting('archive_cache.objectstore.cache_size_gb', 10, parser='float')
+ settings_maker.make_setting('archive_cache.objectstore.eviction_policy', 'least-recently-stored')
+
+ settings_maker.make_setting('archive_cache.objectstore.retry', False, parser='bool')
+ settings_maker.make_setting('archive_cache.objectstore.retry_backoff', 1, parser='int')
+ settings_maker.make_setting('archive_cache.objectstore.retry_attempts', 10, parser='int')
+
settings_maker.env_expand()
# configure instance id
diff --git a/rhodecode/config/middleware.py b/rhodecode/config/middleware.py
--- a/rhodecode/config/middleware.py
+++ b/rhodecode/config/middleware.py
@@ -326,7 +326,7 @@ def includeme(config, auth_resources=Non
config.include('pyramid_mako')
config.include('rhodecode.lib.rc_beaker')
config.include('rhodecode.lib.rc_cache')
- config.include('rhodecode.lib.rc_cache.archive_cache')
+ config.include('rhodecode.lib.archive_cache')
config.include('rhodecode.apps._base.navigation')
config.include('rhodecode.apps._base.subscribers')
diff --git a/rhodecode/lib/rc_cache/archive_cache/__init__.py b/rhodecode/lib/archive_cache/__init__.py
rename from rhodecode/lib/rc_cache/archive_cache/__init__.py
rename to rhodecode/lib/archive_cache/__init__.py
--- a/rhodecode/lib/rc_cache/archive_cache/__init__.py
+++ b/rhodecode/lib/archive_cache/__init__.py
@@ -16,14 +16,63 @@
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
-from .fanout_cache import get_archival_cache_store
-from .fanout_cache import get_archival_config
+import logging
+
+from .backends.fanout_cache import FileSystemFanoutCache
+from .backends.objectstore_cache import ObjectStoreCache
-from .utils import archive_iterator
-from .utils import ArchiveCacheGenerationLock
+from .utils import archive_iterator # noqa
+from .lock import ArchiveCacheGenerationLock # noqa
+
+log = logging.getLogger(__name__)
+
+
+cache_meta = None
def includeme(config):
# init our cache at start
settings = config.get_settings()
get_archival_cache_store(settings)
+
+
+def get_archival_config(config):
+
+    final_config = {}
+
+ for k, v in config.items():
+ if k.startswith('archive_cache'):
+ final_config[k] = v
+
+ return final_config
+
+
+def get_archival_cache_store(config, always_init=False):
+
+ global cache_meta
+ if cache_meta is not None and not always_init:
+ return cache_meta
+
+ config = get_archival_config(config)
+ backend = config['archive_cache.backend.type']
+
+ archive_cache_locking_url = config['archive_cache.locking.url']
+
+ match backend:
+ case 'filesystem':
+ d_cache = FileSystemFanoutCache(
+ locking_url=archive_cache_locking_url,
+ **config
+ )
+ case 'objectstore':
+ d_cache = ObjectStoreCache(
+ locking_url=archive_cache_locking_url,
+ **config
+ )
+ case _:
+            raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore", got {backend}')
+
+ cache_meta = d_cache
+ return cache_meta
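
Both backends are returned through the same get_archival_cache_store() entry point and expose the same store/fetch interface; a minimal usage sketch modelled on the test suite further down (the settings dict and archive name are illustrative):

    from rhodecode.lib.archive_cache import get_archival_cache_store

    d_cache = get_archival_cache_store(config=settings)  # settings parsed from the .ini file
    with open('/tmp/my-archive.zip', 'rb') as value_reader:
        d_cache.store('my-archive.zip', value_reader, {'foo': 'bar'})
    reader, metadata = d_cache.fetch('my-archive.zip')   # ShardFileReader plus the stored metadata dict
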
diff --git a/rhodecode/lib/archive_cache/backends/__init__.py b/rhodecode/lib/archive_cache/backends/__init__.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/archive_cache/backends/__init__.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
diff --git a/rhodecode/lib/archive_cache/backends/base.py b/rhodecode/lib/archive_cache/backends/base.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/archive_cache/backends/base.py
@@ -0,0 +1,348 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import functools
+import logging
+import typing
+import time
+import zlib
+
+from ...ext_json import json
+from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
+from ..lock import GenerationLock
+
+log = logging.getLogger(__name__)
+
+
+class BaseShard:
+ storage_type: str = ''
+ fs = None
+
+ @classmethod
+ def hash(cls, key):
+ """Compute portable hash for `key`.
+
+ :param key: key to hash
+ :return: hash value
+
+ """
+ mask = 0xFFFFFFFF
+ return zlib.adler32(key.encode('utf-8')) & mask # noqa
+
+ def _write_file(self, full_path, read_iterator, mode):
+ raise NotImplementedError
+
+ def _get_keyfile(self, key):
+ raise NotImplementedError
+
+ def random_filename(self):
+ raise NotImplementedError
+
+ def _store(self, key, value_reader, metadata, mode):
+ (filename, # hash-name
+ full_path # full-path/hash-name
+ ) = self.random_filename()
+
+ key_file, key_file_path = self._get_keyfile(key)
+
+ # STORE METADATA
+ _metadata = {
+ "version": "v1",
+
+ "key_file": key_file, # this is the .key.json file storing meta
+ "key_file_path": key_file_path, # full path to key_file
+ "archive_key": key, # original name we stored archive under, e.g my-archive.zip
+ "archive_filename": filename, # the actual filename we stored that file under
+ "archive_full_path": full_path,
+
+ "store_time": time.time(),
+ "access_count": 0,
+ "access_time": 0,
+
+ "size": 0
+ }
+ if metadata:
+ _metadata.update(metadata)
+
+ read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
+ size, sha256 = self._write_file(full_path, read_iterator, mode)
+ _metadata['size'] = size
+ _metadata['sha256'] = sha256
+
+        # after the archive is finished, we create a key file to record the presence of the binary file
+ with self.fs.open(key_file_path, 'wb') as f:
+ f.write(json.dumps(_metadata))
+
+ return key, filename, size, _metadata
+
+ def _fetch(self, key, retry, retry_attempts, retry_backoff):
+ if retry is NOT_GIVEN:
+ retry = False
+ if retry_attempts is NOT_GIVEN:
+ retry_attempts = 0
+
+ if retry and retry_attempts > 0:
+ for attempt in range(1, retry_attempts + 1):
+ if key in self:
+ break
+ # we didn't find the key, wait retry_backoff N seconds, and re-check
+ time.sleep(retry_backoff)
+
+ if key not in self:
+ log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
+ raise KeyError(key)
+
+ key_file, key_file_path = self._get_keyfile(key)
+ with self.fs.open(key_file_path, 'rb') as f:
+ metadata = json.loads(f.read())
+
+ archive_path = metadata['archive_full_path']
+
+ try:
+ return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
+ finally:
+ # update usage stats, count and accessed
+ metadata["access_count"] = metadata.get("access_count", 0) + 1
+ metadata["access_time"] = time.time()
+ log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
+ key_file, metadata['access_count'], metadata['access_time'])
+ with self.fs.open(key_file_path, 'wb') as f:
+ f.write(json.dumps(metadata))
+
+ def _remove(self, key):
+ if key not in self:
+ log.exception(f'requested key={key} not found in {self}')
+ raise KeyError(key)
+
+ key_file, key_file_path = self._get_keyfile(key)
+ with self.fs.open(key_file_path, 'rb') as f:
+ metadata = json.loads(f.read())
+
+ archive_path = metadata['archive_full_path']
+ self.fs.rm(archive_path)
+ self.fs.rm(key_file_path)
+ return 1
+
+ @property
+ def storage_medium(self):
+ return getattr(self, self.storage_type)
+
+ @property
+ def key_suffix(self):
+ return 'key.json'
+
+ def __contains__(self, key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param key: key matching item
+ :return: True if key matching item
+
+ """
+ key_file, key_file_path = self._get_keyfile(key)
+ return self.fs.exists(key_file_path)
+
+
+class BaseCache:
+ _locking_url: str = ''
+ _storage_path: str = ''
+ _config = {}
+ retry = False
+ retry_attempts = 0
+ retry_backoff = 1
+ _shards = tuple()
+
+ def __contains__(self, key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param key: key matching item
+ :return: True if key matching item
+
+ """
+ return self.has_key(key)
+
+ def __repr__(self):
+ return f'<{self.__class__.__name__}(storage={self._storage_path})>'
+
+ @classmethod
+ def gb_to_bytes(cls, gb):
+ return gb * (1024 ** 3)
+
+ @property
+ def storage_path(self):
+ return self._storage_path
+
+ @classmethod
+ def get_stats_db(cls):
+ return StatsDB()
+
+ def get_conf(self, key, pop=False):
+ if key not in self._config:
+ raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
+ val = self._config[key]
+ if pop:
+ del self._config[key]
+ return val
+
+ def _get_shard(self, key):
+ raise NotImplementedError
+
+ def _get_size(self, shard, archive_path):
+ raise NotImplementedError
+
+ def store(self, key, value_reader, metadata=None):
+ shard = self._get_shard(key)
+ return shard.store(key, value_reader, metadata)
+
+ def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
+ """
+ Return file handle corresponding to `key` from specific shard cache.
+ """
+ if retry is NOT_GIVEN:
+ retry = self.retry
+ if retry_attempts is NOT_GIVEN:
+ retry_attempts = self.retry_attempts
+ retry_backoff = self.retry_backoff
+
+ shard = self._get_shard(key)
+ return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
+
+ def remove(self, key):
+ shard = self._get_shard(key)
+ return shard.remove(key)
+
+ def has_key(self, archive_key):
+ """Return `True` if `key` matching item is found in cache.
+
+        :param archive_key: key for item; this is a unique archive name we want to store data under, e.g. my-archive-svn.zip
+ :return: True if key is found
+
+ """
+ shard = self._get_shard(archive_key)
+ return archive_key in shard
+
+ def iter_keys(self):
+ for shard in self._shards:
+ if shard.fs.exists(shard.storage_medium):
+ for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
+ for key_file_path in _files:
+ if key_file_path.endswith(shard.key_suffix):
+ yield shard, key_file_path
+
+ def get_lock(self, lock_key):
+ return GenerationLock(lock_key, self._locking_url)
+
+ def evict(self, policy=None, size_limit=None) -> int:
+ """
+        Remove old items based on the given conditions.
+
+        Explanation of this algorithm:
+        iterate over each shard, and for each shard iterate over its .key files,
+        reading the metadata they store. This gives us a full list of keys and cached archives,
+        together with their size, store time, access time and access counts.
+
+        Store that into an in-memory DB so we can run different sorting strategies easily.
+        Summing the sizes is a single SUM sql query.
+
+        Then we run a sorting strategy based on the eviction policy and iterate over the
+        sorted keys, removing entries until the remaining total size fits under the limit.
+ """
+
+ policy = policy or self._eviction_policy
+ size_limit = size_limit or self._cache_size_limit
+
+ select_policy = EVICTION_POLICY[policy]['evict']
+
+ log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
+ policy, format_size(size_limit))
+
+ if select_policy is None:
+ return 0
+
+ db = self.get_stats_db()
+
+ data = []
+ cnt = 1
+
+ for shard, key_file in self.iter_keys():
+ with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
+ metadata = json.loads(f.read())
+
+ key_file_path = os.path.join(shard.storage_medium, key_file)
+
+ archive_key = metadata['archive_key']
+ archive_path = metadata['archive_full_path']
+
+ size = metadata.get('size')
+ if not size:
+ # in case we don't have size re-calc it...
+ size = self._get_size(shard, archive_path)
+
+ data.append([
+ cnt,
+ key_file,
+ key_file_path,
+ archive_key,
+ archive_path,
+ metadata.get('store_time', 0),
+ metadata.get('access_time', 0),
+ metadata.get('access_count', 0),
+ size,
+ ])
+ cnt += 1
+
+ # Insert bulk data using executemany
+ db.bulk_insert(data)
+
+ total_size = db.get_total_size()
+ log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
+ len(data), format_size(total_size), format_size(size_limit))
+
+ removed_items = 0
+ removed_size = 0
+ for key_file, archive_key, size in db.get_sorted_keys(select_policy):
+ # simulate removal impact BEFORE removal
+ total_size -= size
+
+ if total_size <= size_limit:
+ # we obtained what we wanted...
+ break
+
+ self.remove(archive_key)
+ removed_items += 1
+ removed_size += size
+
+ log.debug('Removed %s cache archives, and reduced size by: %s',
+ removed_items, format_size(removed_size))
+ return removed_items
+
+ def get_statistics(self):
+ total_files = 0
+ total_size = 0
+ meta = {}
+
+ for shard, key_file in self.iter_keys():
+ json_key = f"{shard.storage_medium}/{key_file}"
+ with shard.fs.open(json_key, 'rb') as f:
+ total_files += 1
+ metadata = json.loads(f.read())
+ total_size += metadata['size']
+
+ return total_files, total_size, meta
+
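
Eviction and statistics now live once in BaseCache and work purely through each shard's fsspec filesystem, so they behave identically for both backends; a small usage sketch (the size limit here is arbitrary):

    # walk all shards, load the .key.json metadata into the in-memory StatsDB,
    # then remove archives in eviction-policy order until the cache fits under the limit
    removed = d_cache.evict(policy='least-recently-stored',
                            size_limit=d_cache.gb_to_bytes(0.5))
    total_files, total_size, _meta = d_cache.get_statistics()
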
diff --git a/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py b/rhodecode/lib/archive_cache/backends/fanout_cache.py
rename from rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
rename to rhodecode/lib/archive_cache/backends/fanout_cache.py
--- a/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
+++ b/rhodecode/lib/archive_cache/backends/fanout_cache.py
@@ -17,198 +17,81 @@
# and proprietary license terms, please see https://rhodecode.com/licenses/
import codecs
-import contextlib
-import functools
-import os
+import hashlib
import logging
-import time
-import typing
-import zlib
-import sqlite3
+import os
+
+import fsspec
-from ...ext_json import json
-from .lock import GenerationLock
-from .utils import format_size
+from .base import BaseCache, BaseShard
+from ..utils import ShardFileReader, NOT_GIVEN
+from ...type_utils import str2bool
log = logging.getLogger(__name__)
-cache_meta = None
-UNKNOWN = -241
-NO_VAL = -917
-
-MODE_BINARY = 'BINARY'
-
-
-EVICTION_POLICY = {
- 'none': {
- 'evict': None,
- },
- 'least-recently-stored': {
- 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
- },
- 'least-recently-used': {
- 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
- },
- 'least-frequently-used': {
- 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
- },
-}
-
-
-class DB:
-
- def __init__(self):
- self.connection = sqlite3.connect(':memory:')
- self._init_db()
-
- def _init_db(self):
- qry = '''
- CREATE TABLE IF NOT EXISTS archive_cache (
- rowid INTEGER PRIMARY KEY,
- key_file TEXT,
- key_file_path TEXT,
- filename TEXT,
- full_path TEXT,
- store_time REAL,
- access_time REAL,
- access_count INTEGER DEFAULT 0,
- size INTEGER DEFAULT 0
- )
- '''
-
- self.sql(qry)
- self.connection.commit()
-
- @property
- def sql(self):
- return self.connection.execute
-
- def bulk_insert(self, rows):
- qry = '''
- INSERT INTO archive_cache (
- rowid,
- key_file,
- key_file_path,
- filename,
- full_path,
- store_time,
- access_time,
- access_count,
- size
- )
- VALUES (
- ?, ?, ?, ?, ?, ?, ?, ?, ?
- )
- '''
- cursor = self.connection.cursor()
- cursor.executemany(qry, rows)
- self.connection.commit()
-
-
-class FileSystemCache:
+class FileSystemShard(BaseShard):
def __init__(self, index, directory, **settings):
self._index = index
self._directory = directory
+ self.storage_type = 'directory'
+ self.fs = fsspec.filesystem('file')
@property
def directory(self):
"""Cache directory."""
return self._directory
- def _write_file(self, full_path, iterator, mode, encoding=None):
- full_dir, _ = os.path.split(full_path)
+ def _get_keyfile(self, archive_key) -> tuple[str, str]:
+ key_file = f'{archive_key}.{self.key_suffix}'
+ return key_file, os.path.join(self.directory, key_file)
+ def _get_writer(self, path, mode):
for count in range(1, 11):
- with contextlib.suppress(OSError):
- os.makedirs(full_dir)
-
try:
# Another cache may have deleted the directory before
# the file could be opened.
- writer = open(full_path, mode, encoding=encoding)
+ return self.fs.open(path, mode)
except OSError:
if count == 10:
# Give up after 10 tries to open the file.
raise
continue
- with writer:
- size = 0
- for chunk in iterator:
- size += len(chunk)
- writer.write(chunk)
- writer.flush()
- # Get the file descriptor
- fd = writer.fileno()
-
- # Sync the file descriptor to disk, helps with NFS cases...
- os.fsync(fd)
- log.debug('written new archive cache under %s', full_path)
- return size
-
- def _get_keyfile(self, key):
- return os.path.join(self._directory, f'{key}.key')
+ def _write_file(self, full_path, iterator, mode):
+ # ensure dir exists
+ destination, _ = os.path.split(full_path)
+ if not self.fs.exists(destination):
+ self.fs.makedirs(destination)
- def store(self, key, value_reader, metadata):
- filename, full_path = self.random_filename()
- key_file = self._get_keyfile(key)
-
- # STORE METADATA
- _metadata = {
- "version": "v1",
- "filename": filename,
- "full_path": full_path,
- "key_file": key_file,
- "store_time": time.time(),
- "access_count": 1,
- "access_time": 0,
- "size": 0
- }
- if metadata:
- _metadata.update(metadata)
-
- reader = functools.partial(value_reader.read, 2**22)
+ writer = self._get_writer(full_path, mode)
- iterator = iter(reader, b'')
- size = self._write_file(full_path, iterator, 'xb')
- metadata['size'] = size
-
- # after archive is finished, we create a key to save the presence of the binary file
- with open(key_file, 'wb') as f:
- f.write(json.dumps(_metadata))
-
- return key, size, MODE_BINARY, filename, _metadata
-
- def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
-
- if retry:
- for attempt in range(retry_attempts):
- if key in self:
- break
- # we dind't find the key, wait 1s, and re-check
- time.sleep(1)
+ digest = hashlib.sha256()
+ with writer:
+ size = 0
+ for chunk in iterator:
+ size += len(chunk)
+ digest.update(chunk)
+ writer.write(chunk)
+ writer.flush()
+ # Get the file descriptor
+ fd = writer.fileno()
- if key not in self:
- log.exception('requested {key} not found in {self}', key, self)
- raise KeyError(key)
-
- key_file = self._get_keyfile(key)
- with open(key_file, 'rb') as f:
- metadata = json.loads(f.read())
-
- filename = metadata['filename']
+ # Sync the file descriptor to disk, helps with NFS cases...
+ os.fsync(fd)
+ sha256 = digest.hexdigest()
+ log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
+ return size, sha256
- try:
- return open(os.path.join(self.directory, filename), 'rb'), metadata
- finally:
- # update usage stats, count and accessed
- metadata["access_count"] = metadata.get("access_count", 0) + 1
- metadata["access_time"] = time.time()
+ def store(self, key, value_reader, metadata: dict | None = None):
+ return self._store(key, value_reader, metadata, mode='xb')
- with open(key_file, 'wb') as f:
- f.write(json.dumps(metadata))
+ def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
+ return self._fetch(key, retry, retry_attempts, retry_backoff)
+
+ def remove(self, key):
+ return self._remove(key)
def random_filename(self):
"""Return filename and full-path tuple for file storage.
@@ -220,64 +103,52 @@ class FileSystemCache:
"""
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
- sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
- name = hex_name[4:] + '.archive_cache'
- filename = os.path.join(sub_dir, name)
- full_path = os.path.join(self.directory, filename)
- return filename, full_path
-
- def hash(self, key):
- """Compute portable hash for `key`.
-
- :param key: key to hash
- :return: hash value
- """
- mask = 0xFFFFFFFF
- return zlib.adler32(key.encode('utf-8')) & mask # noqa
-
- def __contains__(self, key):
- """Return `True` if `key` matching item is found in cache.
+ archive_name = hex_name[4:] + '.archive_cache'
+ filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
- :param key: key matching item
- :return: True if key matching item
-
- """
- key_file = self._get_keyfile(key)
- return os.path.exists(key_file)
+ full_path = os.path.join(self.directory, filename)
+ return archive_name, full_path
def __repr__(self):
- return f'FileSystemCache(index={self._index}, dir={self.directory})'
+ return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
-class FanoutCache:
- """Cache that shards keys and values."""
+class FileSystemFanoutCache(BaseCache):
- def __init__(
- self, directory=None, **settings
- ):
- """Initialize cache instance.
+ def __init__(self, locking_url, **settings):
+ """
+ Initialize file system cache instance.
- :param str directory: cache directory
+ :param str locking_url: redis url for a lock
:param settings: settings dict
"""
- if directory is None:
- raise ValueError('directory cannot be None')
-
- directory = str(directory)
+ self._locking_url = locking_url
+ self._config = settings
+ cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
+ directory = str(cache_dir)
directory = os.path.expanduser(directory)
directory = os.path.expandvars(directory)
self._directory = directory
+ self._storage_path = directory
- self._count = settings.pop('cache_shards')
- self._locking_url = settings.pop('locking_url')
+ # check if it's ok to write, and re-create the archive cache
+ if not os.path.isdir(self._directory):
+ os.makedirs(self._directory, exist_ok=True)
+
+ self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
- self._eviction_policy = settings['cache_eviction_policy']
- self._cache_size_limit = settings['cache_size_limit']
+ self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
+ self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
+ self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
+ self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
+ self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
+
+ log.debug('Initializing archival cache instance under %s', self._directory)
self._shards = tuple(
- FileSystemCache(
+ FileSystemShard(
index=num,
directory=os.path.join(directory, 'shard_%03d' % num),
**settings,
@@ -286,171 +157,10 @@ class FanoutCache:
)
self._hash = self._shards[0].hash
- @property
- def directory(self):
- """Cache directory."""
- return self._directory
-
- def get_lock(self, lock_key):
- return GenerationLock(lock_key, self._locking_url)
-
- def _get_shard(self, key) -> FileSystemCache:
+ def _get_shard(self, key) -> FileSystemShard:
index = self._hash(key) % self._count
shard = self._shards[index]
return shard
- def store(self, key, value_reader, metadata=None):
- shard = self._get_shard(key)
- return shard.store(key, value_reader, metadata)
-
- def fetch(self, key, retry=False, retry_attempts=10):
- """Return file handle corresponding to `key` from cache.
- """
- shard = self._get_shard(key)
- return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)
-
- def has_key(self, key):
- """Return `True` if `key` matching item is found in cache.
-
- :param key: key for item
- :return: True if key is found
-
- """
- shard = self._get_shard(key)
- return key in shard
-
- def __contains__(self, item):
- return self.has_key(item)
-
- def evict(self, policy=None, size_limit=None):
- """
- Remove old items based on the conditions
-
-
- explanation of this algo:
- iterate over each shard, then for each shard iterate over the .key files
- read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
- access data, time creation, and access counts.
-
- Store that into a memory DB so we can run different sorting strategies easily.
- Summing the size is a sum sql query.
-
- Then we run a sorting strategy based on eviction policy.
- We iterate over sorted keys, and remove each checking if we hit the overall limit.
- """
-
- policy = policy or self._eviction_policy
- size_limit = size_limit or self._cache_size_limit
-
- select_policy = EVICTION_POLICY[policy]['evict']
-
- log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
- policy, format_size(size_limit))
-
- if select_policy is None:
- return 0
-
- db = DB()
-
- data = []
- cnt = 1
- for shard in self._shards:
- for key_file in os.listdir(shard.directory):
- if key_file.endswith('.key'):
- key_file_path = os.path.join(shard.directory, key_file)
- with open(key_file_path, 'rb') as f:
- metadata = json.loads(f.read())
-
- size = metadata.get('size')
- filename = metadata.get('filename')
- full_path = metadata.get('full_path')
-
- if not size:
- # in case we don't have size re-calc it...
- size = os.stat(full_path).st_size
-
- data.append([
- cnt,
- key_file,
- key_file_path,
- filename,
- full_path,
- metadata.get('store_time', 0),
- metadata.get('access_time', 0),
- metadata.get('access_count', 0),
- size,
- ])
- cnt += 1
-
- # Insert bulk data using executemany
- db.bulk_insert(data)
-
- ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
- log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
- select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
- sorted_keys = db.sql(select_policy_qry).fetchall()
-
- removed_items = 0
- removed_size = 0
- for key, cached_file, size in sorted_keys:
- # simulate removal impact BEFORE removal
- total_size -= size
-
- if total_size <= size_limit:
- # we obtained what we wanted...
- break
-
- os.remove(cached_file)
- os.remove(key)
- removed_items += 1
- removed_size += size
-
- log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
- return removed_items
-
-
-def get_archival_config(config):
-
- final_config = {
-
- }
-
- for k, v in config.items():
- if k.startswith('archive_cache'):
- final_config[k] = v
-
- return final_config
-
-
-def get_archival_cache_store(config):
-
- global cache_meta
- if cache_meta is not None:
- return cache_meta
-
- config = get_archival_config(config)
- backend = config['archive_cache.backend.type']
- if backend != 'filesystem':
- raise ValueError('archive_cache.backend.type only supports "filesystem"')
-
- archive_cache_locking_url = config['archive_cache.locking.url']
- archive_cache_dir = config['archive_cache.filesystem.store_dir']
- archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
- archive_cache_shards = config['archive_cache.filesystem.cache_shards']
- archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
-
- log.debug('Initializing archival cache instance under %s', archive_cache_dir)
-
- # check if it's ok to write, and re-create the archive cache
- if not os.path.isdir(archive_cache_dir):
- os.makedirs(archive_cache_dir, exist_ok=True)
-
- d_cache = FanoutCache(
- archive_cache_dir,
- locking_url=archive_cache_locking_url,
- cache_shards=archive_cache_shards,
- cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
- cache_eviction_policy=archive_cache_eviction_policy
- )
- cache_meta = d_cache
- return cache_meta
+ def _get_size(self, shard, archive_path):
+ return os.stat(archive_path).st_size
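
Shard selection is unchanged by the refactor: the archive key is hashed with adler32 and mapped onto one of the configured shards, which for the filesystem backend are shard_NNN sub-directories of the store_dir; a minimal sketch of the mapping (the key is illustrative):

    import zlib

    key = 'my-archive.zip'
    index = (zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF) % 8  # 8 = archive_cache.filesystem.cache_shards
    shard_dir = 'shard_%03d' % index                              # one of shard_000 .. shard_007
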
diff --git a/rhodecode/lib/archive_cache/backends/objectstore_cache.py b/rhodecode/lib/archive_cache/backends/objectstore_cache.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/archive_cache/backends/objectstore_cache.py
@@ -0,0 +1,150 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import codecs
+import hashlib
+import logging
+import os
+
+import fsspec
+
+from .base import BaseCache, BaseShard
+from ..utils import ShardFileReader, NOT_GIVEN
+from ...type_utils import str2bool
+
+log = logging.getLogger(__name__)
+
+
+class S3Shard(BaseShard):
+
+ def __init__(self, index, bucket, **settings):
+ self._index = index
+ self._bucket = bucket
+ self.storage_type = 'bucket'
+
+ endpoint_url = settings.pop('archive_cache.objectstore.url')
+ key = settings.pop('archive_cache.objectstore.key')
+ secret = settings.pop('archive_cache.objectstore.secret')
+
+ self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+
+ @property
+ def bucket(self):
+ """Cache bucket."""
+ return self._bucket
+
+ def _get_keyfile(self, archive_key) -> tuple[str, str]:
+ key_file = f'{archive_key}-{self.key_suffix}'
+ return key_file, os.path.join(self.bucket, key_file)
+
+ def _get_writer(self, path, mode):
+ return self.fs.open(path, 'wb')
+
+ def _write_file(self, full_path, iterator, mode):
+ # ensure bucket exists
+ destination = self.bucket
+ if not self.fs.exists(destination):
+ self.fs.mkdir(destination, s3_additional_kwargs={})
+
+ writer = self._get_writer(full_path, mode)
+
+ digest = hashlib.sha256()
+ with writer:
+ size = 0
+ for chunk in iterator:
+ size += len(chunk)
+ digest.update(chunk)
+ writer.write(chunk)
+
+ sha256 = digest.hexdigest()
+ log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
+ return size, sha256
+
+ def store(self, key, value_reader, metadata: dict | None = None):
+ return self._store(key, value_reader, metadata, mode='wb')
+
+ def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
+ return self._fetch(key, retry, retry_attempts, retry_backoff)
+
+ def remove(self, key):
+ return self._remove(key)
+
+ def random_filename(self):
+ """Return filename and full-path tuple for file storage.
+
+ Filename will be a randomly generated 28 character hexadecimal string
+ with ".archive_cache" suffixed. Two levels of sub-directories will be used to
+ reduce the size of directories. On older filesystems, lookups in
+ directories with many files may be slow.
+ """
+
+ hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
+
+ archive_name = hex_name[4:] + '.archive_cache'
+ filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
+
+ full_path = os.path.join(self.bucket, filename)
+ return archive_name, full_path
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
+
+
+class ObjectStoreCache(BaseCache):
+
+ def __init__(self, locking_url, **settings):
+ """
+ Initialize objectstore cache instance.
+
+ :param str locking_url: redis url for a lock
+ :param settings: settings dict
+
+ """
+ self._locking_url = locking_url
+ self._config = settings
+
+ objectstore_url = self.get_conf('archive_cache.objectstore.url')
+ self._storage_path = objectstore_url
+
+ self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+
+ self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
+ self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
+
+ self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
+ self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
+ self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
+
+ log.debug('Initializing archival cache instance under %s', objectstore_url)
+ self._shards = tuple(
+ S3Shard(
+ index=num,
+ bucket='rhodecode-archivecache-%03d' % num,
+ **settings,
+ )
+ for num in range(self._count)
+ )
+ self._hash = self._shards[0].hash
+
+ def _get_shard(self, key) -> S3Shard:
+ index = self._hash(key) % self._count
+ shard = self._shards[index]
+ return shard
+
+ def _get_size(self, shard, archive_path):
+ return shard.fs.info(archive_path)['size']
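
Each S3Shard wraps an fsspec 's3' filesystem pointed at the configured endpoint, with one rhodecode-archivecache-NNN bucket per shard; a sketch of how that filesystem is built from the .ini keys (endpoint and credentials are the placeholder values from the config files above):

    import fsspec

    fs = fsspec.filesystem('s3', anon=False,
                           endpoint_url='http://s3-minio:9000',
                           key='key', secret='secret')
    # buckets are created lazily by _write_file() on the first store into a shard
    fs.exists('rhodecode-archivecache-000')
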
diff --git a/rhodecode/lib/rc_cache/archive_cache/lock.py b/rhodecode/lib/archive_cache/lock.py
rename from rhodecode/lib/rc_cache/archive_cache/lock.py
rename to rhodecode/lib/archive_cache/lock.py
--- a/rhodecode/lib/rc_cache/archive_cache/lock.py
+++ b/rhodecode/lib/archive_cache/lock.py
@@ -17,8 +17,11 @@
# and proprietary license terms, please see https://rhodecode.com/licenses/
import redis
-from ..._vendor import redis_lock
-from .utils import ArchiveCacheGenerationLock
+from .._vendor import redis_lock
+
+
+class ArchiveCacheGenerationLock(Exception):
+ pass
class GenerationLock:
diff --git a/rhodecode/lib/rc_cache/archive_cache/utils.py b/rhodecode/lib/archive_cache/utils.py
rename from rhodecode/lib/rc_cache/archive_cache/utils.py
rename to rhodecode/lib/archive_cache/utils.py
--- a/rhodecode/lib/rc_cache/archive_cache/utils.py
+++ b/rhodecode/lib/archive_cache/utils.py
@@ -16,11 +16,26 @@
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
-import os
+import sqlite3
+import s3fs.core
+
+NOT_GIVEN = -917
-class ArchiveCacheGenerationLock(Exception):
- pass
+EVICTION_POLICY = {
+ 'none': {
+ 'evict': None,
+ },
+ 'least-recently-stored': {
+ 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
+ },
+ 'least-recently-used': {
+ 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
+ },
+ 'least-frequently-used': {
+ 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
+ },
+}
def archive_iterator(_reader, block_size: int = 4096 * 512):
@@ -32,41 +47,88 @@ def archive_iterator(_reader, block_size
yield data
-def get_directory_statistics(start_path):
- """
- total_files, total_size, directory_stats = get_directory_statistics(start_path)
-
- print(f"Directory statistics for: {start_path}\n")
- print(f"Total files: {total_files}")
- print(f"Total size: {format_size(total_size)}\n")
-
- :param start_path:
- :return:
- """
-
- total_files = 0
- total_size = 0
- directory_stats = {}
-
- for dir_path, dir_names, file_names in os.walk(start_path):
- dir_size = 0
- file_count = len(file_names)
-
- for file in file_names:
- filepath = os.path.join(dir_path, file)
- file_size = os.path.getsize(filepath)
- dir_size += file_size
-
- directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
- total_files += file_count
- total_size += dir_size
-
- return total_files, total_size, directory_stats
-
-
def format_size(size):
# Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f"{size:.2f} {unit}"
size /= 1024
+
+
+class StatsDB:
+
+ def __init__(self):
+ self.connection = sqlite3.connect(':memory:')
+ self._init_db()
+
+ def _init_db(self):
+ qry = '''
+ CREATE TABLE IF NOT EXISTS archive_cache (
+ rowid INTEGER PRIMARY KEY,
+ key_file TEXT,
+ key_file_path TEXT,
+ archive_key TEXT,
+ archive_path TEXT,
+ store_time REAL,
+ access_time REAL,
+ access_count INTEGER DEFAULT 0,
+ size INTEGER DEFAULT 0
+ )
+ '''
+
+ self.sql(qry)
+ self.connection.commit()
+
+ @property
+ def sql(self):
+ return self.connection.execute
+
+ def bulk_insert(self, rows):
+ qry = '''
+ INSERT INTO archive_cache (
+ rowid,
+ key_file,
+ key_file_path,
+ archive_key,
+ archive_path,
+ store_time,
+ access_time,
+ access_count,
+ size
+ )
+ VALUES (
+ ?, ?, ?, ?, ?, ?, ?, ?, ?
+ )
+ '''
+ cursor = self.connection.cursor()
+ cursor.executemany(qry, rows)
+ self.connection.commit()
+
+ def get_total_size(self):
+ qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache'
+ ((total_size,),) = self.sql(qry).fetchall()
+ return total_size
+
+ def get_sorted_keys(self, select_policy):
+ select_policy_qry = select_policy.format(fields='key_file, archive_key, size')
+ return self.sql(select_policy_qry).fetchall()
+
+
+class ShardFileReader:
+
+ def __init__(self, file_like_reader):
+ self._file_like_reader = file_like_reader
+
+ def __getattr__(self, item):
+ if isinstance(self._file_like_reader, s3fs.core.S3File):
+ match item:
+ case 'name':
+ # S3 FileWrapper doesn't support name attribute, and we use it
+ return self._file_like_reader.full_name
+ case _:
+ return getattr(self._file_like_reader, item)
+ else:
+ return getattr(self._file_like_reader, item)
+
+ def __repr__(self):
+ return f'<{self.__class__.__name__}={self._file_like_reader}>'
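
ShardFileReader exists so that callers can stream archives the same way for both backends (s3fs S3File objects do not expose the name attribute the code relies on, so it is mapped to full_name); a rough sketch of how a fetched archive is streamed, with the surrounding response handling left out (send() is a hypothetical sink, not part of the codebase):

    reader, metadata = d_cache.fetch(archive_name)
    for block in archive_iterator(reader):  # yields 2 MiB blocks until the reader is exhausted
        send(block)
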
diff --git a/rhodecode/lib/base.py b/rhodecode/lib/base.py
--- a/rhodecode/lib/base.py
+++ b/rhodecode/lib/base.py
@@ -578,7 +578,7 @@ def bootstrap_config(request, registry_n
config.include('pyramid_mako')
config.include('rhodecode.lib.rc_beaker')
config.include('rhodecode.lib.rc_cache')
- config.include('rhodecode.lib.rc_cache.archive_cache')
+ config.include('rhodecode.lib.archive_cache')
add_events_routes(config)
return config
diff --git a/rhodecode/lib/helpers.py b/rhodecode/lib/helpers.py
--- a/rhodecode/lib/helpers.py
+++ b/rhodecode/lib/helpers.py
@@ -2196,3 +2196,35 @@ class IssuesRegistry(object):
@property
def issues_unique_count(self):
return len(set(i['id'] for i in self.issues))
+
+
+def get_directory_statistics(start_path):
+ """
+ total_files, total_size, directory_stats = get_directory_statistics(start_path)
+
+ print(f"Directory statistics for: {start_path}\n")
+ print(f"Total files: {total_files}")
+ print(f"Total size: {format_size(total_size)}\n")
+
+ :param start_path:
+ :return:
+ """
+
+ total_files = 0
+ total_size = 0
+ directory_stats = {}
+
+ for dir_path, dir_names, file_names in os.walk(start_path):
+ dir_size = 0
+ file_count = len(file_names)
+
+ for fname in file_names:
+ filepath = os.path.join(dir_path, fname)
+ file_size = os.path.getsize(filepath)
+ dir_size += file_size
+
+ directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
+ total_files += file_count
+ total_size += dir_size
+
+ return total_files, total_size, directory_stats
diff --git a/rhodecode/lib/system_info.py b/rhodecode/lib/system_info.py
--- a/rhodecode/lib/system_info.py
+++ b/rhodecode/lib/system_info.py
@@ -399,29 +399,23 @@ def storage_inodes():
def storage_archives():
import rhodecode
from rhodecode.lib.helpers import format_byte_size_binary
- from rhodecode.lib.rc_cache.archive_cache.utils import get_directory_statistics
+ from rhodecode.lib.archive_cache import get_archival_cache_store
storage_type = rhodecode.ConfigGet().get_str('archive_cache.backend.type')
- storage_key = 'archive_cache.filesystem.store_dir'
- default_msg = 'Archive cache storage is controlled by '\
- f'{storage_key}=/path/to/cache option in the .ini file'
- path = rhodecode.ConfigGet().get_str(storage_key, missing=default_msg)
-
- value = dict(percent=0, used=0, total=0, items=0, path=path, text='', type=storage_type)
+ value = dict(percent=0, used=0, total=0, items=0, path='', text='', type=storage_type)
state = STATE_OK_DEFAULT
try:
- if storage_type != 'filesystem':
- # raise Exc to stop reporting on different type
- raise ValueError('Storage type must be "filesystem"')
+ d_cache = get_archival_cache_store(config=rhodecode.CONFIG)
- total_files, total_size, _directory_stats = get_directory_statistics(path)
+ total_files, total_size, _directory_stats = d_cache.get_statistics()
value.update({
'percent': 100,
'used': total_size,
'total': total_size,
- 'items': total_files
+ 'items': total_files,
+ 'path': d_cache.storage_path
})
except Exception as e:
@@ -441,8 +435,7 @@ def storage_archives():
def storage_gist():
from rhodecode.model.gist import GIST_STORE_LOC
from rhodecode.lib.utils import safe_str, get_rhodecode_repo_store_path
- from rhodecode.lib.helpers import format_byte_size_binary
- from rhodecode.lib.rc_cache.archive_cache.utils import get_directory_statistics
+ from rhodecode.lib.helpers import format_byte_size_binary, get_directory_statistics
path = safe_str(os.path.join(
get_rhodecode_repo_store_path(), GIST_STORE_LOC))
diff --git a/rhodecode/tests/fixture_mods/fixture_pyramid.py b/rhodecode/tests/fixture_mods/fixture_pyramid.py
--- a/rhodecode/tests/fixture_mods/fixture_pyramid.py
+++ b/rhodecode/tests/fixture_mods/fixture_pyramid.py
@@ -100,7 +100,7 @@ def ini_config(request, tmpdir_factory,
overrides = [
{'server:main': {'port': rcserver_port}},
{'app:main': {
- 'cache_dir': '%(here)s/rc_data',
+ 'cache_dir': '%(here)s/rc-tests/rc_data',
'vcs.server': f'localhost:{vcsserver_port}',
# johbo: We will always start the VCSServer on our own based on the
# fixtures of the test cases. For the test run it must always be
diff --git a/rhodecode/tests/lib/test_archive_caches.py b/rhodecode/tests/lib/test_archive_caches.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/tests/lib/test_archive_caches.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2016-2023 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import time
+import pytest
+import rhodecode
+import os
+import shutil
+from tempfile import mkdtemp
+
+from rhodecode.lib import archive_cache
+
+
+def file_reader(temp_store):
+ with open(temp_store, 'w') as f:
+ for cnt in range(10000):
+ f.write(str(cnt))
+ return open(temp_store, 'rb')
+
+
+@pytest.fixture()
+def d_cache_instance(ini_settings):
+ config = ini_settings
+ d_cache = archive_cache.get_archival_cache_store(config=config, always_init=True)
+ return d_cache
+
+
+@pytest.mark.usefixtures('app')
+class TestArchiveCaches(object):
+
+ def test_archivecache_empty_stats(self, d_cache_instance):
+ d_cache = d_cache_instance
+ shutil.rmtree(d_cache._directory)
+
+ stats = d_cache.get_statistics()
+ assert (0, 0, {}) == stats
+
+ def test_archivecache_store_keys(self, d_cache_instance, tmp_path):
+ d_cache = d_cache_instance
+ shutil.rmtree(d_cache._directory)
+
+ for n in range(100):
+
+ archive_name = f'my-archive-abc-{n}.zip'
+ temp_archive_path = os.path.join(tmp_path, archive_name)
+            d_cache.store(archive_name, file_reader(temp_archive_path), {'foo': 'bar'})
+ reader, meta = d_cache.fetch(archive_name)
+ content = reader.read()
+ assert content == open(temp_archive_path, 'rb').read()
+
+ stats = d_cache.get_statistics()
+ assert (100, 3889000, {}) == stats
+
+ def test_archivecache_remove_keys(self, d_cache_instance, tmp_path):
+ d_cache = d_cache_instance
+ shutil.rmtree(d_cache._directory)
+
+ n = 1
+ archive_name = f'my-archive-abc-{n}.zip'
+ temp_archive_path = os.path.join(tmp_path, archive_name)
+
+        d_cache.store(archive_name, file_reader(temp_archive_path), {'foo': 'bar'})
+ stats = d_cache.get_statistics()
+ assert (1, 38890, {}) == stats
+
+ assert 1 == d_cache.remove(archive_name)
+
+ stats = d_cache.get_statistics()
+ assert (0, 0, {}) == stats
+
+ def test_archivecache_evict_keys(self, d_cache_instance, tmp_path):
+ d_cache = d_cache_instance
+ shutil.rmtree(d_cache._directory)
+ tries = 500
+ for n in range(tries):
+
+ archive_name = f'my-archive-abc-{n}.zip'
+ temp_archive_path = os.path.join(tmp_path, archive_name)
+            d_cache.store(archive_name, file_reader(temp_archive_path), {'foo': 'bar'})
+
+ stats = d_cache.get_statistics()
+ assert (tries, 19445000, {}) == stats
+ evict_to = 0.005 # around (5mb)
+ evicted_items = d_cache.evict(size_limit=d_cache.gb_to_bytes(evict_to))
+ evicted = 361
+ assert evicted == evicted_items
+
+ stats = d_cache.get_statistics()
+ assert (tries - evicted, 5405710, {}) == stats
+
diff --git a/rhodecode/tests/rhodecode.ini b/rhodecode/tests/rhodecode.ini
--- a/rhodecode/tests/rhodecode.ini
+++ b/rhodecode/tests/rhodecode.ini
@@ -258,20 +258,62 @@ file_store.backend = local
; path to store the uploaded binaries and artifacts
file_store.storage_path = /var/opt/rhodecode_data/file_store
-; Uncomment and set this path to control settings for archive download cache.
+
+; Redis url used to acquire/check locks for archive generation
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' and 'objectstore' are available now
+archive_cache.backend.type = filesystem
+
+; url for s3 compatible storage that allows uploading artifacts
+; e.g. http://minio:9000
+archive_cache.objectstore.url = http://s3-minio:9000
+
+; key for s3 auth
+archive_cache.objectstore.key = key
+
+; secret for s3 auth
+archive_cache.objectstore.secret = secret
+
+; number of sharded buckets to create to distribute archives across
+; default is 8 shards
+archive_cache.objectstore.bucket_shards = 8
+
+; if true, this cache will retry fetching archives up to retry_attempts=N times, waiting retry_backoff seconds between tries
+archive_cache.objectstore.retry = false
+
+; number of seconds to wait before the next try when retry is enabled
+archive_cache.objectstore.retry_backoff = 1
+
+; how many times to retry a fetch from this backend
+archive_cache.objectstore.retry_attempts = 10
+
+; Default is $cache_dir/archive_cache if not set
; Generated repo archives will be cached at this location
; and served from the cache during subsequent requests for the same archive of
; the repository. This path is important to be shared across filesystems and with
; RhodeCode and vcsserver
-
-; Default is $cache_dir/archive_cache if not set
-archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
+archive_cache.filesystem.store_dir = %(here)s/rc-tests/archive_cache
; The limit in GB sets how much data we cache before recycling last used, defaults to 10 gb
-archive_cache.cache_size_gb = 10
+archive_cache.filesystem.cache_size_gb = 2
+
+; Eviction policy used to clear out after cache_size_gb limit is reached
+archive_cache.filesystem.eviction_policy = least-recently-stored
; By default cache uses sharding technique, this specifies how many shards are there
-archive_cache.cache_shards = 10
+; default is 8 shards
+archive_cache.filesystem.cache_shards = 8
+
+; if true, this cache will retry fetching archives up to retry_attempts=N times, waiting retry_backoff seconds between tries
+archive_cache.filesystem.retry = false
+
+; number of seconds to wait before the next try when retry is enabled
+archive_cache.filesystem.retry_backoff = 1
+
+; how many times to retry a fetch from this backend
+archive_cache.filesystem.retry_attempts = 10
+
; #############
; CELERY CONFIG
@@ -335,7 +377,7 @@ rc_cache.cache_repo_longterm.max_size =
rc_cache.cache_general.backend = dogpile.cache.rc.file_namespace
rc_cache.cache_general.expiration_time = 43200
; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
-rc_cache.cache_general.arguments.filename = %(here)s/cache-backend/cache_general_db
+rc_cache.cache_general.arguments.filename = %(here)s/rc-tests/cache-backend/cache_general_db
; alternative `cache_general` redis backend with distributed lock
#rc_cache.cache_general.backend = dogpile.cache.rc.redis
@@ -362,7 +404,7 @@ rc_cache.cache_general.arguments.filenam
rc_cache.cache_perms.backend = dogpile.cache.rc.file_namespace
rc_cache.cache_perms.expiration_time = 0
; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
-rc_cache.cache_perms.arguments.filename = %(here)s/cache-backend/cache_perms_db
+rc_cache.cache_perms.arguments.filename = %(here)s/rc-tests/cache-backend/cache_perms_db
; alternative `cache_perms` redis backend with distributed lock
#rc_cache.cache_perms.backend = dogpile.cache.rc.redis
@@ -389,7 +431,7 @@ rc_cache.cache_perms.arguments.filename
rc_cache.cache_repo.backend = dogpile.cache.rc.file_namespace
rc_cache.cache_repo.expiration_time = 2592000
; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
-rc_cache.cache_repo.arguments.filename = %(here)s/cache-backend/cache_repo_db
+rc_cache.cache_repo.arguments.filename = %(here)s/rc-tests/cache-backend/cache_repo_db
; alternative `cache_repo` redis backend with distributed lock
#rc_cache.cache_repo.backend = dogpile.cache.rc.redis
@@ -432,7 +474,7 @@ beaker.session.data_dir = %(here)s/rc-te
beaker.session.key = rhodecode
beaker.session.secret = test-rc-uytcxaz
-beaker.session.lock_dir = %(here)s/data/sessions/lock
+beaker.session.lock_dir = %(here)s/rc-tests/data/sessions/lock
; Secure encrypted cookie. Requires AES and AES python libraries
; you must disable beaker.session.secret to use this
@@ -464,7 +506,7 @@ beaker.session.secure = false
; WHOOSH Backend, doesn't require additional services to run
; it works good with few dozen repos
search.module = rhodecode.lib.index.whoosh
-search.location = %(here)s/data/index
+search.location = %(here)s/rc-tests/data/index
; ####################
; CHANNELSTREAM CONFIG
@@ -484,7 +526,7 @@ channelstream.server = channelstream:980
; see Nginx/Apache configuration examples in our docs
channelstream.ws_url = ws://rhodecode.yourserver.com/_channelstream
channelstream.secret = ENV_GENERATED
-channelstream.history.location = %(here)s/channelstream_history
+channelstream.history.location = %(here)s/rc-tests/channelstream_history
; Internal application path that Javascript uses to connect into.
; If you use proxy-prefix the prefix should be added before /_channelstream
@@ -501,7 +543,7 @@ channelstream.proxy_path = /_channelstre
; pymysql is an alternative driver for MySQL, use in case of problems with default one
#sqlalchemy.db1.url = mysql+pymysql://root:qweqwe@localhost/rhodecode
-sqlalchemy.db1.url = sqlite:///%(here)s/rhodecode_test.db?timeout=30
+sqlalchemy.db1.url = sqlite:///%(here)s/rc-tests/rhodecode_test.db?timeout=30
; see sqlalchemy docs for other advanced settings
; print the sql statements to output
@@ -590,7 +632,7 @@ svn.proxy.generate_config = false
svn.proxy.list_parent_path = true
; Set location and file name of generated config file.
-svn.proxy.config_file_path = %(here)s/mod_dav_svn.conf
+svn.proxy.config_file_path = %(here)s/rc-tests/mod_dav_svn.conf
; alternative mod_dav config template. This needs to be a valid mako template
; Example template can be found in the source code:
@@ -626,7 +668,7 @@ ssh.generate_authorized_keyfile = true
; Path to the authorized_keys file where the generate entries are placed.
; It is possible to have multiple key files specified in `sshd_config` e.g.
; AuthorizedKeysFile %h/.ssh/authorized_keys %h/.ssh/authorized_keys_rhodecode
-ssh.authorized_keys_file_path = %(here)s/rc/authorized_keys_rhodecode
+ssh.authorized_keys_file_path = %(here)s/rc-tests/authorized_keys_rhodecode
; Command to execute the SSH wrapper. The binary is available in the
; RhodeCode installation directory.
diff --git a/rhodecode/tests/vcs/test_archives.py b/rhodecode/tests/vcs/test_archives.py
--- a/rhodecode/tests/vcs/test_archives.py
+++ b/rhodecode/tests/vcs/test_archives.py
@@ -28,7 +28,7 @@ import mock
import pytest
import rhodecode
-from rhodecode.lib.rc_cache.archive_cache import get_archival_config
+from rhodecode.lib.archive_cache import get_archival_config
from rhodecode.lib.str_utils import ascii_bytes
from rhodecode.lib.vcs.backends import base
from rhodecode.lib.vcs.exceptions import ImproperArchiveTypeError, VCSError