diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -23,7 +23,6 @@ celery==5.3.6 tzdata==2024.1 vine==5.1.0 contextlib2==21.6.0 -diskcache==5.6.3 dogpile.cache==1.3.3 decorator==5.1.1 stevedore==5.1.0 diff --git a/vcsserver/base.py b/vcsserver/base.py --- a/vcsserver/base.py +++ b/vcsserver/base.py @@ -120,9 +120,8 @@ def store_archive_in_cache(node_walker, d_cache = get_archival_cache_store(config=cache_config) if archive_key in d_cache: - with d_cache as d_cache_reader: - reader, tag = d_cache_reader.get(archive_key, read=True, tag=True, retry=True) - return reader.name + reader, metadata = d_cache.fetch(archive_key) + return reader.name archive_tmp_path = safe_bytes(tempfile.mkstemp()[1]) log.debug('Creating new temp archive in %s', archive_tmp_path) @@ -139,6 +138,7 @@ def store_archive_in_cache(node_walker, for f in node_walker(commit_id, archive_at_path): f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/')) + try: archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes()) except NoContentException: @@ -146,34 +146,28 @@ def store_archive_in_cache(node_walker, # directories which are not supported by archiver archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'') + metadata = dict([ + ('commit_id', commit_id), + ('mtime', mtime), + ]) + metadata.update(extra_metadata) if write_metadata: - metadata = dict([ - ('commit_id', commit_id), - ('mtime', mtime), - ]) - metadata.update(extra_metadata) - meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()] f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt') archiver.addfile(f_path, 0o644, False, b'\n'.join(meta)) archiver.done() - # ensure set & get are atomic - with d_cache.transact(): - - with open(archive_tmp_path, 'rb') as archive_file: - add_result = d_cache.set(archive_key, archive_file, read=True, tag='db-name', retry=True) - if not add_result: - log.error('Failed to store cache for key=%s', archive_key) + with open(archive_tmp_path, 'rb') as archive_file: + add_result = d_cache.store(archive_key, archive_file, metadata=metadata) + if not add_result: + log.error('Failed to store cache for key=%s', archive_key) - os.remove(archive_tmp_path) + os.remove(archive_tmp_path) - reader, tag = d_cache.get(archive_key, read=True, tag=True, retry=True) - if not reader: - raise AssertionError(f'empty reader on key={archive_key} added={add_result}') + reader, metadata = d_cache.fetch(archive_key) - return reader.name + return reader.name class BinaryEnvelope: diff --git a/vcsserver/lib/rc_cache/archive_cache.py b/vcsserver/lib/rc_cache/archive_cache.py deleted file mode 100644 --- a/vcsserver/lib/rc_cache/archive_cache.py +++ /dev/null @@ -1,87 +0,0 @@ -# RhodeCode VCSServer provides access to different vcs backends via network. -# Copyright (C) 2014-2023 RhodeCode GmbH -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -import logging -import os -import diskcache -from diskcache import RLock - -log = logging.getLogger(__name__) - -cache_meta = None - - -class ReentrantLock(RLock): - def __enter__(self): - reentrant_lock_key = self._key - - log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key) - #self.acquire() - log.debug('Lock for key=%s acquired', reentrant_lock_key) - - def __exit__(self, *exc_info): - #self.release() - pass - - -def get_archival_config(config): - - final_config = { - 'archive_cache.eviction_policy': 'least-frequently-used' - } - - for k, v in config.items(): - if k.startswith('archive_cache'): - final_config[k] = v - - return final_config - - -def get_archival_cache_store(config): - - global cache_meta - if cache_meta is not None: - return cache_meta - - config = get_archival_config(config) - - archive_cache_dir = config['archive_cache.store_dir'] - archive_cache_size_gb = config['archive_cache.cache_size_gb'] - archive_cache_shards = config['archive_cache.cache_shards'] - archive_cache_eviction_policy = config['archive_cache.eviction_policy'] - - log.debug('Initializing archival cache instance under %s', archive_cache_dir) - - # check if it's ok to write, and re-create the archive cache - if not os.path.isdir(archive_cache_dir): - os.makedirs(archive_cache_dir, exist_ok=True) - - d_cache = diskcache.FanoutCache( - archive_cache_dir, shards=archive_cache_shards, - cull_limit=0, # manual eviction required - size_limit=archive_cache_size_gb * 1024 * 1024 * 1024, - eviction_policy=archive_cache_eviction_policy, - timeout=30 - ) - cache_meta = d_cache - return cache_meta - - -def includeme(config): - # init our cache at start, for vcsserver we don't init at runtime - # because our cache config is sent via wire on make archive call, this call just lazy-enables the client - return diff --git a/vcsserver/lib/rc_cache/archive_cache/__init__.py b/vcsserver/lib/rc_cache/archive_cache/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/rc_cache/archive_cache/__init__.py @@ -0,0 +1,31 @@ +# RhodeCode VCSServer provides access to different vcs backends via network. +# Copyright (C) 2014-2024 RhodeCode GmbH +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from .fanout_cache import get_archival_cache_store +from .fanout_cache import get_archival_config + +from .utils import archive_iterator +from .utils import ArchiveCacheLock + + +def includeme(config): + # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode + return + + # init our cache at start + settings = config.get_settings() + get_archival_cache_store(settings) diff --git a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py b/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py @@ -0,0 +1,258 @@ +# RhodeCode VCSServer provides access to different vcs backends via network. +# Copyright (C) 2014-2024 RhodeCode GmbH +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import codecs +import contextlib +import functools +import os +import logging +import time +import typing +import zlib + +from vcsserver.lib.rc_json import json +from .lock import GenerationLock + +log = logging.getLogger(__name__) + +cache_meta = None + +UNKNOWN = -241 +NO_VAL = -917 + +MODE_BINARY = 'BINARY' + + +class FileSystemCache: + + def __init__(self, index, directory, **settings): + self._index = index + self._directory = directory + + def _write_file(self, full_path, iterator, mode, encoding=None): + full_dir, _ = os.path.split(full_path) + + for count in range(1, 11): + with contextlib.suppress(OSError): + os.makedirs(full_dir) + + try: + # Another cache may have deleted the directory before + # the file could be opened. + writer = open(full_path, mode, encoding=encoding) + except OSError: + if count == 10: + # Give up after 10 tries to open the file. + raise + continue + + with writer: + size = 0 + for chunk in iterator: + size += len(chunk) + writer.write(chunk) + return size + + def _get_keyfile(self, key): + return os.path.join(self._directory, f'{key}.key') + + def store(self, key, value_reader, metadata): + filename, full_path = self.random_filename() + key_file = self._get_keyfile(key) + + # STORE METADATA + _metadata = { + "version": "v1", + "timestamp": time.time(), + "filename": filename, + "full_path": full_path, + "key_file": key_file, + } + if metadata: + _metadata.update(metadata) + + reader = functools.partial(value_reader.read, 2**22) + + iterator = iter(reader, b'') + size = self._write_file(full_path, iterator, 'xb') + + # after archive is finished, we create a key to save the presence of the binary file + with open(key_file, 'wb') as f: + f.write(json.dumps(_metadata)) + + return key, size, MODE_BINARY, filename, _metadata + + def fetch(self, key) -> tuple[typing.BinaryIO, dict]: + if key not in self: + raise KeyError(key) + + key_file = self._get_keyfile(key) + with open(key_file, 'rb') as f: + metadata = json.loads(f.read()) + + filename = metadata['filename'] + + return open(os.path.join(self._directory, filename), 'rb'), metadata + + def random_filename(self): + """Return filename and full-path tuple for file storage. + + Filename will be a randomly generated 28 character hexadecimal string + with ".archive_cache" suffixed. Two levels of sub-directories will be used to + reduce the size of directories. On older filesystems, lookups in + directories with many files may be slow. + """ + + hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8') + sub_dir = os.path.join(hex_name[:2], hex_name[2:4]) + name = hex_name[4:] + '.archive_cache' + filename = os.path.join(sub_dir, name) + full_path = os.path.join(self._directory, filename) + return filename, full_path + + def hash(self, key): + """Compute portable hash for `key`. + + :param key: key to hash + :return: hash value + + """ + mask = 0xFFFFFFFF + return zlib.adler32(key.encode('utf-8')) & mask # noqa + + def __contains__(self, key): + """Return `True` if `key` matching item is found in cache. + + :param key: key matching item + :return: True if key matching item + + """ + key_file = self._get_keyfile(key) + return os.path.exists(key_file) + + +class FanoutCache: + """Cache that shards keys and values.""" + + def __init__( + self, directory=None, **settings + ): + """Initialize cache instance. + + :param str directory: cache directory + :param settings: settings dict + + """ + if directory is None: + raise ValueError('directory cannot be None') + + directory = str(directory) + directory = os.path.expanduser(directory) + directory = os.path.expandvars(directory) + self._directory = directory + + self._count = settings.pop('cache_shards') + self._locking_url = settings.pop('locking_url') + + self._shards = tuple( + FileSystemCache( + index=num, + directory=os.path.join(directory, 'shard_%03d' % num), + **settings, + ) + for num in range(self._count) + ) + self._hash = self._shards[0].hash + + def get_lock(self, lock_key): + return GenerationLock(lock_key, self._locking_url) + + def _get_shard(self, key) -> FileSystemCache: + index = self._hash(key) % self._count + shard = self._shards[index] + return shard + + def store(self, key, value_reader, metadata=None): + shard = self._get_shard(key) + return shard.store(key, value_reader, metadata) + + def fetch(self, key): + """Return file handle corresponding to `key` from cache. + """ + shard = self._get_shard(key) + return shard.fetch(key) + + def has_key(self, key): + """Return `True` if `key` matching item is found in cache. + + :param key: key for item + :return: True if key is found + + """ + shard = self._get_shard(key) + return key in shard + + def __contains__(self, item): + return self.has_key(item) + + +def get_archival_config(config): + + final_config = { + + } + + for k, v in config.items(): + if k.startswith('archive_cache'): + final_config[k] = v + + return final_config + + +def get_archival_cache_store(config): + + global cache_meta + if cache_meta is not None: + return cache_meta + + config = get_archival_config(config) + backend = config['archive_cache.backend.type'] + if backend != 'filesystem': + raise ValueError('archive_cache.backend.type only supports "filesystem"') + + archive_cache_locking_url = config['archive_cache.locking.url'] + archive_cache_dir = config['archive_cache.filesystem.store_dir'] + archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb'] + archive_cache_shards = config['archive_cache.filesystem.cache_shards'] + archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy'] + + log.debug('Initializing archival cache instance under %s', archive_cache_dir) + + # check if it's ok to write, and re-create the archive cache + if not os.path.isdir(archive_cache_dir): + os.makedirs(archive_cache_dir, exist_ok=True) + + d_cache = FanoutCache( + archive_cache_dir, + locking_url=archive_cache_locking_url, + cache_shards=archive_cache_shards, + cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024, + cache_eviction_policy=archive_cache_eviction_policy + ) + cache_meta = d_cache + return cache_meta + diff --git a/vcsserver/lib/rc_cache/archive_cache/lock.py b/vcsserver/lib/rc_cache/archive_cache/lock.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/rc_cache/archive_cache/lock.py @@ -0,0 +1,59 @@ +# RhodeCode VCSServer provides access to different vcs backends via network. +# Copyright (C) 2014-2024 RhodeCode GmbH +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import redis +from vcsserver.lib._vendor import redis_lock + +from .utils import ArchiveCacheLock + + +class GenerationLock: + """ + Locking mechanism that detects if a lock is acquired + + with GenerationLock(lock_key): + compute_archive() + """ + lock_timeout = 7200 + + def __init__(self, lock_key, url): + self.lock_key = lock_key + self._create_client(url) + self.lock = self.get_lock() + + def _create_client(self, url): + connection_pool = redis.ConnectionPool.from_url(url) + self.writer_client = redis.StrictRedis( + connection_pool=connection_pool + ) + self.reader_client = self.writer_client + + def get_lock(self): + return redis_lock.Lock( + redis_client=self.writer_client, + name=self.lock_key, + expire=self.lock_timeout, + strict=True + ) + + def __enter__(self): + acquired = self.lock.acquire(blocking=False) + if not acquired: + raise ArchiveCacheLock('Failed to create a lock') + + def __exit__(self, exc_type, exc_val, exc_tb): + self.lock.release() diff --git a/vcsserver/lib/rc_cache/archive_cache/utils.py b/vcsserver/lib/rc_cache/archive_cache/utils.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/rc_cache/archive_cache/utils.py @@ -0,0 +1,29 @@ +# RhodeCode VCSServer provides access to different vcs backends via network. +# Copyright (C) 2014-2024 RhodeCode GmbH +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + + +class ArchiveCacheLock(Exception): + pass + + +def archive_iterator(_reader, block_size: int = 4096 * 512): + # 4096 * 64 = 64KB + while 1: + data = _reader.read(block_size) + if not data: + break + yield data