# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
|
|
|
|
|
|
import codecs
|
|
|
import contextlib
|
|
|
import functools
|
|
|
import os
|
|
|
import logging
|
|
|
import time
|
|
|
import typing
|
|
|
import zlib
|
|
|
import sqlite3
|
|
|
|
|
|
from ...ext_json import json
|
|
|
from .lock import GenerationLock
|
|
|
from .utils import format_size
|
|
|
|
|
|
# Module-level logger.
log = logging.getLogger(__name__)

# Process-wide cache instance; stays ``None`` until get_archival_cache_store()
# builds it.
cache_meta = None

# NOTE(review): sentinel values; not referenced in the visible part of this
# module - confirm their meaning against callers before relying on them.
UNKNOWN = -241
NO_VAL = -917

# Storage mode reported by FileSystemCache.store().
MODE_BINARY = 'BINARY'

# Maps an eviction policy name to the SQL used to order eviction candidates.
# ``{fields}`` is filled in by the caller; a ``None`` query disables eviction.
EVICTION_POLICY = {
    'none': {'evict': None},
    **{
        policy_name: {
            'evict': f'SELECT {{fields}} FROM archive_cache ORDER BY {order_column}',
        }
        for policy_name, order_column in (
            ('least-recently-stored', 'store_time'),
            ('least-recently-used', 'access_time'),
            ('least-frequently-used', 'access_count'),
        )
    },
}
|
|
|
|
|
|
|
|
|
class DB:
    """Throw-away in-memory SQLite database with one row per cached archive."""

    def __init__(self):
        # ':memory:' keeps the database private to this connection.
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        """Create the ``archive_cache`` table if it does not exist yet."""
        create_table = '''
            CREATE TABLE IF NOT EXISTS archive_cache (
                rowid INTEGER PRIMARY KEY,
                key_file TEXT,
                key_file_path TEXT,
                filename TEXT,
                full_path TEXT,
                store_time REAL,
                access_time REAL,
                access_count INTEGER DEFAULT 0,
                size INTEGER DEFAULT 0
            )
        '''

        self.sql(create_table)
        self.connection.commit()

    @property
    def sql(self):
        """The connection's ``execute`` callable, for running a single statement."""
        return self.connection.execute

    def bulk_insert(self, rows):
        """Insert ``rows`` into ``archive_cache`` and commit.

        :param rows: iterable of 9-item sequences ordered as
            (rowid, key_file, key_file_path, filename, full_path,
            store_time, access_time, access_count, size)
        """
        insert_row = '''
            INSERT INTO archive_cache (
                rowid,
                key_file,
                key_file_path,
                filename,
                full_path,
                store_time,
                access_time,
                access_count,
                size
            )
            VALUES (
                ?, ?, ?, ?, ?, ?, ?, ?, ?
            )
        '''
        self.connection.cursor().executemany(insert_row, rows)
        self.connection.commit()
|
|
|
|
|
|
|
|
|
class FileSystemCache:
    """A single cache shard that keeps archives as files under ``directory``.

    Every stored archive consists of two files: the binary payload, written
    under a random name inside a two-level sub-directory, and a ``<key>.key``
    file in the shard directory holding JSON metadata (payload location, size
    and usage statistics).
    """

    def __init__(self, index, directory, **settings):
        """
        :param index: shard number, only used by ``repr()``
        :param directory: directory in which this shard keeps its files
        :param settings: accepted for call compatibility; not used here
        """
        self._index = index
        self._directory = directory

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        """Write all chunks from ``iterator`` to ``full_path`` and fsync it.

        :param full_path: destination file path; parent directories are created
        :param iterator: iterable of chunks accepted by ``writer.write``
        :param mode: mode passed to ``open`` (``store`` uses ``'xb'``)
        :param encoding: encoding passed to ``open``
        :return: sum of ``len(chunk)`` over all written chunks
        :raises OSError: when the file cannot be opened after 10 attempts
        """
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                writer.flush()
                # Get the file descriptor
                fd = writer.fileno()

                # Sync the file descriptor to disk, helps with NFS cases...
                os.fsync(fd)
            log.debug('written new archive cache under %s', full_path)
            return size

    def _get_keyfile(self, key):
        """Return the path of the ``.key`` metadata file for ``key``."""
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        """Store the content of ``value_reader`` under ``key``.

        :param key: cache key; also names the ``.key`` metadata file
        :param value_reader: object with ``read(size)`` returning bytes, and
            ``b''`` once exhausted
        :param metadata: optional dict merged over the default metadata; may be
            ``None``. It is not modified.
        :return: tuple ``(key, size, MODE_BINARY, filename, stored_metadata)``
        """
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        # read in 4MB chunks until the reader returns b''
        reader = functools.partial(value_reader.read, 2**22)

        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')
        # Record the real size in the metadata that gets persisted. Writing it
        # into the caller's ``metadata`` would fail when that is None and
        # would leave the stored size at 0.
        _metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
        """Open the archive stored under ``key`` and update its usage stats.

        :param key: cache key
        :param retry: when true, poll for the key before giving up
        :param retry_attempts: number of polls, one second apart
        :return: tuple of (archive file opened in ``'rb'`` mode, metadata dict);
            the caller is responsible for closing the file
        :raises KeyError: when no ``.key`` file exists for ``key``
        """

        if retry:
            for _ in range(retry_attempts):
                if key in self:
                    break
                # we didn't find the key, wait 1s, and re-check
                time.sleep(1)

        if key not in self:
            # plain error log: there is no active exception to attach here, and
            # logging needs %-style placeholders for its arguments
            log.error('requested %s not found in %s', key, self)
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        # Open the archive first, so usage stats are only bumped for fetches
        # that actually hand out a file.
        archive = open(os.path.join(self.directory, filename), 'rb')
        try:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))
        except BaseException:
            # don't leak the already opened archive when the stats update fails
            archive.close()
            raise

        return archive, metadata

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self.directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item

        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)

    def __repr__(self):
        return f'FileSystemCache(index={self._index}, dir={self.directory})'
