|
|
# Copyright (C) 2015-2024 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import codecs
|
|
|
import contextlib
|
|
|
import functools
|
|
|
import os
|
|
|
import logging
|
|
|
import time
|
|
|
import typing
|
|
|
import zlib
|
|
|
|
|
|
from rhodecode.lib.ext_json import json
|
|
|
from .lock import GenerationLock
|
|
|
|
|
|
log = logging.getLogger(__name__)


# Process-wide singleton cache store; populated lazily by
# get_archival_cache_store() and reused on every subsequent call.
cache_meta = None


# Sentinel constants; not referenced elsewhere in this chunk of the file.
UNKNOWN = -241
NO_VAL = -917

# Storage-mode tag returned by FileSystemCache.store() for raw binary entries.
MODE_BINARY = 'BINARY'
|
|
|
|
|
|
|
|
|
class FileSystemCache:
    """One shard of the archive cache; stores binary payloads as plain files.

    Each stored entry is a randomly named data file plus a small
    ``<key>.key`` JSON sidecar file recording its metadata; the sidecar's
    existence marks the key as present.
    """

    def __init__(self, index, directory, **settings):
        # shard number and the shard's root directory
        self._index = index
        self._directory = directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        """Stream chunks from `iterator` into `full_path`; return bytes written.

        Retries up to 10 times, because a concurrent cache may remove the
        target directory between the makedirs and the open.
        """
        target_dir, _ = os.path.split(full_path)

        for attempt in range(1, 11):
            # Directory may already exist, or be created/removed concurrently.
            with contextlib.suppress(OSError):
                os.makedirs(target_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                out = open(full_path, mode, encoding=encoding)
            except OSError:
                if attempt == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with out:
                total = 0
                for chunk in iterator:
                    total += len(chunk)
                    out.write(chunk)
                return total

    def _get_keyfile(self, key):
        """Path of the JSON sidecar file that marks `key` as present."""
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        """Stream `value_reader` to disk and register it under `key`.

        :param key: cache key to store under
        :param value_reader: object with a ``read(size)`` method yielding bytes
        :param metadata: optional extra metadata merged into the sidecar
        :return: tuple of (key, size, mode, filename, metadata dict)
        """
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "timestamp": time.time(),
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
        }
        if metadata:
            _metadata.update(metadata)

        # Pull the payload in 4MiB chunks until the reader is exhausted.
        chunker = functools.partial(value_reader.read, 2**22)
        size = self._write_file(full_path, iter(chunker, b''), 'xb')

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
        """Return an open binary handle plus the metadata dict for `key`.

        :raises KeyError: when `key` is not present in this shard
        """
        if key not in self:
            raise KeyError(key)

        with open(self._get_keyfile(key), 'rb') as f:
            metadata = json.loads(f.read())

        data_path = os.path.join(self._directory, metadata['filename'])
        return open(data_path, 'rb'), metadata

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """
        hex_name = os.urandom(16).hex()
        # first 2+2 hex characters become two nesting levels of directories
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        filename = os.path.join(sub_dir, hex_name[4:] + '.archive_cache')
        return filename, os.path.join(self._directory, filename)

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value
        """
        return zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        return os.path.exists(self._get_keyfile(key))
|
|
|
|
|
|
|
|
|
class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(self, directory=None, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict
        """
        if directory is None:
            raise ValueError('directory cannot be None')

        # Normalize the configured path: expand ~user and $ENV_VAR parts.
        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        # One FileSystemCache per shard, each rooted in its own
        # sub-directory; remaining settings are forwarded to every shard.
        shards = []
        for num in range(self._count):
            shard = FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            shards.append(shard)
        self._shards = tuple(shards)

        # All shards hash identically; borrow shard 0's implementation.
        self._hash = self._shards[0].hash

    def get_lock(self, lock_key):
        """Return a generation lock bound to `lock_key`."""
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        """Map `key` onto the shard that owns it via its portable hash."""
        return self._shards[self._hash(key) % self._count]

    def store(self, key, value_reader, metadata=None):
        """Store streamed value under `key` in its owning shard."""
        return self._get_shard(key).store(key, value_reader, metadata)

    def fetch(self, key):
        """Return file handle corresponding to `key` from cache.
        """
        return self._get_shard(key).fetch(key)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found
        """
        return key in self._get_shard(key)

    def __contains__(self, item):
        return self.has_key(item)

    def evict(self):
        """Remove old items based on the conditions"""
        # TODO: Implement this...
        return
|
|
|
|
|
|
|
|
|
def get_archival_config(config):
    """Filter `config` down to the archive-cache related settings.

    :param config: mapping of all application settings
    :return: dict containing only the keys prefixed with ``archive_cache``
    """
    return {
        key: value
        for key, value in config.items()
        if key.startswith('archive_cache')
    }
|
|
|
|
|
|
|
|
|
def get_archival_cache_store(config):
    """Return the process-wide FanoutCache, building it on first call.

    Later calls return the cached singleton (`cache_meta`) and ignore
    `config` entirely.

    :param config: application settings; only ``archive_cache.*`` keys are used
    :raises ValueError: when the configured backend is not ``filesystem``
    """
    global cache_meta
    if cache_meta is not None:
        # Already initialized in this process -- reuse the singleton.
        return cache_meta

    config = get_archival_config(config)

    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    locking_url = config['archive_cache.locking.url']
    store_dir = config['archive_cache.filesystem.store_dir']
    size_gb = config['archive_cache.filesystem.cache_size_gb']
    shards = config['archive_cache.filesystem.cache_shards']
    eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', store_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(store_dir):
        os.makedirs(store_dir, exist_ok=True)

    cache_meta = FanoutCache(
        store_dir,
        locking_url=locking_url,
        cache_shards=shards,
        cache_size_limit=size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=eviction_policy,
    )
    return cache_meta
|
|
|
|
|
|
|