##// END OF EJS Templates
core: synced vendor/ext_json with ce for better compatibility
core: synced vendor/ext_json with ce for better compatibility

File last commit:

r1247:35680f51 default
r1250:2c57bb5b default
Show More
fanout_cache.py
455 lines | 14.0 KiB | text/x-python | PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib
import sqlite3
from ...ext_json import json
from .lock import GenerationLock
from .utils import format_size
# Logger shared by everything in this module.
log = logging.getLogger(__name__)

# Process-wide cache instance; stays None until get_archival_cache_store()
# builds it.
cache_meta = None

# Integer sentinel values. No use of them is visible in this module
# (NOTE(review): purpose to be confirmed).
UNKNOWN = -241
NO_VAL = -917

# Storage-mode marker returned by FileSystemCache.store().
MODE_BINARY = 'BINARY'

# Maps an eviction policy name to the SQL used to order cached archives for
# removal. ``{fields}`` is substituted with the column list by the caller;
# a ``None`` query means the policy never evicts anything.
EVICTION_POLICY = {
    'none': {
        'evict': None,
    },
    'least-recently-stored': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
    },
    'least-recently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
    },
    'least-frequently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
    },
}
class DB:
    """Throw-away in-memory SQLite database with one ``archive_cache`` table.

    Each row describes one cached archive (its key file, payload file, size
    and usage statistics) so that callers can sort and sum entries with SQL.

    The instance can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    def __init__(self):
        # ':memory:' keeps everything in RAM; nothing is written to disk.
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        """Create the ``archive_cache`` table if it does not exist yet."""
        qry = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            filename TEXT,
            full_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
        '''
        self.sql(qry)
        self.connection.commit()

    @property
    def sql(self):
        """Return the connection's ``execute`` callable for ad-hoc queries."""
        return self.connection.execute

    def bulk_insert(self, rows):
        """Insert many rows into ``archive_cache`` in a single transaction.

        :param rows: iterable of 9-item sequences in column order
            ``(rowid, key_file, key_file_path, filename, full_path,
            store_time, access_time, access_count, size)``
        :raises sqlite3.Error: when a row cannot be inserted; in that case
            none of the rows from this call are kept
        """
        qry = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            filename,
            full_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
        '''
        # Using the connection as a context manager commits on success and
        # rolls back on error, so a failing batch never leaves a half-applied
        # open transaction behind.
        with self.connection:
            self.connection.executemany(qry, rows)

    def close(self):
        """Close the underlying SQLite connection."""
        self.connection.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
class FileSystemCache:
    """A single cache shard that keeps archives as files in one directory.

    Every stored archive consists of two files:

    * the archive payload, saved under a random name inside a two-level
      sub-directory of the shard (see :meth:`random_filename`);
    * ``<key>.key`` in the shard directory, holding JSON metadata that points
      at the payload and carries usage statistics.

    The presence of the ``.key`` file is what marks a key as cached.
    """

    def __init__(self, index, directory, **settings):
        """
        :param index: number of this shard; only used by ``repr()``
        :param directory: directory that holds this shard's files
        :param settings: accepted for call compatibility; not used here
        """
        self._index = index
        self._directory = directory

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        """Write all chunks from `iterator` into `full_path` and sync to disk.

        :param full_path: destination file path
        :param iterator: iterable yielding the chunks to write
        :param mode: mode passed to ``open()``
        :param encoding: encoding passed to ``open()``
        :return: sum of ``len(chunk)`` over all written chunks
        :raises OSError: when the file could not be opened in 10 attempts
        """
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                writer.flush()
                # Get the file descriptor
                fd = writer.fileno()

                # Sync the file descriptor to disk, helps with NFS cases...
                os.fsync(fd)
            log.debug('written new archive cache under %s', full_path)
            return size

    def _get_keyfile(self, key):
        """Return the path of the ``.key`` metadata file for `key`."""
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        """Save the content of `value_reader` under `key`.

        The payload is written first; the ``.key`` file is created only after
        the payload is complete, so an existing key file always refers to a
        fully written archive.

        :param key: cache key
        :param value_reader: object with a ``read(size)`` method returning
            bytes, and ``b''`` once exhausted
        :param metadata: optional dict merged over the default metadata; may
            be ``None``. When a dict is given, its ``size`` entry is set to
            the number of bytes written.
        :return: tuple ``(key, size, MODE_BINARY, filename, metadata_dict)``
        """
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        # read the source in 4 MiB chunks until it returns b''
        reader = functools.partial(value_reader.read, 2**22)
        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')

        # Record the real size in the metadata that is persisted and returned.
        _metadata['size'] = size
        if metadata is not None:
            # kept for callers that read the size back from the dict they
            # passed in
            metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
        """Open the archive stored under `key`.

        :param key: cache key
        :param retry: when true, wait for the key to show up before giving up
        :param retry_attempts: number of 1-second waits performed when
            `retry` is enabled
        :return: tuple of (binary file handle opened for reading, metadata
            dict); the caller is responsible for closing the handle
        :raises KeyError: when no key file exists for `key`
        """
        if retry:
            for _ in range(retry_attempts):
                if key in self:
                    break
                # we didn't find the key, wait 1s, and re-check
                time.sleep(1)

        if key not in self:
            log.error('requested %s not found in %s', key, self)
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        try:
            return open(os.path.join(self.directory, filename), 'rb'), metadata
        finally:
            # update usage stats, count and accessed; `metadata` is the same
            # dict object that is returned, so the caller sees the new values
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """
        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self.directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value
        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)

    def __repr__(self):
        return f'FileSystemCache(index={self._index}, dir={self.directory})'
class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict; must contain ``cache_shards``,
            ``locking_url``, ``cache_eviction_policy`` and ``cache_size_limit``
        :raises ValueError: when `directory` is ``None``
        :raises KeyError: when a required setting is missing
        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        # these two are consumed here and not forwarded to the shards
        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        # every shard hashes the same way, so borrow the first shard's method
        self._hash = self._shards[0].hash

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def get_lock(self, lock_key):
        """Return a ``GenerationLock`` for `lock_key` using the configured locking url."""
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> 'FileSystemCache':
        """Return the shard responsible for `key` (key hash modulo shard count)."""
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        """Store the content of `value_reader` under `key` in the matching shard.

        :return: whatever the shard's ``store()`` returns
        """
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=False, retry_attempts=10):
        """Return file handle corresponding to `key` from cache.
        """
        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found
        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    @staticmethod
    def _select_for_eviction(sorted_keys, total_size, size_limit):
        """Pick the leading entries whose removal brings the cache within the limit.

        :param sorted_keys: sequence of ``(key_file_path, full_path, size)``
            rows, ordered with the first candidate to remove first
        :param total_size: size currently occupied by all entries
        :param size_limit: size the cache must not exceed
        :return: list with the rows to remove, in removal order
        """
        selected = []
        for row in sorted_keys:
            # check BEFORE accounting for this row, otherwise we would stop
            # one entry early and leave the cache above the limit
            if total_size <= size_limit:
                break
            selected.append(row)
            total_size -= row[2]
        return selected

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the conditions

        explanation of this algo:
        iterate over each shard, then for each shard iterate over the .key files
        read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
        access data, time creation, and access counts.

        Store that into a memory DB so we can run different sorting strategies easily.
        Summing the size is a sum sql query.

        Then we run a sorting strategy based on eviction policy.
        We iterate over sorted keys, and remove entries until the total size
        no longer exceeds the limit.

        :param policy: key of ``EVICTION_POLICY``; defaults to the configured policy
        :param size_limit: size limit to enforce; defaults to the configured
            limit (a falsy value also selects the configured limit)
        :return: number of removed cache entries
        """
        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return 0

        db = DB()
        # the in-memory database is only needed while ranking the entries
        with contextlib.closing(db.connection):
            data = []
            for shard in self._shards:
                try:
                    entries = os.listdir(shard.directory)
                except FileNotFoundError:
                    # nothing in this module creates a shard directory before
                    # the first store(), so an unused shard has nothing to evict
                    continue

                for key_file in entries:
                    if not key_file.endswith('.key'):
                        continue
                    key_file_path = os.path.join(shard.directory, key_file)
                    with open(key_file_path, 'rb') as f:
                        metadata = json.loads(f.read())

                    size = metadata.get('size')
                    filename = metadata.get('filename')
                    full_path = metadata.get('full_path')

                    if not size:
                        # in case we don't have size re-calc it...
                        size = os.stat(full_path).st_size

                    data.append([
                        len(data) + 1,  # rowid, numbered from 1
                        key_file,
                        key_file_path,
                        filename,
                        full_path,
                        metadata.get('store_time', 0),
                        metadata.get('access_time', 0),
                        metadata.get('access_count', 0),
                        size,
                    ])

            # Insert bulk data using executemany
            db.bulk_insert(data)

            ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
            log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))

            select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
            sorted_keys = db.sql(select_policy_qry).fetchall()

        removed_items = 0
        removed_size = 0
        for key, cached_file, size in self._select_for_eviction(sorted_keys, total_size, size_limit):
            os.remove(cached_file)
            os.remove(key)
            removed_items += 1
            removed_size += size

        log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
        return removed_items
def get_archival_config(config):
    """Return a new dict with only the ``archive_cache*`` entries of `config`.

    :param config: mapping of setting names to values
    :return: dict holding every item whose key starts with ``archive_cache``
    """
    return {
        name: value
        for name, value in config.items()
        if name.startswith('archive_cache')
    }
def get_archival_cache_store(config):
    """Return the module-wide archive cache, creating it on the first call.

    Later calls return the instance kept in ``cache_meta`` and ignore `config`.

    :param config: mapping with the ``archive_cache.*`` settings
    :return: the shared ``FanoutCache`` instance
    :raises ValueError: when the configured backend type is not ``filesystem``
    :raises KeyError: when a required ``archive_cache.*`` setting is missing
    """
    global cache_meta

    if cache_meta is None:
        settings = get_archival_config(config)

        if settings['archive_cache.backend.type'] != 'filesystem':
            raise ValueError('archive_cache.backend.type only supports "filesystem"')

        locking_url = settings['archive_cache.locking.url']
        store_dir = settings['archive_cache.filesystem.store_dir']
        size_gb = settings['archive_cache.filesystem.cache_size_gb']
        shards = settings['archive_cache.filesystem.cache_shards']
        eviction_policy = settings['archive_cache.filesystem.eviction_policy']

        log.debug('Initializing archival cache instance under %s', store_dir)

        # check if it's ok to write, and re-create the archive cache
        if not os.path.isdir(store_dir):
            os.makedirs(store_dir, exist_ok=True)

        cache_meta = FanoutCache(
            store_dir,
            locking_url=locking_url,
            cache_shards=shards,
            # the configured size is in GiB; the cache works with bytes
            cache_size_limit=size_gb * 1024 * 1024 * 1024,
            cache_eviction_policy=eviction_policy
        )

    return cache_meta