# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib
import sqlite3

from ...ext_json import json
from .lock import GenerationLock
from .utils import format_size

log = logging.getLogger(__name__)

# process-wide cache instance, lazily created by get_archival_cache_store()
cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


# Maps an eviction policy name to the SELECT template used to order
# cache entries for removal; `None` disables eviction entirely.
EVICTION_POLICY = {
    'none': {
        'evict': None,
    },
    'least-recently-stored': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
    },
    'least-recently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
    },
    'least-frequently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
    },
}


class DB:
    """In-memory sqlite database holding one row per cached archive."""

    def __init__(self):
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        """Create the `archive_cache` table if it does not exist yet."""
        qry = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            filename TEXT,
            full_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
        '''

        self.sql(qry)
        self.connection.commit()

    @property
    def sql(self):
        """Shortcut to the connection's `execute` callable."""
        return self.connection.execute

    def bulk_insert(self, rows):
        """Insert many rows at once and commit.

        :param rows: iterable of 9-item sequences matching the column order
            of the INSERT statement below
        """
        qry = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            filename,
            full_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
        '''
        cursor = self.connection.cursor()
        cursor.executemany(qry, rows)
        self.connection.commit()


class FileSystemCache:
    """Single cache shard storing archives and their `.key` metadata files."""

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        """Write all chunks from `iterator` into `full_path`.

        The parent directory is (re)created before every attempt; opening
        the file is tried up to 10 times before the OSError is re-raised.

        :param full_path: destination file path
        :param iterator: iterable yielding chunks to write
        :param mode: mode passed to `open`
        :param encoding: optional encoding passed to `open`
        :return: total length of the written chunks
        """
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                return size

    def _get_keyfile(self, key):
        """Return the path of the `.key` metadata file for `key`."""
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        """Store the content of `value_reader` under `key`.

        The archive is written first; only afterwards the `.key` file is
        created, so the presence of a key file marks a finished archive.

        :param key: cache key
        :param value_reader: object with a `read(size)` method returning bytes
        :param metadata: optional dict of extra metadata merged into the
            stored metadata (may be None)
        :return: tuple of (key, size, MODE_BINARY, filename, metadata dict)
        """
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        # read the source in 4MB chunks until an empty bytes object is returned
        reader = functools.partial(value_reader.read, 2**22)
        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')
        # record the real size in the metadata that gets persisted/returned;
        # writing it into the caller's `metadata` lost the value and failed
        # with TypeError when `metadata` was None
        _metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
        """Return an open binary file handle and metadata for `key`.

        :param key: cache key
        :param retry: when True, poll for the key once per second
        :param retry_attempts: maximum number of polls when `retry` is set
        :return: tuple of (file handle opened in 'rb' mode, metadata dict)
        :raises KeyError: if the key is not present in this shard
        """
        if retry:
            for _ in range(retry_attempts):
                if key in self:
                    break
                # we didn't find the key, wait 1s, and re-check
                time.sleep(1)

        if key not in self:
            # %-style placeholders are required for lazy logging arguments
            log.error('requested %s not found in %s', key, self)
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        try:
            return open(os.path.join(self.directory, filename), 'rb'), metadata
        finally:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self.directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item

        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)

    def __repr__(self):
        return f'FileSystemCache(index={self._index}, dir={self.directory})'


class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict; must contain `cache_shards`,
            `locking_url`, `cache_eviction_policy` and `cache_size_limit`

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def get_lock(self, lock_key):
        """Return a `GenerationLock` for `lock_key` using the locking url."""
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        """Pick the shard responsible for `key` from its hash."""
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        """Store `value_reader` content under `key` in the matching shard."""
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=False, retry_attempts=10):
        """Return file handle corresponding to `key` from cache.
        """
        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the conditions


        explanation of this algo:
        iterate over each shard, then for each shard iterate over the .key files
        read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
        access data, time creation, and access counts.

        Store that into a memory DB so we can run different sorting strategies easily.
        Summing the size is a sum sql query.

        Then we run a sorting strategy based on eviction policy.
        We iterate over sorted keys, and remove each checking if we hit the overall limit.

        :param policy: eviction policy name, defaults to the configured one
        :param size_limit: size limit in bytes, defaults to the configured one
        :return: number of removed cache entries
        """

        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return 0

        data = []
        cnt = 1
        for shard in self._shards:
            # shard directories are created on first write only, so a shard
            # that never stored anything has nothing to scan
            if not os.path.isdir(shard.directory):
                continue

            for key_file in os.listdir(shard.directory):
                if not key_file.endswith('.key'):
                    continue

                key_file_path = os.path.join(shard.directory, key_file)
                with open(key_file_path, 'rb') as f:
                    metadata = json.loads(f.read())

                size = metadata.get('size')
                filename = metadata.get('filename')
                full_path = metadata.get('full_path')

                if not size:
                    # in case we don't have size re-calc it...
                    size = os.stat(full_path).st_size

                data.append([
                    cnt,
                    key_file,
                    key_file_path,
                    filename,
                    full_path,
                    metadata.get('store_time', 0),
                    metadata.get('access_time', 0),
                    metadata.get('access_count', 0),
                    size,
                ])
                cnt += 1

        db = DB()
        # make sure the in-memory database is released once we have the data
        with contextlib.closing(db.connection):
            # Insert bulk data using executemany
            db.bulk_insert(data)

            ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
            log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
            select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
            sorted_keys = db.sql(select_policy_qry).fetchall()

        removed_items = 0
        removed_size = 0
        for key, cached_file, size in sorted_keys:
            # stop only once the remaining total fits into the limit; checking
            # after a "simulated" removal stopped one entry too early and left
            # the cache above the limit
            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            os.remove(cached_file)
            os.remove(key)
            total_size -= size
            removed_items += 1
            removed_size += size

        log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
        return removed_items


def get_archival_config(config):
    """Return the subset of `config` whose keys start with `archive_cache`."""
    return {
        k: v
        for k, v in config.items()
        if k.startswith('archive_cache')
    }


def get_archival_cache_store(config):
    """Return the process-wide `FanoutCache`, creating it on first use.

    :param config: mapping with the `archive_cache.*` settings
    :return: the shared `FanoutCache` instance
    :raises ValueError: if the configured backend type is not "filesystem"
    """

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)
    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta