feat(disk-cache): rewrite diskcache backend to be k8s and NFS safe.
- no longer depend on diskcache
- use a simpler version with a Redis lock and filesystem storage
- fixes RCCE-78

File last commit: r5420:9cce0276 default
fanout_cache.py
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib

from rhodecode.lib.ext_json import json

from .lock import GenerationLock

log = logging.getLogger(__name__)

cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


class FileSystemCache:

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        full_dir, _ = os.path.split(full_path)

        # Retry a few times: on shared storage (e.g. NFS mounted into several
        # k8s pods) another writer may create or delete `full_dir` between our
        # makedirs() and open() calls.
        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                return size
    def _get_keyfile(self, key):
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "timestamp": time.time(),
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
        }
        if metadata:
            _metadata.update(metadata)

        # stream the value in 4 MiB (2**22 byte) chunks to bound memory usage
        reader = functools.partial(value_reader.read, 2**22)
        iterator = iter(reader, b'')

        size = self._write_file(full_path, iterator, 'xb')

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata
    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
        if key not in self:
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']
        return open(os.path.join(self._directory, filename), 'rb'), metadata
    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be
        used to reduce the size of directories. On older filesystems, lookups
        in directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self._directory, filename)
        return filename, full_path
    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value
        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa
    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)
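

# On-disk layout sketch (illustrative, derived from the code above): for a
# FanoutCache rooted at <store_dir>, a key 'some-key' that hashes into shard 3
# produces two files, e.g.:
#
#   <store_dir>/shard_003/some-key.key                        # JSON metadata
#   <store_dir>/shard_003/ab/cd/<28 hex chars>.archive_cache  # binary payload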


class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict
        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash
    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        # adler32 is stable across processes and hosts (unlike the built-in
        # hash() under randomized PYTHONHASHSEED), so every worker maps a
        # given key to the same shard directory.
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard
    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key):
        """Return file handle corresponding to `key` from cache."""
        shard = self._get_shard(key)
        return shard.fetch(key)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found
        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def evict(self):
        """Remove old items based on the conditions"""
        # TODO: Implement this...
        return
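        # One possible shape for the TODO above (an assumption, not the
        # shipped design): evict least-recently-stored payloads by mtime
        # until total size drops under a byte limit. The size limit is
        # currently only forwarded to the shards via **settings, so the
        # `size_limit` name below is hypothetical.
        #
        #     candidates = []
        #     for shard in self._shards:
        #         for root, _dirs, files in os.walk(shard._directory):
        #             for f in files:
        #                 if f.endswith('.archive_cache'):
        #                     path = os.path.join(root, f)
        #                     stat = os.stat(path)
        #                     candidates.append((stat.st_mtime, stat.st_size, path))
        #     candidates.sort()  # oldest first
        #     total = sum(size for _mtime, size, _path in candidates)
        #     for _mtime, size, path in candidates:
        #         if total <= size_limit:
        #             break
        #         with contextlib.suppress(OSError):
        #             os.unlink(path)
        #         total -= size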


def get_archival_config(config):

    final_config = {}

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config
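
# Example (illustrative): only 'archive_cache.*' keys survive the filter.
#
#   get_archival_config({'archive_cache.backend.type': 'filesystem',
#                        'app.other_setting': 'x'})
#   -> {'archive_cache.backend.type': 'filesystem'}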


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)

    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta
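

# Illustrative usage sketch, not part of the original module. The config keys
# mirror the ones read in get_archival_cache_store() above; the concrete
# values (locking URL, store dir, eviction policy string) are assumptions, and
# GenerationLock is assumed to work as a context manager here.
if __name__ == '__main__':
    import io

    demo_config = {
        'archive_cache.backend.type': 'filesystem',
        'archive_cache.locking.url': 'redis://localhost:6379/0',
        'archive_cache.filesystem.store_dir': '/tmp/archive_cache',
        'archive_cache.filesystem.cache_size_gb': 1,
        'archive_cache.filesystem.cache_shards': 8,
        'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
    }
    d_cache = get_archival_cache_store(demo_config)

    archive_key = 'demo-repo-archive.tar.gz'
    if archive_key not in d_cache:
        # hold the cross-process (Redis-backed) lock while producing the archive
        with d_cache.get_lock(archive_key):
            d_cache.store(archive_key, io.BytesIO(b'fake archive bytes'), {'note': 'demo'})

    reader, metadata = d_cache.fetch(archive_key)
    print(metadata['filename'], len(reader.read()))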