diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,6 +29,7 @@ dogpile.cache==1.3.3
pbr==5.11.1
dulwich==0.21.6
urllib3==1.26.14
+fsspec==2024.6.0
gunicorn==21.2.0
packaging==24.0
hg-evolve==11.1.3
@@ -59,6 +60,34 @@ pyramid==2.0.2
redis==5.0.4
async-timeout==4.0.3
repoze.lru==0.7
+s3fs==2024.6.0
+    aiobotocore==2.13.0
+        aiohttp==3.9.5
+            aiosignal==1.3.1
+                frozenlist==1.4.1
+            attrs==22.2.0
+            frozenlist==1.4.1
+            multidict==6.0.5
+            yarl==1.9.4
+                idna==3.4
+                multidict==6.0.5
+        aioitertools==0.11.0
+        botocore==1.34.106
+            jmespath==1.0.1
+            python-dateutil==2.8.2
+                six==1.16.0
+            urllib3==1.26.14
+        wrapt==1.16.0
+    aiohttp==3.9.5
+        aiosignal==1.3.1
+            frozenlist==1.4.1
+        attrs==22.2.0
+        frozenlist==1.4.1
+        multidict==6.0.5
+        yarl==1.9.4
+            idna==3.4
+            multidict==6.0.5
+    fsspec==2024.6.0
scandir==1.10.0
setproctitle==1.3.3
subvertpy==0.11.0
diff --git a/vcsserver/base.py b/vcsserver/base.py
--- a/vcsserver/base.py
+++ b/vcsserver/base.py
@@ -20,7 +20,7 @@ import tempfile
import logging
import urllib.parse
-from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store
+from vcsserver.lib.archive_cache import get_archival_cache_store
from vcsserver import exceptions
from vcsserver.exceptions import NoContentException
diff --git a/vcsserver/http_main.py b/vcsserver/http_main.py
--- a/vcsserver/http_main.py
+++ b/vcsserver/http_main.py
@@ -233,7 +233,7 @@ class HTTPApplication:
self.global_config = global_config
self.config.include('vcsserver.lib.rc_cache')
- self.config.include('vcsserver.lib.rc_cache.archive_cache')
+ self.config.include('vcsserver.lib.archive_cache')
settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
vcs = VCS(locale_conf=settings_locale, cache_config=settings)
diff --git a/vcsserver/lib/archive_cache/__init__.py b/vcsserver/lib/archive_cache/__init__.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/__init__.py
@@ -0,0 +1,79 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import logging
+
+from .backends.fanout_cache import FileSystemFanoutCache
+from .backends.objectstore_cache import ObjectStoreCache
+
+from .utils import archive_iterator # noqa
+from .lock import ArchiveCacheGenerationLock # noqa
+
+log = logging.getLogger(__name__)
+
+
+cache_meta = None
+
+
+def includeme(config):
+ return # vcsserver gets its config from rhodecode on a remote call
+ # init our cache at start
+ settings = config.get_settings()
+ get_archival_cache_store(settings)
+
+
+def get_archival_config(config):
+
+ final_config = {
+
+ }
+
+ for k, v in config.items():
+ if k.startswith('archive_cache'):
+ final_config[k] = v
+
+ return final_config
+
+
+def get_archival_cache_store(config, always_init=False):
+
+ global cache_meta
+ if cache_meta is not None and not always_init:
+ return cache_meta
+
+ config = get_archival_config(config)
+ backend = config['archive_cache.backend.type']
+
+ archive_cache_locking_url = config['archive_cache.locking.url']
+
+ match backend:
+ case 'filesystem':
+ d_cache = FileSystemFanoutCache(
+ locking_url=archive_cache_locking_url,
+ **config
+ )
+ case 'objectstore':
+ d_cache = ObjectStoreCache(
+ locking_url=archive_cache_locking_url,
+ **config
+ )
+ case _:
+ raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore", got: {backend}')
+
+ cache_meta = d_cache
+ return cache_meta
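
Note (not part of the patch): a minimal sketch of how the factory above is driven from flat `archive_cache.*` settings. The key names are taken from the code; the values, path, and Redis URL are illustrative only.

```python
from vcsserver.lib.archive_cache import get_archival_cache_store

# illustrative settings for the 'filesystem' backend; all values are examples
settings = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/1',
    'archive_cache.filesystem.store_dir': '/tmp/rc_archive_cache',
    'archive_cache.filesystem.cache_shards': '8',
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
    'archive_cache.filesystem.cache_size_gb': '10',
    'archive_cache.filesystem.retry': 'false',
    'archive_cache.filesystem.retry_attempts': '0',
    'archive_cache.filesystem.retry_backoff': '1',
}

d_cache = get_archival_cache_store(settings)   # first call initializes the module-level cache_meta
d_cache = get_archival_cache_store(settings)   # later calls return the same instance unless always_init=True
```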
diff --git a/vcsserver/lib/archive_cache/backends/__init__.py b/vcsserver/lib/archive_cache/backends/__init__.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/backends/__init__.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
diff --git a/vcsserver/lib/archive_cache/backends/base.py b/vcsserver/lib/archive_cache/backends/base.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/backends/base.py
@@ -0,0 +1,348 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import functools
+import logging
+import typing
+import time
+import zlib
+
+from ...ext_json import json
+from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
+from ..lock import GenerationLock
+
+log = logging.getLogger(__name__)
+
+
+class BaseShard:
+ storage_type: str = ''
+ fs = None
+
+ @classmethod
+ def hash(cls, key):
+ """Compute portable hash for `key`.
+
+ :param key: key to hash
+ :return: hash value
+
+ """
+ mask = 0xFFFFFFFF
+ return zlib.adler32(key.encode('utf-8')) & mask # noqa
+
+ def _write_file(self, full_path, read_iterator, mode):
+ raise NotImplementedError
+
+ def _get_keyfile(self, key):
+ raise NotImplementedError
+
+ def random_filename(self):
+ raise NotImplementedError
+
+ def _store(self, key, value_reader, metadata, mode):
+ (filename, # hash-name
+ full_path # full-path/hash-name
+ ) = self.random_filename()
+
+ key_file, key_file_path = self._get_keyfile(key)
+
+ # STORE METADATA
+ _metadata = {
+ "version": "v1",
+
+ "key_file": key_file, # this is the .key.json file storing meta
+ "key_file_path": key_file_path, # full path to key_file
+ "archive_key": key, # original name we stored archive under, e.g my-archive.zip
+ "archive_filename": filename, # the actual filename we stored that file under
+ "archive_full_path": full_path,
+
+ "store_time": time.time(),
+ "access_count": 0,
+ "access_time": 0,
+
+ "size": 0
+ }
+ if metadata:
+ _metadata.update(metadata)
+
+ read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
+ size, sha256 = self._write_file(full_path, read_iterator, mode)
+ _metadata['size'] = size
+ _metadata['sha256'] = sha256
+
+ # after archive is finished, we create a key to save the presence of the binary file
+ with self.fs.open(key_file_path, 'wb') as f:
+ f.write(json.dumps(_metadata))
+
+ return key, filename, size, _metadata
+
+ def _fetch(self, key, retry, retry_attempts, retry_backoff):
+ if retry is NOT_GIVEN:
+ retry = False
+ if retry_attempts is NOT_GIVEN:
+ retry_attempts = 0
+
+ if retry and retry_attempts > 0:
+ for attempt in range(1, retry_attempts + 1):
+ if key in self:
+ break
+ # we didn't find the key, wait retry_backoff N seconds, and re-check
+ time.sleep(retry_backoff)
+
+ if key not in self:
+ log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
+ raise KeyError(key)
+
+ key_file, key_file_path = self._get_keyfile(key)
+ with self.fs.open(key_file_path, 'rb') as f:
+ metadata = json.loads(f.read())
+
+ archive_path = metadata['archive_full_path']
+
+ try:
+ return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
+ finally:
+ # update usage stats, count and accessed
+ metadata["access_count"] = metadata.get("access_count", 0) + 1
+ metadata["access_time"] = time.time()
+ log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
+ key_file, metadata['access_count'], metadata['access_time'])
+ with self.fs.open(key_file_path, 'wb') as f:
+ f.write(json.dumps(metadata))
+
+ def _remove(self, key):
+ if key not in self:
+ log.exception(f'requested key={key} not found in {self}')
+ raise KeyError(key)
+
+ key_file, key_file_path = self._get_keyfile(key)
+ with self.fs.open(key_file_path, 'rb') as f:
+ metadata = json.loads(f.read())
+
+ archive_path = metadata['archive_full_path']
+ self.fs.rm(archive_path)
+ self.fs.rm(key_file_path)
+ return 1
+
+ @property
+ def storage_medium(self):
+ return getattr(self, self.storage_type)
+
+ @property
+ def key_suffix(self):
+ return 'key.json'
+
+ def __contains__(self, key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param key: key matching item
+ :return: True if key matching item
+
+ """
+ key_file, key_file_path = self._get_keyfile(key)
+ return self.fs.exists(key_file_path)
+
+
+class BaseCache:
+ _locking_url: str = ''
+ _storage_path: str = ''
+ _config = {}
+ retry = False
+ retry_attempts = 0
+ retry_backoff = 1
+ _shards = tuple()
+
+ def __contains__(self, key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param key: key matching item
+ :return: True if key matching item
+
+ """
+ return self.has_key(key)
+
+ def __repr__(self):
+ return f'<{self.__class__.__name__}(storage={self._storage_path})>'
+
+ @classmethod
+ def gb_to_bytes(cls, gb):
+ return gb * (1024 ** 3)
+
+ @property
+ def storage_path(self):
+ return self._storage_path
+
+ @classmethod
+ def get_stats_db(cls):
+ return StatsDB()
+
+ def get_conf(self, key, pop=False):
+ if key not in self._config:
+ raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
+ val = self._config[key]
+ if pop:
+ del self._config[key]
+ return val
+
+ def _get_shard(self, key):
+ raise NotImplementedError
+
+ def _get_size(self, shard, archive_path):
+ raise NotImplementedError
+
+ def store(self, key, value_reader, metadata=None):
+ shard = self._get_shard(key)
+ return shard.store(key, value_reader, metadata)
+
+ def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
+ """
+ Return file handle corresponding to `key` from specific shard cache.
+ """
+ if retry is NOT_GIVEN:
+ retry = self.retry
+ if retry_attempts is NOT_GIVEN:
+ retry_attempts = self.retry_attempts
+ retry_backoff = self.retry_backoff
+
+ shard = self._get_shard(key)
+ return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
+
+ def remove(self, key):
+ shard = self._get_shard(key)
+ return shard.remove(key)
+
+ def has_key(self, archive_key):
+ """Return `True` if `key` matching item is found in cache.
+
+ :param archive_key: key for the item; a unique archive name we store data under, e.g. my-archive-svn.zip
+ :return: True if key is found
+
+ """
+ shard = self._get_shard(archive_key)
+ return archive_key in shard
+
+ def iter_keys(self):
+ for shard in self._shards:
+ if shard.fs.exists(shard.storage_medium):
+ for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
+ for key_file_path in _files:
+ if key_file_path.endswith(shard.key_suffix):
+ yield shard, key_file_path
+
+ def get_lock(self, lock_key):
+ return GenerationLock(lock_key, self._locking_url)
+
+ def evict(self, policy=None, size_limit=None) -> int:
+ """
+ Remove old items based on the configured eviction policy and size limit.
+
+
+ Explanation of the algorithm:
+ iterate over each shard, then for each shard iterate over the .key files and
+ read the metadata stored in them. This gives a full list of keys, cached archives,
+ their sizes, access data, creation times, and access counts.
+
+ Store that in an in-memory DB so we can easily run different sorting strategies.
+ Summing the sizes is a single SQL SUM query.
+
+ Then run the sorting strategy selected by the eviction policy, iterate over the
+ sorted keys, and remove entries until the cache is back under the overall size limit.
+ """
+
+ policy = policy or self._eviction_policy
+ size_limit = size_limit or self._cache_size_limit
+
+ select_policy = EVICTION_POLICY[policy]['evict']
+
+ log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
+ policy, format_size(size_limit))
+
+ if select_policy is None:
+ return 0
+
+ db = self.get_stats_db()
+
+ data = []
+ cnt = 1
+
+ for shard, key_file in self.iter_keys():
+ with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
+ metadata = json.loads(f.read())
+
+ key_file_path = os.path.join(shard.storage_medium, key_file)
+
+ archive_key = metadata['archive_key']
+ archive_path = metadata['archive_full_path']
+
+ size = metadata.get('size')
+ if not size:
+ # in case we don't have size re-calc it...
+ size = self._get_size(shard, archive_path)
+
+ data.append([
+ cnt,
+ key_file,
+ key_file_path,
+ archive_key,
+ archive_path,
+ metadata.get('store_time', 0),
+ metadata.get('access_time', 0),
+ metadata.get('access_count', 0),
+ size,
+ ])
+ cnt += 1
+
+ # Insert bulk data using executemany
+ db.bulk_insert(data)
+
+ total_size = db.get_total_size()
+ log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
+ len(data), format_size(total_size), format_size(size_limit))
+
+ removed_items = 0
+ removed_size = 0
+ for key_file, archive_key, size in db.get_sorted_keys(select_policy):
+ # simulate removal impact BEFORE removal
+ total_size -= size
+
+ if total_size <= size_limit:
+ # we obtained what we wanted...
+ break
+
+ self.remove(archive_key)
+ removed_items += 1
+ removed_size += size
+
+ log.debug('Removed %s cache archives, and reduced size by: %s',
+ removed_items, format_size(removed_size))
+ return removed_items
+
+ def get_statistics(self):
+ total_files = 0
+ total_size = 0
+ meta = {}
+
+ for shard, key_file in self.iter_keys():
+ json_key = f"{shard.storage_medium}/{key_file}"
+ with shard.fs.open(json_key, 'rb') as f:
+ total_files += 1
+ metadata = json.loads(f.read())
+ total_size += metadata['size']
+
+ return total_files, total_size, meta
+
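
Note (not part of the patch): the per-archive `<archive_key>.key.json` record written by `_store()` and read back by `_fetch()` and `evict()` has roughly the shape below; every value here is made up.

```python
# illustrative .key.json contents for a filesystem shard
_metadata = {
    "version": "v1",
    "key_file": "my-archive.zip.key.json",                      # the metadata file itself
    "key_file_path": "/tmp/rc_archive_cache/shard_000/my-archive.zip.key.json",
    "archive_key": "my-archive.zip",                             # name the archive was stored under
    "archive_filename": "c3d4e5f6a7b8c9d0e1f2a3b4c5d6.archive_cache",  # random 28-char hex name
    "archive_full_path": "/tmp/rc_archive_cache/shard_000/a1/b2/c3d4e5f6a7b8c9d0e1f2a3b4c5d6.archive_cache",
    "store_time": 1718000000.0,
    "access_count": 2,                                           # bumped on every fetch
    "access_time": 1718000123.4,
    "size": 10485760,                                            # bytes written by _write_file
    "sha256": "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
}
```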
diff --git a/vcsserver/lib/archive_cache/backends/fanout_cache.py b/vcsserver/lib/archive_cache/backends/fanout_cache.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/backends/fanout_cache.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import codecs
+import hashlib
+import logging
+import os
+
+import fsspec
+
+from .base import BaseCache, BaseShard
+from ..utils import ShardFileReader, NOT_GIVEN
+from ...type_utils import str2bool
+
+log = logging.getLogger(__name__)
+
+
+class FileSystemShard(BaseShard):
+
+ def __init__(self, index, directory, **settings):
+ self._index = index
+ self._directory = directory
+ self.storage_type = 'directory'
+ self.fs = fsspec.filesystem('file')
+
+ @property
+ def directory(self):
+ """Cache directory."""
+ return self._directory
+
+ def _get_keyfile(self, archive_key) -> tuple[str, str]:
+ key_file = f'{archive_key}.{self.key_suffix}'
+ return key_file, os.path.join(self.directory, key_file)
+
+ def _get_writer(self, path, mode):
+ for count in range(1, 11):
+ try:
+ # Another cache may have deleted the directory before
+ # the file could be opened.
+ return self.fs.open(path, mode)
+ except OSError:
+ if count == 10:
+ # Give up after 10 tries to open the file.
+ raise
+ continue
+
+ def _write_file(self, full_path, iterator, mode):
+ # ensure dir exists
+ destination, _ = os.path.split(full_path)
+ if not self.fs.exists(destination):
+ self.fs.makedirs(destination)
+
+ writer = self._get_writer(full_path, mode)
+
+ digest = hashlib.sha256()
+ with writer:
+ size = 0
+ for chunk in iterator:
+ size += len(chunk)
+ digest.update(chunk)
+ writer.write(chunk)
+ writer.flush()
+ # Get the file descriptor
+ fd = writer.fileno()
+
+ # Sync the file descriptor to disk, helps with NFS cases...
+ os.fsync(fd)
+ sha256 = digest.hexdigest()
+ log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
+ return size, sha256
+
+ def store(self, key, value_reader, metadata: dict | None = None):
+ return self._store(key, value_reader, metadata, mode='xb')
+
+ def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
+ return self._fetch(key, retry, retry_attempts, retry_backoff)
+
+ def remove(self, key):
+ return self._remove(key)
+
+ def random_filename(self):
+ """Return filename and full-path tuple for file storage.
+
+ Filename will be a randomly generated 28 character hexadecimal string
+ with ".archive_cache" suffixed. Two levels of sub-directories will be used to
+ reduce the size of directories. On older filesystems, lookups in
+ directories with many files may be slow.
+ """
+
+ hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
+
+ archive_name = hex_name[4:] + '.archive_cache'
+ filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
+
+ full_path = os.path.join(self.directory, filename)
+ return archive_name, full_path
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
+
+
+class FileSystemFanoutCache(BaseCache):
+
+ def __init__(self, locking_url, **settings):
+ """
+ Initialize file system cache instance.
+
+ :param str locking_url: redis url for a lock
+ :param settings: settings dict
+
+ """
+ self._locking_url = locking_url
+ self._config = settings
+ cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
+ directory = str(cache_dir)
+ directory = os.path.expanduser(directory)
+ directory = os.path.expandvars(directory)
+ self._directory = directory
+ self._storage_path = directory
+
+ # check if it's ok to write, and re-create the archive cache
+ if not os.path.isdir(self._directory):
+ os.makedirs(self._directory, exist_ok=True)
+
+ self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+
+ self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
+ self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
+
+ self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
+ self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
+ self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
+
+ log.debug('Initializing archival cache instance under %s', self._directory)
+ self._shards = tuple(
+ FileSystemShard(
+ index=num,
+ directory=os.path.join(directory, 'shard_%03d' % num),
+ **settings,
+ )
+ for num in range(self._count)
+ )
+ self._hash = self._shards[0].hash
+
+ def _get_shard(self, key) -> FileSystemShard:
+ index = self._hash(key) % self._count
+ shard = self._shards[index]
+ return shard
+
+ def _get_size(self, shard, archive_path):
+ return os.stat(archive_path).st_size
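
Note (not part of the patch): a hypothetical store/fetch round-trip against the filesystem backend. `d_cache` is assumed to come from `get_archival_cache_store()` as sketched earlier; the key and payload are illustrative.

```python
import io

archive_key = 'my-archive.zip'                        # illustrative archive name

if archive_key not in d_cache:                        # BaseCache.__contains__ -> has_key()
    value_reader = io.BytesIO(b'fake zip payload')    # anything exposing .read(size)
    key, filename, size, meta = d_cache.store(
        archive_key, value_reader, metadata={'commit_id': 'deadbeef'})  # extra metadata is optional

reader, metadata = d_cache.fetch(archive_key)         # ShardFileReader + the .key.json metadata
head = reader.read(1024)

# trim the cache back under 10GB using the least-recently-stored policy
removed = d_cache.evict(policy='least-recently-stored', size_limit=d_cache.gb_to_bytes(10))
```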
diff --git a/vcsserver/lib/archive_cache/backends/objectstore_cache.py b/vcsserver/lib/archive_cache/backends/objectstore_cache.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/backends/objectstore_cache.py
@@ -0,0 +1,150 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import codecs
+import hashlib
+import logging
+import os
+
+import fsspec
+
+from .base import BaseCache, BaseShard
+from ..utils import ShardFileReader, NOT_GIVEN
+from ...type_utils import str2bool
+
+log = logging.getLogger(__name__)
+
+
+class S3Shard(BaseShard):
+
+ def __init__(self, index, bucket, **settings):
+ self._index = index
+ self._bucket = bucket
+ self.storage_type = 'bucket'
+
+ endpoint_url = settings.pop('archive_cache.objectstore.url')
+ key = settings.pop('archive_cache.objectstore.key')
+ secret = settings.pop('archive_cache.objectstore.secret')
+
+ self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+
+ @property
+ def bucket(self):
+ """Cache bucket."""
+ return self._bucket
+
+ def _get_keyfile(self, archive_key) -> tuple[str, str]:
+ key_file = f'{archive_key}-{self.key_suffix}'
+ return key_file, os.path.join(self.bucket, key_file)
+
+ def _get_writer(self, path, mode):
+ return self.fs.open(path, 'wb')
+
+ def _write_file(self, full_path, iterator, mode):
+ # ensure bucket exists
+ destination = self.bucket
+ if not self.fs.exists(destination):
+ self.fs.mkdir(destination, s3_additional_kwargs={})
+
+ writer = self._get_writer(full_path, mode)
+
+ digest = hashlib.sha256()
+ with writer:
+ size = 0
+ for chunk in iterator:
+ size += len(chunk)
+ digest.update(chunk)
+ writer.write(chunk)
+
+ sha256 = digest.hexdigest()
+ log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
+ return size, sha256
+
+ def store(self, key, value_reader, metadata: dict | None = None):
+ return self._store(key, value_reader, metadata, mode='wb')
+
+ def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
+ return self._fetch(key, retry, retry_attempts, retry_backoff)
+
+ def remove(self, key):
+ return self._remove(key)
+
+ def random_filename(self):
+ """Return filename and full-path tuple for file storage.
+
+ Filename will be a randomly generated 28 character hexadecimal string
+ with ".archive_cache" suffixed. Two levels of sub-directories will be used to
+ reduce the size of directories. On older filesystems, lookups in
+ directories with many files may be slow.
+ """
+
+ hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
+
+ archive_name = hex_name[4:] + '.archive_cache'
+ filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
+
+ full_path = os.path.join(self.bucket, filename)
+ return archive_name, full_path
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
+
+
+class ObjectStoreCache(BaseCache):
+
+ def __init__(self, locking_url, **settings):
+ """
+ Initialize objectstore cache instance.
+
+ :param str locking_url: redis url for a lock
+ :param settings: settings dict
+
+ """
+ self._locking_url = locking_url
+ self._config = settings
+
+ objectstore_url = self.get_conf('archive_cache.objectstore.url')
+ self._storage_path = objectstore_url
+
+ self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+
+ self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
+ self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
+
+ self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
+ self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
+ self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
+
+ log.debug('Initializing archival cache instance under %s', objectstore_url)
+ self._shards = tuple(
+ S3Shard(
+ index=num,
+ bucket='rhodecode-archivecache-%03d' % num,
+ **settings,
+ )
+ for num in range(self._count)
+ )
+ self._hash = self._shards[0].hash
+
+ def _get_shard(self, key) -> S3Shard:
+ index = self._hash(key) % self._count
+ shard = self._shards[index]
+ return shard
+
+ def _get_size(self, shard, archive_path):
+ return shard.fs.info(archive_path)['size']
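
Note (not part of the patch): an illustrative settings dict selecting the new objectstore backend. The key names come from the code above; the endpoint and credentials are placeholders.

```python
from vcsserver.lib.archive_cache import get_archival_cache_store

# placeholder endpoint and credentials, do not use literally
settings = {
    'archive_cache.backend.type': 'objectstore',
    'archive_cache.locking.url': 'redis://redis:6379/1',
    'archive_cache.objectstore.url': 'http://s3-minio:9000',
    'archive_cache.objectstore.key': 'example-access-key',
    'archive_cache.objectstore.secret': 'example-secret-key',
    'archive_cache.objectstore.bucket_shards': '10',
    'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
    'archive_cache.objectstore.cache_size_gb': '10',
    'archive_cache.objectstore.retry': 'false',
    'archive_cache.objectstore.retry_attempts': '0',
    'archive_cache.objectstore.retry_backoff': '1',
}

d_cache = get_archival_cache_store(settings, always_init=True)
# archives land in buckets 'rhodecode-archivecache-000' .. 'rhodecode-archivecache-009',
# with the shard picked via adler32(archive_key) % bucket_shards
```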
diff --git a/vcsserver/lib/archive_cache/lock.py b/vcsserver/lib/archive_cache/lock.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/lock.py
@@ -0,0 +1,62 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import redis
+from .._vendor import redis_lock
+
+
+class ArchiveCacheGenerationLock(Exception):
+ pass
+
+
+class GenerationLock:
+ """
+ Locking mechanism that detects if a lock is acquired
+
+ with GenerationLock(lock_key):
+ compute_archive()
+ """
+ lock_timeout = 7200
+
+ def __init__(self, lock_key, url):
+ self.lock_key = lock_key
+ self._create_client(url)
+ self.lock = self.get_lock()
+
+ def _create_client(self, url):
+ connection_pool = redis.ConnectionPool.from_url(url)
+ self.writer_client = redis.StrictRedis(
+ connection_pool=connection_pool
+ )
+ self.reader_client = self.writer_client
+
+ def get_lock(self):
+ return redis_lock.Lock(
+ redis_client=self.writer_client,
+ name=self.lock_key,
+ expire=self.lock_timeout,
+ strict=True
+ )
+
+ def __enter__(self):
+ acquired = self.lock.acquire(blocking=False)
+ if not acquired:
+ raise ArchiveCacheGenerationLock('Failed to create a lock')
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.lock.release()
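
Note (not part of the patch): a sketch of how the lock is meant to guard archive generation. The lock-key naming and the fallback handling are illustrative; `d_cache`, `archive_key`, and `value_reader` are assumed from the earlier sketches.

```python
from vcsserver.lib.archive_cache import ArchiveCacheGenerationLock

lock_key = f'archive-gen-{archive_key}'               # illustrative lock-key naming
try:
    with d_cache.get_lock(lock_key):                  # non-blocking redis_lock, 7200s expiry
        if archive_key not in d_cache:
            d_cache.store(archive_key, value_reader)  # only one worker generates the archive
except ArchiveCacheGenerationLock:
    # another worker holds the lock; poll the cache instead of generating twice
    reader, metadata = d_cache.fetch(archive_key, retry=True, retry_attempts=30)
```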
diff --git a/vcsserver/lib/archive_cache/utils.py b/vcsserver/lib/archive_cache/utils.py
new file mode 100644
--- /dev/null
+++ b/vcsserver/lib/archive_cache/utils.py
@@ -0,0 +1,134 @@
+# Copyright (C) 2015-2024 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import sqlite3
+import s3fs.core
+
+NOT_GIVEN = -917
+
+
+EVICTION_POLICY = {
+ 'none': {
+ 'evict': None,
+ },
+ 'least-recently-stored': {
+ 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
+ },
+ 'least-recently-used': {
+ 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
+ },
+ 'least-frequently-used': {
+ 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
+ },
+}
+
+
+def archive_iterator(_reader, block_size: int = 4096 * 512):
+ # default block size: 4096 * 512 bytes = 2MB
+ while 1:
+ data = _reader.read(block_size)
+ if not data:
+ break
+ yield data
+
+
+def format_size(size):
+ # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
+ for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+ if size < 1024:
+ return f"{size:.2f} {unit}"
+ size /= 1024
+
+
+class StatsDB:
+
+ def __init__(self):
+ self.connection = sqlite3.connect(':memory:')
+ self._init_db()
+
+ def _init_db(self):
+ qry = '''
+ CREATE TABLE IF NOT EXISTS archive_cache (
+ rowid INTEGER PRIMARY KEY,
+ key_file TEXT,
+ key_file_path TEXT,
+ archive_key TEXT,
+ archive_path TEXT,
+ store_time REAL,
+ access_time REAL,
+ access_count INTEGER DEFAULT 0,
+ size INTEGER DEFAULT 0
+ )
+ '''
+
+ self.sql(qry)
+ self.connection.commit()
+
+ @property
+ def sql(self):
+ return self.connection.execute
+
+ def bulk_insert(self, rows):
+ qry = '''
+ INSERT INTO archive_cache (
+ rowid,
+ key_file,
+ key_file_path,
+ archive_key,
+ archive_path,
+ store_time,
+ access_time,
+ access_count,
+ size
+ )
+ VALUES (
+ ?, ?, ?, ?, ?, ?, ?, ?, ?
+ )
+ '''
+ cursor = self.connection.cursor()
+ cursor.executemany(qry, rows)
+ self.connection.commit()
+
+ def get_total_size(self):
+ qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache'
+ ((total_size,),) = self.sql(qry).fetchall()
+ return total_size
+
+ def get_sorted_keys(self, select_policy):
+ select_policy_qry = select_policy.format(fields='key_file, archive_key, size')
+ return self.sql(select_policy_qry).fetchall()
+
+
+class ShardFileReader:
+
+ def __init__(self, file_like_reader):
+ self._file_like_reader = file_like_reader
+
+ def __getattr__(self, item):
+ if isinstance(self._file_like_reader, s3fs.core.S3File):
+ match item:
+ case 'name':
+ # the S3 file wrapper doesn't expose the name attribute we rely on; use full_name instead
+ return self._file_like_reader.full_name
+ case _:
+ return getattr(self._file_like_reader, item)
+ else:
+ return getattr(self._file_like_reader, item)
+
+ def __repr__(self):
+ return f'<{self.__class__.__name__}={self._file_like_reader}>'
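
Note (not part of the patch): a standalone illustration of the eviction bookkeeping, showing how `StatsDB` rows mirror the `.key.json` metadata and how an `EVICTION_POLICY` query picks the removal order. All values are made up.

```python
from vcsserver.lib.archive_cache.utils import StatsDB, EVICTION_POLICY

db = StatsDB()
db.bulk_insert([
    # rowid, key_file, key_file_path, archive_key, archive_path, store_time, access_time, access_count, size
    (1, 'a.zip.key.json', '/cache/a.zip.key.json', 'a.zip', '/cache/aa/bb/x.archive_cache', 100.0, 400.0, 5, 300),
    (2, 'b.zip.key.json', '/cache/b.zip.key.json', 'b.zip', '/cache/cc/dd/y.archive_cache', 200.0, 300.0, 1, 500),
])

print(db.get_total_size())  # 800

lru_qry = EVICTION_POLICY['least-recently-used']['evict']
print(db.get_sorted_keys(lru_qry))  # ordered by access_time, so 'b.zip' comes first
```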
diff --git a/vcsserver/lib/rc_cache/archive_cache/__init__.py b/vcsserver/lib/rc_cache/archive_cache/__init__.py
deleted file mode 100644
--- a/vcsserver/lib/rc_cache/archive_cache/__init__.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# RhodeCode VCSServer provides access to different vcs backends via network.
-# Copyright (C) 2014-2024 RhodeCode GmbH
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-from .fanout_cache import get_archival_cache_store
-from .fanout_cache import get_archival_config
-
-from .utils import archive_iterator
-from .utils import ArchiveCacheGenerationLock
-
-
-def includeme(config):
- # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode
- return
-
- # init our cache at start
- settings = config.get_settings()
- get_archival_cache_store(settings)
diff --git a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py b/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py
deleted file mode 100644
--- a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py
+++ /dev/null
@@ -1,455 +0,0 @@
-# RhodeCode VCSServer provides access to different vcs backends via network.
-# Copyright (C) 2014-2024 RhodeCode GmbH
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import codecs
-import contextlib
-import functools
-import os
-import logging
-import time
-import typing
-import zlib
-import sqlite3
-
-from ...ext_json import json
-from .lock import GenerationLock
-from .utils import format_size
-
-log = logging.getLogger(__name__)
-
-cache_meta = None
-
-UNKNOWN = -241
-NO_VAL = -917
-
-MODE_BINARY = 'BINARY'
-
-
-EVICTION_POLICY = {
- 'none': {
- 'evict': None,
- },
- 'least-recently-stored': {
- 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
- },
- 'least-recently-used': {
- 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
- },
- 'least-frequently-used': {
- 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
- },
-}
-
-
-class DB:
-
- def __init__(self):
- self.connection = sqlite3.connect(':memory:')
- self._init_db()
-
- def _init_db(self):
- qry = '''
- CREATE TABLE IF NOT EXISTS archive_cache (
- rowid INTEGER PRIMARY KEY,
- key_file TEXT,
- key_file_path TEXT,
- filename TEXT,
- full_path TEXT,
- store_time REAL,
- access_time REAL,
- access_count INTEGER DEFAULT 0,
- size INTEGER DEFAULT 0
- )
- '''
-
- self.sql(qry)
- self.connection.commit()
-
- @property
- def sql(self):
- return self.connection.execute
-
- def bulk_insert(self, rows):
- qry = '''
- INSERT INTO archive_cache (
- rowid,
- key_file,
- key_file_path,
- filename,
- full_path,
- store_time,
- access_time,
- access_count,
- size
- )
- VALUES (
- ?, ?, ?, ?, ?, ?, ?, ?, ?
- )
- '''
- cursor = self.connection.cursor()
- cursor.executemany(qry, rows)
- self.connection.commit()
-
-
-class FileSystemCache:
-
- def __init__(self, index, directory, **settings):
- self._index = index
- self._directory = directory
-
- @property
- def directory(self):
- """Cache directory."""
- return self._directory
-
- def _write_file(self, full_path, iterator, mode, encoding=None):
- full_dir, _ = os.path.split(full_path)
-
- for count in range(1, 11):
- with contextlib.suppress(OSError):
- os.makedirs(full_dir)
-
- try:
- # Another cache may have deleted the directory before
- # the file could be opened.
- writer = open(full_path, mode, encoding=encoding)
- except OSError:
- if count == 10:
- # Give up after 10 tries to open the file.
- raise
- continue
-
- with writer:
- size = 0
- for chunk in iterator:
- size += len(chunk)
- writer.write(chunk)
- writer.flush()
- # Get the file descriptor
- fd = writer.fileno()
-
- # Sync the file descriptor to disk, helps with NFS cases...
- os.fsync(fd)
- log.debug('written new archive cache under %s', full_path)
- return size
-
- def _get_keyfile(self, key):
- return os.path.join(self._directory, f'{key}.key')
-
- def store(self, key, value_reader, metadata):
- filename, full_path = self.random_filename()
- key_file = self._get_keyfile(key)
-
- # STORE METADATA
- _metadata = {
- "version": "v1",
- "filename": filename,
- "full_path": full_path,
- "key_file": key_file,
- "store_time": time.time(),
- "access_count": 1,
- "access_time": 0,
- "size": 0
- }
- if metadata:
- _metadata.update(metadata)
-
- reader = functools.partial(value_reader.read, 2**22)
-
- iterator = iter(reader, b'')
- size = self._write_file(full_path, iterator, 'xb')
- metadata['size'] = size
-
- # after archive is finished, we create a key to save the presence of the binary file
- with open(key_file, 'wb') as f:
- f.write(json.dumps(_metadata))
-
- return key, size, MODE_BINARY, filename, _metadata
-
- def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
-
- if retry:
- for attempt in range(retry_attempts):
- if key in self:
- break
- # we dind't find the key, wait 1s, and re-check
- time.sleep(1)
-
- if key not in self:
- log.exception('requested {key} not found in {self}', key, self)
- raise KeyError(key)
-
- key_file = self._get_keyfile(key)
- with open(key_file, 'rb') as f:
- metadata = json.loads(f.read())
-
- filename = metadata['filename']
-
- try:
- return open(os.path.join(self.directory, filename), 'rb'), metadata
- finally:
- # update usage stats, count and accessed
- metadata["access_count"] = metadata.get("access_count", 0) + 1
- metadata["access_time"] = time.time()
-
- with open(key_file, 'wb') as f:
- f.write(json.dumps(metadata))
-
- def random_filename(self):
- """Return filename and full-path tuple for file storage.
-
- Filename will be a randomly generated 28 character hexadecimal string
- with ".archive_cache" suffixed. Two levels of sub-directories will be used to
- reduce the size of directories. On older filesystems, lookups in
- directories with many files may be slow.
- """
-
- hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
- sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
- name = hex_name[4:] + '.archive_cache'
- filename = os.path.join(sub_dir, name)
- full_path = os.path.join(self.directory, filename)
- return filename, full_path
-
- def hash(self, key):
- """Compute portable hash for `key`.
-
- :param key: key to hash
- :return: hash value
-
- """
- mask = 0xFFFFFFFF
- return zlib.adler32(key.encode('utf-8')) & mask # noqa
-
- def __contains__(self, key):
- """Return `True` if `key` matching item is found in cache.
-
- :param key: key matching item
- :return: True if key matching item
-
- """
- key_file = self._get_keyfile(key)
- return os.path.exists(key_file)
-
- def __repr__(self):
- return f'FileSystemCache(index={self._index}, dir={self.directory})'
-
-
-class FanoutCache:
- """Cache that shards keys and values."""
-
- def __init__(
- self, directory=None, **settings
- ):
- """Initialize cache instance.
-
- :param str directory: cache directory
- :param settings: settings dict
-
- """
- if directory is None:
- raise ValueError('directory cannot be None')
-
- directory = str(directory)
- directory = os.path.expanduser(directory)
- directory = os.path.expandvars(directory)
- self._directory = directory
-
- self._count = settings.pop('cache_shards')
- self._locking_url = settings.pop('locking_url')
-
- self._eviction_policy = settings['cache_eviction_policy']
- self._cache_size_limit = settings['cache_size_limit']
-
- self._shards = tuple(
- FileSystemCache(
- index=num,
- directory=os.path.join(directory, 'shard_%03d' % num),
- **settings,
- )
- for num in range(self._count)
- )
- self._hash = self._shards[0].hash
-
- @property
- def directory(self):
- """Cache directory."""
- return self._directory
-
- def get_lock(self, lock_key):
- return GenerationLock(lock_key, self._locking_url)
-
- def _get_shard(self, key) -> FileSystemCache:
- index = self._hash(key) % self._count
- shard = self._shards[index]
- return shard
-
- def store(self, key, value_reader, metadata=None):
- shard = self._get_shard(key)
- return shard.store(key, value_reader, metadata)
-
- def fetch(self, key, retry=False, retry_attempts=10):
- """Return file handle corresponding to `key` from cache.
- """
- shard = self._get_shard(key)
- return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)
-
- def has_key(self, key):
- """Return `True` if `key` matching item is found in cache.
-
- :param key: key for item
- :return: True if key is found
-
- """
- shard = self._get_shard(key)
- return key in shard
-
- def __contains__(self, item):
- return self.has_key(item)
-
- def evict(self, policy=None, size_limit=None):
- """
- Remove old items based on the conditions
-
-
- explanation of this algo:
- iterate over each shard, then for each shard iterate over the .key files
- read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
- access data, time creation, and access counts.
-
- Store that into a memory DB so we can run different sorting strategies easily.
- Summing the size is a sum sql query.
-
- Then we run a sorting strategy based on eviction policy.
- We iterate over sorted keys, and remove each checking if we hit the overall limit.
- """
-
- policy = policy or self._eviction_policy
- size_limit = size_limit or self._cache_size_limit
-
- select_policy = EVICTION_POLICY[policy]['evict']
-
- log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
- policy, format_size(size_limit))
-
- if select_policy is None:
- return 0
-
- db = DB()
-
- data = []
- cnt = 1
- for shard in self._shards:
- for key_file in os.listdir(shard.directory):
- if key_file.endswith('.key'):
- key_file_path = os.path.join(shard.directory, key_file)
- with open(key_file_path, 'rb') as f:
- metadata = json.loads(f.read())
-
- size = metadata.get('size')
- filename = metadata.get('filename')
- full_path = metadata.get('full_path')
-
- if not size:
- # in case we don't have size re-calc it...
- size = os.stat(full_path).st_size
-
- data.append([
- cnt,
- key_file,
- key_file_path,
- filename,
- full_path,
- metadata.get('store_time', 0),
- metadata.get('access_time', 0),
- metadata.get('access_count', 0),
- size,
- ])
- cnt += 1
-
- # Insert bulk data using executemany
- db.bulk_insert(data)
-
- ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
- log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
- select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
- sorted_keys = db.sql(select_policy_qry).fetchall()
-
- removed_items = 0
- removed_size = 0
- for key, cached_file, size in sorted_keys:
- # simulate removal impact BEFORE removal
- total_size -= size
-
- if total_size <= size_limit:
- # we obtained what we wanted...
- break
-
- os.remove(cached_file)
- os.remove(key)
- removed_items += 1
- removed_size += size
-
- log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
- return removed_items
-
-
-def get_archival_config(config):
-
- final_config = {
-
- }
-
- for k, v in config.items():
- if k.startswith('archive_cache'):
- final_config[k] = v
-
- return final_config
-
-
-def get_archival_cache_store(config):
-
- global cache_meta
- if cache_meta is not None:
- return cache_meta
-
- config = get_archival_config(config)
- backend = config['archive_cache.backend.type']
- if backend != 'filesystem':
- raise ValueError('archive_cache.backend.type only supports "filesystem"')
-
- archive_cache_locking_url = config['archive_cache.locking.url']
- archive_cache_dir = config['archive_cache.filesystem.store_dir']
- archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
- archive_cache_shards = config['archive_cache.filesystem.cache_shards']
- archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
-
- log.debug('Initializing archival cache instance under %s', archive_cache_dir)
-
- # check if it's ok to write, and re-create the archive cache
- if not os.path.isdir(archive_cache_dir):
- os.makedirs(archive_cache_dir, exist_ok=True)
-
- d_cache = FanoutCache(
- archive_cache_dir,
- locking_url=archive_cache_locking_url,
- cache_shards=archive_cache_shards,
- cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
- cache_eviction_policy=archive_cache_eviction_policy
- )
- cache_meta = d_cache
- return cache_meta
diff --git a/vcsserver/lib/rc_cache/archive_cache/lock.py b/vcsserver/lib/rc_cache/archive_cache/lock.py
deleted file mode 100644
--- a/vcsserver/lib/rc_cache/archive_cache/lock.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# RhodeCode VCSServer provides access to different vcs backends via network.
-# Copyright (C) 2014-2024 RhodeCode GmbH
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import redis
-from ..._vendor import redis_lock
-from .utils import ArchiveCacheGenerationLock
-
-
-class GenerationLock:
- """
- Locking mechanism that detects if a lock is acquired
-
- with GenerationLock(lock_key):
- compute_archive()
- """
- lock_timeout = 7200
-
- def __init__(self, lock_key, url):
- self.lock_key = lock_key
- self._create_client(url)
- self.lock = self.get_lock()
-
- def _create_client(self, url):
- connection_pool = redis.ConnectionPool.from_url(url)
- self.writer_client = redis.StrictRedis(
- connection_pool=connection_pool
- )
- self.reader_client = self.writer_client
-
- def get_lock(self):
- return redis_lock.Lock(
- redis_client=self.writer_client,
- name=self.lock_key,
- expire=self.lock_timeout,
- strict=True
- )
-
- def __enter__(self):
- acquired = self.lock.acquire(blocking=False)
- if not acquired:
- raise ArchiveCacheGenerationLock('Failed to create a lock')
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.lock.release()
diff --git a/vcsserver/lib/rc_cache/archive_cache/utils.py b/vcsserver/lib/rc_cache/archive_cache/utils.py
deleted file mode 100644
--- a/vcsserver/lib/rc_cache/archive_cache/utils.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# RhodeCode VCSServer provides access to different vcs backends via network.
-# Copyright (C) 2014-2024 RhodeCode GmbH
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import os
-
-
-class ArchiveCacheGenerationLock(Exception):
- pass
-
-
-def archive_iterator(_reader, block_size: int = 4096 * 512):
- # 4096 * 64 = 64KB
- while 1:
- data = _reader.read(block_size)
- if not data:
- break
- yield data
-
-
-def get_directory_statistics(start_path):
- """
- total_files, total_size, directory_stats = get_directory_statistics(start_path)
-
- print(f"Directory statistics for: {start_path}\n")
- print(f"Total files: {total_files}")
- print(f"Total size: {format_size(total_size)}\n")
-
- :param start_path:
- :return:
- """
-
- total_files = 0
- total_size = 0
- directory_stats = {}
-
- for dir_path, dir_names, file_names in os.walk(start_path):
- dir_size = 0
- file_count = len(file_names)
-
- for file in file_names:
- filepath = os.path.join(dir_path, file)
- file_size = os.path.getsize(filepath)
- dir_size += file_size
-
- directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
- total_files += file_count
- total_size += dir_size
-
- return total_files, total_size, directory_stats
-
-
-def format_size(size):
- # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
- for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
- if size < 1024:
- return f"{size:.2f} {unit}"
- size /= 1024