fanout_cache.py
456 lines
| 14.0 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import codecs | ||||
import contextlib | ||||
import functools | ||||
import os | ||||
import logging | ||||
import time | ||||
import typing | ||||
import zlib | ||||
r5423 | import sqlite3 | |||
r5420 | ||||
r5424 | from ...ext_json import json | |||
r5420 | from .lock import GenerationLock | |||
r5424 | from .utils import format_size | |||
r5420 | ||||
log = logging.getLogger(__name__)

# Memoized cache instance; filled in by get_archival_cache_store() on first call.
cache_meta = None

# Sentinel values.
# NOTE(review): UNKNOWN and NO_VAL are not referenced in the visible part of this file.
UNKNOWN = -241
NO_VAL = -917

# Mode marker returned by FileSystemCache.store() for stored archives.
MODE_BINARY = 'BINARY'

# Maps an eviction policy name to the SELECT template that yields cache entries
# in eviction order ('{fields}' is filled in by the caller). A policy whose
# 'evict' entry is None disables eviction.
EVICTION_POLICY = {
    'none': {'evict': None},
    'least-recently-stored': {'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time'},
    'least-recently-used': {'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time'},
    'least-frequently-used': {'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count'},
}
class DB:
    """In-memory sqlite database with a single ``archive_cache`` table."""

    # schema of the only table; one row per cached archive
    _CREATE_TABLE_QRY = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            filename TEXT,
            full_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
    '''

    # positional insert, one placeholder per column of the schema above
    _INSERT_QRY = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            filename,
            full_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
    '''

    def __init__(self):
        """Open a fresh ``:memory:`` database and create the table."""
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        """Create the ``archive_cache`` table if it does not exist yet."""
        self.sql(self._CREATE_TABLE_QRY)
        self.connection.commit()

    @property
    def sql(self):
        """Return the connection's ``execute`` callable for ad-hoc queries."""
        return self.connection.execute

    def bulk_insert(self, rows):
        """Insert all `rows` (9-item sequences in column order) and commit.

        :param rows: iterable of row sequences matching the table columns
        """
        self.connection.executemany(self._INSERT_QRY, rows)
        self.connection.commit()