|
|
|
|
|
|
|
|
|
class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict; ``cache_shards`` and ``locking_url``
            are consumed here, ``cache_eviction_policy`` and
            ``cache_size_limit`` are read here and, like any remaining
            settings, passed on to every shard
        :raises ValueError: when ``directory`` is None
        :raises KeyError: when one of the settings named above is missing

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        # every shard hashes the same way, so borrow the first shard's hash
        self._hash = self._shards[0].hash

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def get_lock(self, lock_key):
        """Return a ``GenerationLock`` for ``lock_key`` using the configured locking url."""
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> 'FileSystemCache':
        """Return the shard responsible for ``key`` (hash of key modulo shard count)."""
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        """Store the content of ``value_reader`` under ``key`` in the matching shard.

        :return: whatever the shard's ``store`` returns
        """
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=False, retry_attempts=10):
        """Return file handle corresponding to `key` from cache.

        Arguments are passed through to the shard's ``fetch``.
        """
        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def _collect_key_metadata(self):
        """Read every ``.key`` file of every shard into rows for ``DB.bulk_insert``."""
        rows = []
        for shard in self._shards:
            try:
                entries = os.listdir(shard.directory)
            except FileNotFoundError:
                # nothing was stored in this shard yet, so its directory
                # does not exist; there is nothing to evict from it
                continue

            for key_file in entries:
                if not key_file.endswith('.key'):
                    continue

                key_file_path = os.path.join(shard.directory, key_file)
                with open(key_file_path, 'rb') as f:
                    metadata = json.loads(f.read())

                size = metadata.get('size')
                filename = metadata.get('filename')
                full_path = metadata.get('full_path')

                if not size:
                    # in case we don't have size re-calc it...
                    size = os.stat(full_path).st_size

                rows.append([
                    len(rows) + 1,  # rowid, 1-based
                    key_file,
                    key_file_path,
                    filename,
                    full_path,
                    metadata.get('store_time', 0),
                    metadata.get('access_time', 0),
                    metadata.get('access_count', 0),
                    size,
                ])
        return rows

    @staticmethod
    def _plan_eviction(sorted_keys, total_size, size_limit):
        """Return the leading rows of ``sorted_keys`` that must be removed.

        Rows are ``(key_file_path, full_path, size)``. Rows are taken in order
        until the remaining total no longer exceeds ``size_limit``; the row
        that brings the total down to the limit is included.
        """
        to_remove = []
        for row in sorted_keys:
            if total_size <= size_limit:
                # we obtained what we wanted...
                break
            to_remove.append(row)
            total_size -= row[2]
        return to_remove

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the conditions


        explanation of this algo:
        iterate over each shard, then for each shard iterate over the .key files
        read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
        access data, time creation, and access counts.

        Store that into a memory DB so we can run different sorting strategies easily.
        Summing the size is a sum sql query.

        Then we run a sorting strategy based on eviction policy.
        We iterate over sorted keys, and remove each until the total size is
        within the limit.

        :param policy: key of ``EVICTION_POLICY``; falls back to the configured policy
        :param size_limit: size limit in bytes; falls back to the configured limit
        :return: number of removed archives
        """

        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return 0

        data = self._collect_key_metadata()

        db = DB()
        try:
            # Insert bulk data using executemany
            db.bulk_insert(data)

            ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
            log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
            select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
            sorted_keys = db.sql(select_policy_qry).fetchall()
        finally:
            # the in-memory database is only needed for summing and sorting
            db.connection.close()

        removed_items = 0
        removed_size = 0
        for key, cached_file, size in self._plan_eviction(sorted_keys, total_size, size_limit):
            os.remove(cached_file)
            os.remove(key)
            removed_items += 1
            removed_size += size

        log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
        return removed_items
|
|
|
|
|
|
|
|
|
def get_archival_config(config):
    """Return a new dict with only the ``config`` entries whose key starts with ``archive_cache``."""
    return {
        name: value
        for name, value in config.items()
        if name.startswith('archive_cache')
    }
|
|
|
|
|
|
|
|
|
def get_archival_cache_store(config):
    """Return the module-wide archive cache, creating it on the first call.

    Once ``cache_meta`` is set, ``config`` is ignored and the existing
    instance is returned.

    :param config: mapping holding the ``archive_cache.*`` settings
    :raises ValueError: when ``archive_cache.backend.type`` is not ``filesystem``
    :raises KeyError: when a required ``archive_cache.*`` setting is missing
    """
    global cache_meta

    if cache_meta is not None:
        return cache_meta

    settings = get_archival_config(config)

    if settings['archive_cache.backend.type'] != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    locking_url = settings['archive_cache.locking.url']
    store_dir = settings['archive_cache.filesystem.store_dir']
    size_gb = settings['archive_cache.filesystem.cache_size_gb']
    shards = settings['archive_cache.filesystem.cache_shards']
    eviction_policy = settings['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', store_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(store_dir):
        os.makedirs(store_dir, exist_ok=True)

    # the size limit is configured in GB, the cache works with bytes
    size_limit_bytes = size_gb * 1024 * 1024 * 1024

    cache_meta = FanoutCache(
        store_dir,
        locking_url=locking_url,
        cache_shards=shards,
        cache_size_limit=size_limit_bytes,
        cache_eviction_policy=eviction_policy,
    )
    return cache_meta
|
|
|
|