diff --git a/configs/development.ini b/configs/development.ini
--- a/configs/development.ini
+++ b/configs/development.ini
@@ -296,14 +296,24 @@ file_store.storage_path = /var/opt/rhode
; the repository. It is important that this path is shared across filesystems and
; between RhodeCode and vcsserver
+; Redis url used to acquire and check the locks that guard archive generation
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' is available for now
+archive_cache.backend.type = filesystem
+
; Default is $cache_dir/archive_cache if not set
-archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
+archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
; The limit in GB sets how much data we cache before recycling the least recently used entries; defaults to 10 GB
-archive_cache.cache_size_gb = 10
+archive_cache.filesystem.cache_size_gb = 1
+
+; Eviction policy used to clear out the cache after the cache_size_gb limit is reached
+archive_cache.filesystem.eviction_policy = least-recently-stored
; By default the cache uses a sharding technique; this specifies how many shards there are
-archive_cache.cache_shards = 4
+archive_cache.filesystem.cache_shards = 8
+
; #############
; CELERY CONFIG
diff --git a/configs/production.ini b/configs/production.ini
--- a/configs/production.ini
+++ b/configs/production.ini
@@ -264,14 +264,24 @@ file_store.storage_path = /var/opt/rhode
; the repository. It is important that this path is shared across filesystems and
; between RhodeCode and vcsserver
+; Redis url used to acquire and check the locks that guard archive generation
+archive_cache.locking.url = redis://redis:6379/1
+
+; Storage backend, only 'filesystem' is available for now
+archive_cache.backend.type = filesystem
+
; Default is $cache_dir/archive_cache if not set
-archive_cache.store_dir = /var/opt/rhodecode_data/tarballcache
+archive_cache.filesystem.store_dir = /var/opt/rhodecode_data/archive_cache
; The limit in GB sets how much data we cache before recycling the least recently used entries; defaults to 10 GB
-archive_cache.cache_size_gb = 40
+archive_cache.filesystem.cache_size_gb = 40
+
+; Eviction policy used to clear out the cache after the cache_size_gb limit is reached
+archive_cache.filesystem.eviction_policy = least-recently-stored
; By default the cache uses a sharding technique; this specifies how many shards there are
-archive_cache.cache_shards = 4
+archive_cache.filesystem.cache_shards = 8
+
; #############
; CELERY CONFIG
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -82,7 +82,6 @@ deform==2.0.15
peppercorn==0.6
translationstring==1.4
zope.deprecation==5.0.0
-diskcache==5.6.3
docutils==0.19
dogpile.cache==1.3.3
decorator==5.1.1
diff --git a/rhodecode/apps/repository/views/repo_files.py b/rhodecode/apps/repository/views/repo_files.py
--- a/rhodecode/apps/repository/views/repo_files.py
+++ b/rhodecode/apps/repository/views/repo_files.py
@@ -24,6 +24,8 @@ import urllib.request
import urllib.parse
import urllib.error
import pathlib
+import time
+import random
from pyramid.httpexceptions import HTTPNotFound, HTTPBadRequest, HTTPFound
@@ -37,7 +39,8 @@ from rhodecode.apps._base import RepoApp
from rhodecode.lib import diffs, helpers as h, rc_cache
from rhodecode.lib import audit_logger
from rhodecode.lib.hash_utils import sha1_safe
-from rhodecode.lib.rc_cache.archive_cache import get_archival_cache_store, get_archival_config, ReentrantLock
+from rhodecode.lib.rc_cache.archive_cache import (
+ get_archival_cache_store, get_archival_config, ArchiveCacheLock, archive_iterator)
from rhodecode.lib.str_utils import safe_bytes, convert_special_chars
from rhodecode.lib.view_utils import parse_path_ref
from rhodecode.lib.exceptions import NonRelativePathError
@@ -417,41 +420,45 @@ class RepoFilesView(RepoAppView):
# NOTE: we get the config to pass to a call to lazy-init the SAME type of cache on vcsserver
d_cache_conf = get_archival_config(config=CONFIG)
+    # This serves both as a cache key and as a lock key
reentrant_lock_key = archive_name_key + '.lock'
- with ReentrantLock(d_cache, reentrant_lock_key):
- # This is also a cache key
- use_cached_archive = False
- if not archive_cache_disable and archive_name_key in d_cache:
- reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
- use_cached_archive = True
- log.debug('Found cached archive as key=%s tag=%s, serving archive from cache reader=%s',
- archive_name_key, tag, reader.name)
- else:
- reader = None
- log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
+
+ use_cached_archive = False
+ if not archive_cache_disable and archive_name_key in d_cache:
+ reader, metadata = d_cache.fetch(archive_name_key)
- # generate new archive, as previous was not found in the cache
- if not reader:
-
- try:
- commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
- kind=fileformat, subrepos=subrepos,
- archive_at_path=at_path, cache_config=d_cache_conf)
- except ImproperArchiveTypeError:
- return _('Unknown archive type')
-
- reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
+ use_cached_archive = True
+        log.debug('Found cached archive as key=%s metadata=%s, serving archive from cache reader=%s',
+ archive_name_key, metadata, reader.name)
+ else:
+ reader = None
+ log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
if not reader:
- raise ValueError('archive cache reader is empty, failed to fetch file from distributed archive cache')
+ # generate new archive, as previous was not found in the cache
+ try:
+ with d_cache.get_lock(reentrant_lock_key):
+ try:
+ commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
+ kind=fileformat, subrepos=subrepos,
+ archive_at_path=at_path, cache_config=d_cache_conf)
+ except ImproperArchiveTypeError:
+ return _('Unknown archive type')
+ except ArchiveCacheLock:
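+                # the lock is held by another worker already generating this
+                # archive; back off briefly, then redirect the client to retry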
+ retry_after = round(random.uniform(0.3, 3.0), 1)
+ time.sleep(retry_after)
- def archive_iterator(_reader, block_size: int = 4096*512):
- # 4096 * 64 = 64KB
- while 1:
- data = _reader.read(block_size)
- if not data:
- break
- yield data
+ location = self.request.url
+ response = Response(
+ f"archive {archive_name_key} generation in progress, Retry-After={retry_after}, Location={location}"
+ )
+ response.headers["Retry-After"] = str(retry_after)
+ response.status_code = 307 # temporary redirect
+
+ response.location = location
+ return response
+
+ reader, metadata = d_cache.fetch(archive_name_key)
response = Response(app_iter=archive_iterator(reader))
response.content_disposition = f'attachment; filename={response_archive_name}'
diff --git a/rhodecode/config/config_maker.py b/rhodecode/config/config_maker.py
--- a/rhodecode/config/config_maker.py
+++ b/rhodecode/config/config_maker.py
@@ -189,9 +189,13 @@ def sanitize_settings_and_apply_defaults
settings_maker.make_setting('rc_cache.sql_cache_short.max_size', 10000, parser='int')
# archive_cache
- settings_maker.make_setting('archive_cache.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
- settings_maker.make_setting('archive_cache.cache_size_gb', 10, parser='float')
- settings_maker.make_setting('archive_cache.cache_shards', 10, parser='int')
+ settings_maker.make_setting('archive_cache.locking.url', 'redis://redis:6379/1')
+ settings_maker.make_setting('archive_cache.backend.type', 'filesystem')
+
+ settings_maker.make_setting('archive_cache.filesystem.store_dir', jn(default_cache_dir, 'archive_cache'), default_when_empty=True,)
+ settings_maker.make_setting('archive_cache.filesystem.cache_size_gb', 10, parser='float')
+ settings_maker.make_setting('archive_cache.filesystem.cache_shards', 8, parser='int')
+ settings_maker.make_setting('archive_cache.filesystem.eviction_policy', 'least-recently-stored')
settings_maker.env_expand()
diff --git a/rhodecode/lib/rc_cache/archive_cache/__init__.py b/rhodecode/lib/rc_cache/archive_cache/__init__.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache/__init__.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+from .fanout_cache import get_archival_cache_store
+from .fanout_cache import get_archival_config
+
+from .utils import archive_iterator
+from .utils import ArchiveCacheLock
+
+
+def includeme(config):
+ # init our cache at start
+ settings = config.get_settings()
+ get_archival_cache_store(settings)
diff --git a/rhodecode/lib/rc_cache/archive_cache.py b/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
rename from rhodecode/lib/rc_cache/archive_cache.py
rename to rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
--- a/rhodecode/lib/rc_cache/archive_cache.py
+++ b/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2023 RhodeCode GmbH
+# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
@@ -16,33 +16,210 @@
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
-import logging
+import codecs
+import contextlib
+import functools
import os
-import diskcache
-from diskcache import RLock
+import logging
+import time
+import typing
+import zlib
+
+from rhodecode.lib.ext_json import json
+from .lock import GenerationLock
log = logging.getLogger(__name__)
cache_meta = None
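+# sentinel values, kept in the spirit of the diskcache API this module replaces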
+UNKNOWN = -241
+NO_VAL = -917
-class ReentrantLock(RLock):
- def __enter__(self):
- reentrant_lock_key = self._key
+MODE_BINARY = 'BINARY'
+
+
+class FileSystemCache:
+
+ def __init__(self, index, directory, **settings):
+ self._index = index
+ self._directory = directory
+
+ def _write_file(self, full_path, iterator, mode, encoding=None):
+ full_dir, _ = os.path.split(full_path)
+
+ for count in range(1, 11):
+ with contextlib.suppress(OSError):
+ os.makedirs(full_dir)
+
+ try:
+ # Another cache may have deleted the directory before
+ # the file could be opened.
+ writer = open(full_path, mode, encoding=encoding)
+ except OSError:
+ if count == 10:
+ # Give up after 10 tries to open the file.
+ raise
+ continue
+
+ with writer:
+ size = 0
+ for chunk in iterator:
+ size += len(chunk)
+ writer.write(chunk)
+ return size
+
+ def _get_keyfile(self, key):
+ return os.path.join(self._directory, f'{key}.key')
+
+ def store(self, key, value_reader, metadata):
+ filename, full_path = self.random_filename()
+ key_file = self._get_keyfile(key)
+
+ # STORE METADATA
+ _metadata = {
+ "version": "v1",
+ "timestamp": time.time(),
+ "filename": filename,
+ "full_path": full_path,
+ "key_file": key_file,
+ }
+ if metadata:
+ _metadata.update(metadata)
+
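+        # stream the archive contents in 4MB chunks (2**22 bytes) so large
+        # archives never have to be held fully in memory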
+ reader = functools.partial(value_reader.read, 2**22)
+
+ iterator = iter(reader, b'')
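+        # mode 'xb' creates the file exclusively, failing if the randomly
+        # generated name already exists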
+ size = self._write_file(full_path, iterator, 'xb')
+
+        # once the archive is fully written, create a key file recording its presence and metadata
+ with open(key_file, 'wb') as f:
+ f.write(json.dumps(_metadata))
+
+ return key, size, MODE_BINARY, filename, _metadata
+
+ def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
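+        """Return an open binary file handle to the cached archive together
+        with its stored metadata; raises KeyError if `key` is not cached."""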
+ if key not in self:
+ raise KeyError(key)
+
+ key_file = self._get_keyfile(key)
+ with open(key_file, 'rb') as f:
+ metadata = json.loads(f.read())
+
+ filename = metadata['filename']
+
+ return open(os.path.join(self._directory, filename), 'rb'), metadata
+
+ def random_filename(self):
+ """Return filename and full-path tuple for file storage.
+
+        Filename will be a randomly generated 28 character hexadecimal string
+        with a ".archive_cache" suffix. Two levels of sub-directories will be used to
+ reduce the size of directories. On older filesystems, lookups in
+ directories with many files may be slow.
+ """
- log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
- #self.acquire()
- log.debug('Lock for key=%s acquired', reentrant_lock_key)
+ hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
+ sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
+ name = hex_name[4:] + '.archive_cache'
+ filename = os.path.join(sub_dir, name)
+ full_path = os.path.join(self._directory, filename)
+ return filename, full_path
+
+ def hash(self, key):
+ """Compute portable hash for `key`.
+
+ :param key: key to hash
+ :return: hash value
+
+ """
+ mask = 0xFFFFFFFF
+ return zlib.adler32(key.encode('utf-8')) & mask # noqa
+
+ def __contains__(self, key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param key: key matching item
+ :return: True if key matching item
+
+ """
+ key_file = self._get_keyfile(key)
+ return os.path.exists(key_file)
+
+
+class FanoutCache:
+ """Cache that shards keys and values."""
+
+ def __init__(
+ self, directory=None, **settings
+ ):
+ """Initialize cache instance.
+
+ :param str directory: cache directory
+ :param settings: settings dict
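+
+        Illustrative usage, mirroring the defaults wired up in config_maker:
+
+            d_cache = FanoutCache(
+                '/var/opt/rhodecode_data/archive_cache',
+                locking_url='redis://redis:6379/1',
+                cache_shards=8,
+                cache_size_limit=10 * 1024 ** 3,
+                cache_eviction_policy='least-recently-stored',
+            )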
+
+ """
+ if directory is None:
+ raise ValueError('directory cannot be None')
+
+ directory = str(directory)
+ directory = os.path.expanduser(directory)
+ directory = os.path.expandvars(directory)
+ self._directory = directory
- def __exit__(self, *exc_info):
- #self.release()
- pass
+ self._count = settings.pop('cache_shards')
+ self._locking_url = settings.pop('locking_url')
+
+ self._shards = tuple(
+ FileSystemCache(
+ index=num,
+ directory=os.path.join(directory, 'shard_%03d' % num),
+ **settings,
+ )
+ for num in range(self._count)
+ )
+ self._hash = self._shards[0].hash
+
+ def get_lock(self, lock_key):
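+        """Return a Redis-backed GenerationLock for `lock_key`; see lock.py."""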
+ return GenerationLock(lock_key, self._locking_url)
+
+ def _get_shard(self, key) -> FileSystemCache:
+ index = self._hash(key) % self._count
+ shard = self._shards[index]
+ return shard
+
+ def store(self, key, value_reader, metadata=None):
+ shard = self._get_shard(key)
+ return shard.store(key, value_reader, metadata)
+
+ def fetch(self, key):
+ """Return file handle corresponding to `key` from cache.
+ """
+ shard = self._get_shard(key)
+ return shard.fetch(key)
+
+ def has_key(self, key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param key: key for item
+ :return: True if key is found
+
+ """
+ shard = self._get_shard(key)
+ return key in shard
+
+ def __contains__(self, item):
+ return self.has_key(item)
+
+ def evict(self):
+ """Remove old items based on the conditions"""
+ # TODO: Implement this...
+ return
def get_archival_config(config):
final_config = {
- 'archive_cache.eviction_policy': 'least-frequently-used'
+
}
for k, v in config.items():
@@ -59,11 +236,15 @@ def get_archival_cache_store(config):
return cache_meta
config = get_archival_config(config)
+ backend = config['archive_cache.backend.type']
+ if backend != 'filesystem':
+ raise ValueError('archive_cache.backend.type only supports "filesystem"')
- archive_cache_dir = config['archive_cache.store_dir']
- archive_cache_size_gb = config['archive_cache.cache_size_gb']
- archive_cache_shards = config['archive_cache.cache_shards']
- archive_cache_eviction_policy = config['archive_cache.eviction_policy']
+ archive_cache_locking_url = config['archive_cache.locking.url']
+ archive_cache_dir = config['archive_cache.filesystem.store_dir']
+ archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
+ archive_cache_shards = config['archive_cache.filesystem.cache_shards']
+ archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
log.debug('Initializing archival cache instance under %s', archive_cache_dir)
@@ -71,18 +252,13 @@ def get_archival_cache_store(config):
if not os.path.isdir(archive_cache_dir):
os.makedirs(archive_cache_dir, exist_ok=True)
- d_cache = diskcache.FanoutCache(
- archive_cache_dir, shards=archive_cache_shards,
- cull_limit=0, # manual eviction required
- size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
- eviction_policy=archive_cache_eviction_policy,
- timeout=30
+ d_cache = FanoutCache(
+ archive_cache_dir,
+ locking_url=archive_cache_locking_url,
+ cache_shards=archive_cache_shards,
+ cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
+ cache_eviction_policy=archive_cache_eviction_policy
)
cache_meta = d_cache
return cache_meta
-
-def includeme(config):
- # init our cache at start
- settings = config.get_settings()
- get_archival_cache_store(settings)
diff --git a/rhodecode/lib/rc_cache/archive_cache/lock.py b/rhodecode/lib/rc_cache/archive_cache/lock.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache/lock.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import redis
+from rhodecode.lib._vendor import redis_lock
+
+from .utils import ArchiveCacheLock
+
+
+class GenerationLock:
+ """
+    Locking mechanism that ensures only one process generates a given archive
+    at a time, raising ArchiveCacheLock when the lock is already held:
+
+ with GenerationLock(lock_key):
+ compute_archive()
+ """
+ lock_timeout = 7200
+
+ def __init__(self, lock_key, url):
+ self.lock_key = lock_key
+ self._create_client(url)
+ self.lock = self.get_lock()
+
+ def _create_client(self, url):
+ connection_pool = redis.ConnectionPool.from_url(url)
+ self.writer_client = redis.StrictRedis(
+ connection_pool=connection_pool
+ )
+ self.reader_client = self.writer_client
+
+ def get_lock(self):
+ return redis_lock.Lock(
+ redis_client=self.writer_client,
+ name=self.lock_key,
+ expire=self.lock_timeout,
+ strict=True
+ )
+
+ def __enter__(self):
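+        # non-blocking acquire: fail fast with ArchiveCacheLock instead of
+        # queueing behind the current holder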
+ acquired = self.lock.acquire(blocking=False)
+ if not acquired:
+            raise ArchiveCacheLock('Failed to acquire the archive generation lock')
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.lock.release()
diff --git a/rhodecode/lib/rc_cache/archive_cache/utils.py b/rhodecode/lib/rc_cache/archive_cache/utils.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/rc_cache/archive_cache/utils.py
@@ -0,0 +1,30 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+
+class ArchiveCacheLock(Exception):
+ pass
+
+
+def archive_iterator(_reader, block_size: int = 4096 * 512):
+    # 4096 * 512 = 2MB, the default read chunk size
+    while True:
+ data = _reader.read(block_size)
+ if not data:
+ break
+ yield data
diff --git a/rhodecode/lib/system_info.py b/rhodecode/lib/system_info.py
--- a/rhodecode/lib/system_info.py
+++ b/rhodecode/lib/system_info.py
@@ -401,13 +401,20 @@ def storage_archives():
from rhodecode.lib.utils import safe_str
from rhodecode.lib.helpers import format_byte_size_binary
- msg = 'Archive cache storage is controlled by ' \
- 'archive_cache.store_dir=/path/to/cache option in the .ini file'
- path = safe_str(rhodecode.CONFIG.get('archive_cache.store_dir', msg))
+ storage_type = rhodecode.ConfigGet().get_str('archive_cache.backend.type')
+ storage_key = 'archive_cache.filesystem.store_dir'
+
+ default_msg = 'Archive cache storage is controlled by '\
+ f'{storage_key}=/path/to/cache option in the .ini file'
+ path = rhodecode.ConfigGet().get_str(storage_key, missing=default_msg)
value = dict(percent=0, used=0, total=0, items=0, path=path, text='')
state = STATE_OK_DEFAULT
try:
+ if storage_type != 'filesystem':
+            # raise to stop reporting for storage types other than filesystem
+ raise ValueError('Storage type must be "filesystem"')
+
items_count = 0
used = 0
for root, dirs, files in os.walk(path):