class FileSystemCache:
    """Single cache shard that keeps archives on disk.

    Every stored key gets a ``<key>.key`` JSON metadata file directly in the
    shard directory, and the archive itself is written under a randomly named
    file two sub-directory levels below it.
    """

    def __init__(self, index, directory, **settings):
        """Initialize the shard.

        :param index: shard number, only used in ``repr``
        :param directory: directory holding the key files and archives
        :param settings: extra settings; accepted but not used by the shard
        """
        self._index = index
        self._directory = directory

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        """Write all chunks of `iterator` into `full_path` and fsync the file.

        :param full_path: destination path, parent directories are created
        :param iterator: iterable of chunks passed to ``writer.write``
        :param mode: mode passed to ``open``
        :param encoding: encoding passed to ``open``
        :return: sum of ``len(chunk)`` over all written chunks
        :raises OSError: when the file cannot be opened after 10 attempts
        """
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                writer.flush()
                # Get the file descriptor
                fd = writer.fileno()

                # Sync the file descriptor to disk, helps with NFS cases...
                os.fsync(fd)
            log.debug('written new archive cache under %s', full_path)
            return size

    def _get_keyfile(self, key):
        """Return the path of the metadata file for `key`."""
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        """Store the content of `value_reader` under `key`.

        :param key: cache key
        :param value_reader: object with a ``read(size)`` method returning bytes
        :param metadata: optional dict merged into the stored metadata, may be None
        :return: tuple of (key, size, MODE_BINARY, filename, stored metadata)
        """
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        # read in 4 MiB chunks until the reader returns an empty bytes object
        reader = functools.partial(value_reader.read, 2**22)
        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')
        # record the size in the metadata that gets persisted; `metadata`
        # itself may be None and is not what is written to the key file
        _metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
        """Open the archive stored under `key` and bump its usage stats.

        :param key: cache key
        :param retry: when True, re-check for the key before giving up
        :param retry_attempts: number of checks, with a 1s sleep after each miss
        :return: tuple of (archive file opened in 'rb' mode, metadata dict)
        :raises KeyError: when no key file exists for `key`
        """
        if retry:
            for _ in range(retry_attempts):
                if key in self:
                    break
                # we didn't find the key, wait 1s, and re-check
                time.sleep(1)

        if key not in self:
            log.error('requested %s not found in %s', key, self)
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        archive = open(os.path.join(self.directory, filename), 'rb')
        try:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))
        except Exception:
            # don't leak the archive handle when the stats can't be saved
            archive.close()
            raise

        return archive, metadata

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """
        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self.directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value
        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)

    def __repr__(self):
        return f'FileSystemCache(index={self._index}, dir={self.directory})'
r5420 | ||||
class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        # these two are consumed here, the remaining settings are
        # handed over to every shard
        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def get_lock(self, lock_key):
        """Return a `GenerationLock` for `lock_key` using the configured locking url."""
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        """Return the shard responsible for `key` (hash of key modulo shard count)."""
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        """Store the content of `value_reader` under `key` in the matching shard.

        :return: whatever the shard's ``store`` returns
        """
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=False, retry_attempts=10):
        """Return file handle corresponding to `key` from cache.
        """
        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def _collect_archive_rows(self):
        """Read every ``*.key`` metadata file of every shard into rows for `DB.bulk_insert`."""
        rows = []
        for shard in self._shards:
            try:
                key_files = os.listdir(shard.directory)
            except FileNotFoundError:
                # a shard directory only appears once something was stored
                # in it; a missing one has nothing to evict
                continue

            for key_file in key_files:
                if not key_file.endswith('.key'):
                    continue

                key_file_path = os.path.join(shard.directory, key_file)
                with open(key_file_path, 'rb') as f:
                    metadata = json.loads(f.read())

                size = metadata.get('size')
                filename = metadata.get('filename')
                full_path = metadata.get('full_path')

                if not size:
                    # in case we don't have size re-calc it...
                    size = os.stat(full_path).st_size

                rows.append([
                    len(rows) + 1,  # rowid, 1-based
                    key_file,
                    key_file_path,
                    filename,
                    full_path,
                    metadata.get('store_time', 0),
                    metadata.get('access_time', 0),
                    metadata.get('access_count', 0),
                    size,
                ])
        return rows

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the conditions

        explanation of this algo:
        iterate over each shard, then for each shard iterate over the .key files
        read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
        access data, time creation, and access counts.

        Store that into a memory DB so we can run different sorting strategies easily.
        Summing the size is a sum sql query.

        Then we run a sorting strategy based on eviction policy.
        We iterate over sorted keys, and remove each until the total size
        no longer exceeds the limit.

        :param policy: name from EVICTION_POLICY, defaults to the configured policy
        :param size_limit: size limit, defaults to the configured limit
        :return: number of removed archives
        """
        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return 0

        db = DB()
        try:
            data = self._collect_archive_rows()

            # Insert bulk data using executemany
            db.bulk_insert(data)

            ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
            log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))

            select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
            sorted_keys = db.sql(select_policy_qry).fetchall()
        finally:
            # the in-memory DB is only needed for sorting and summing
            db.connection.close()

        removed_items = 0
        removed_size = 0
        for key_file_path, cached_file, size in sorted_keys:
            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            # still over the limit, so this entry has to go too
            os.remove(cached_file)
            os.remove(key_file_path)
            total_size -= size
            removed_items += 1
            removed_size += size

        log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
        return removed_items
r5420 | ||||
def get_archival_config(config):
    """Return a new dict with only the `config` items whose key starts with 'archive_cache'."""
    return {
        option: value
        for option, value in config.items()
        if option.startswith('archive_cache')
    }
def get_archival_cache_store(config):
    """Return the module-wide archive cache, creating it on first call.

    The instance is kept in the module-level ``cache_meta``; later calls
    return it without looking at `config` again.

    :param config: mapping holding the ``archive_cache.*`` options
    :return: the shared `FanoutCache` instance
    :raises ValueError: when the configured backend type is not "filesystem"
    """
    global cache_meta

    if cache_meta is None:
        settings = get_archival_config(config)

        if settings['archive_cache.backend.type'] != 'filesystem':
            raise ValueError('archive_cache.backend.type only supports "filesystem"')

        locking_url = settings['archive_cache.locking.url']
        store_dir = settings['archive_cache.filesystem.store_dir']
        size_gb = settings['archive_cache.filesystem.cache_size_gb']
        shards = settings['archive_cache.filesystem.cache_shards']
        eviction_policy = settings['archive_cache.filesystem.eviction_policy']

        log.debug('Initializing archival cache instance under %s', store_dir)

        # check if it's ok to write, and re-create the archive cache
        if not os.path.isdir(store_dir):
            os.makedirs(store_dir, exist_ok=True)

        cache_meta = FanoutCache(
            store_dir,
            locking_url=locking_url,
            cache_shards=shards,
            cache_size_limit=size_gb * 1024 * 1024 * 1024,
            cache_eviction_policy=eviction_policy,
        )

    return cache_meta