diff --git a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py b/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py
--- a/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py
+++ b/vcsserver/lib/rc_cache/archive_cache/fanout_cache.py
@@ -23,6 +23,7 @@ import logging
 import time
 import typing
 import zlib
+import sqlite3
 
 from vcsserver.lib.rc_json import json
 from .lock import GenerationLock
@@ -37,6 +38,72 @@ NO_VAL = -917
 
 MODE_BINARY = 'BINARY'
 
+
+EVICTION_POLICY = {
+    'none': {
+        'evict': None,
+    },
+    'least-recently-stored': {
+        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
+    },
+    'least-recently-used': {
+        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
+    },
+    'least-frequently-used': {
+        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
+    },
+}
+
+
+class DB:
+
+    def __init__(self):
+        self.connection = sqlite3.connect(':memory:')
+        self._init_db()
+
+    def _init_db(self):
+        qry = '''
+        CREATE TABLE IF NOT EXISTS archive_cache (
+            rowid INTEGER PRIMARY KEY,
+            key_file TEXT,
+            key_file_path TEXT,
+            filename TEXT,
+            full_path TEXT,
+            store_time REAL,
+            access_time REAL,
+            access_count INTEGER DEFAULT 0,
+            size INTEGER DEFAULT 0
+        )
+        '''
+
+        self.sql(qry)
+        self.connection.commit()
+
+    @property
+    def sql(self):
+        return self.connection.execute
+
+    def bulk_insert(self, rows):
+        qry = '''
+        INSERT INTO archive_cache (
+            rowid,
+            key_file,
+            key_file_path,
+            filename,
+            full_path,
+            store_time,
+            access_time,
+            access_count,
+            size
+        )
+        VALUES (
+            ?, ?, ?, ?, ?, ?, ?, ?, ?
+        )
+        '''
+        cursor = self.connection.cursor()
+        cursor.executemany(qry, rows)
+        self.connection.commit()
+
+
 class FileSystemCache:
 
     def __init__(self, index, directory, **settings):
@@ -77,10 +144,13 @@ class FileSystemCache:
         # STORE METADATA
         _metadata = {
             "version": "v1",
-            "timestamp": time.time(),
             "filename": filename,
             "full_path": full_path,
             "key_file": key_file,
+            "store_time": time.time(),
+            "access_count": 1,
+            "access_time": 0,
+            "size": 0
         }
         if metadata:
             _metadata.update(metadata)
@@ -89,6 +159,7 @@ class FileSystemCache:
         iterator = iter(reader, b'')
 
         size = self._write_file(full_path, iterator, 'xb')
+        _metadata['size'] = size
 
         # after archive is finished, we create a key to save the presence of the binary file
         with open(key_file, 'wb') as f:
@@ -106,7 +177,15 @@ class FileSystemCache:
 
         filename = metadata['filename']
 
-        return open(os.path.join(self._directory, filename), 'rb'), metadata
+        try:
+            return open(os.path.join(self._directory, filename), 'rb'), metadata
+        finally:
+            # update usage stats: bump the access counter and the last-accessed time
+            metadata["access_count"] = metadata.get("access_count", 0) + 1
+            metadata["access_time"] = time.time()
+
+            with open(key_file, 'wb') as f:
+                f.write(json.dumps(metadata))
 
     def random_filename(self):
         """Return filename and full-path tuple for file storage.
@@ -168,6 +247,9 @@ class FanoutCache:
         self._count = settings.pop('cache_shards')
         self._locking_url = settings.pop('locking_url')
 
+        self._eviction_policy = settings['cache_eviction_policy']
+        self._cache_size_limit = settings['cache_size_limit']
+
         self._shards = tuple(
             FileSystemCache(
                 index=num,
@@ -209,6 +291,78 @@ class FanoutCache:
     def __contains__(self, item):
         return self.has_key(item)
 
+    def evict(self, policy=None, size_limit=None):
+        """
+        Remove old items according to the eviction policy and overall size limit.
+
+        Explanation of the algorithm:
+        Iterate over each shard, and for each shard iterate over its .key files,
+        reading the metadata stored in them.
+        This gives us a full list of keys, the cached archives they point to,
+        their sizes, store times, access times, and access counts.
+
+        That data is loaded into an in-memory DB, so different sorting strategies
+        can be run as plain SQL queries; summing the sizes is a single SUM query.
+
+        We then run the SELECT statement for the chosen eviction policy and iterate
+        over the sorted keys, removing entries until the total size drops below the
+        overall limit.
+        """
+
+        policy = policy or self._eviction_policy
+        size_limit = size_limit or self._cache_size_limit
+
+        select_policy = EVICTION_POLICY[policy]['evict']
+
+        if select_policy is None:
+            return 0
+
+        db = DB()
+
+        data = []
+        cnt = 1
+        for shard in self._shards:
+            for key_file in os.listdir(shard._directory):
+                if key_file.endswith('.key'):
+                    key_file_path = os.path.join(shard._directory, key_file)
+                    with open(key_file_path, 'rb') as f:
+                        metadata = json.loads(f.read())
+                    size = metadata.get('size')
+                    # in case we don't have a stored size, re-calc it from the archive file...
+                    if not size:
+                        fn = metadata.get('full_path')
+                        size = os.stat(fn).st_size
+
+                    data.append([
+                        cnt,
+                        key_file,
+                        key_file_path,
+                        metadata.get('filename'),
+                        metadata.get('full_path'),
+                        metadata.get('store_time', 0),
+                        metadata.get('access_time', 0),
+                        metadata.get('access_count', 0),
+                        size,
+                    ])
+                    cnt += 1
+
+        # Insert bulk data using executemany
+        db.bulk_insert(data)
+
+        ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
+
+        select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
+        sorted_keys = db.sql(select_policy_qry).fetchall()
+
+        for key_file_path, cached_file, size in sorted_keys:
+            # simulate removal impact BEFORE removal
+            total_size -= size
+            if total_size <= size_limit:
+                # we obtained what we wanted...
+                break
+
+            os.remove(cached_file)
+            os.remove(key_file_path)
+        return
 
 
 def get_archival_config(config):
@@ -255,4 +409,3 @@ def get_archival_cache_store(config):
     )
     cache_meta = d_cache
     return cache_meta
-
diff --git a/vcsserver/lib/rc_cache/archive_cache/utils.py b/vcsserver/lib/rc_cache/archive_cache/utils.py
--- a/vcsserver/lib/rc_cache/archive_cache/utils.py
+++ b/vcsserver/lib/rc_cache/archive_cache/utils.py
@@ -15,6 +15,8 @@
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
+import os
+
 
 class ArchiveCacheLock(Exception):
     pass
@@ -27,3 +29,44 @@ def archive_iterator(_reader, block_size
         if not data:
             break
         yield data
+
+
+def get_directory_statistics(start_path):
+    """
+    total_files, total_size, directory_stats = get_directory_statistics(start_path)
+
+    print(f"Directory statistics for: {start_path}\n")
+    print(f"Total files: {total_files}")
+    print(f"Total size: {format_size(total_size)}\n")
+
+    :param start_path: directory to walk recursively
+    :return: tuple of (total_files, total_size, directory_stats)
+    """
+
+    total_files = 0
+    total_size = 0
+    directory_stats = {}
+
+    for dir_path, dir_names, file_names in os.walk(start_path):
+        dir_size = 0
+        file_count = len(file_names)
+
+        for file in file_names:
+            filepath = os.path.join(dir_path, file)
+            file_size = os.path.getsize(filepath)
+            dir_size += file_size
+
+        directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
+        total_files += file_count
+        total_size += dir_size
+
+    return total_files, total_size, directory_stats
+
+
+def format_size(size):
+    # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
+    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+        if size < 1024:
+            return f"{size:.2f} {unit}"
+        size /= 1024
+    return f"{size:.2f} PB"
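
For reference, a minimal usage sketch of the new helpers added in utils.py; the cache directory path below is hypothetical, and the snippet is illustrative rather than part of the change:

    from vcsserver.lib.rc_cache.archive_cache.utils import get_directory_statistics, format_size

    # hypothetical location of the archive cache store; substitute the configured directory
    start_path = '/var/opt/rhodecode_data/archive_cache'

    # walk the cache store and report overall and per-directory usage
    total_files, total_size, directory_stats = get_directory_statistics(start_path)
    print(f'Directory statistics for: {start_path}')
    print(f'Total files: {total_files}')
    print(f'Total size: {format_size(total_size)}')

    # per-directory breakdown, e.g. one entry per cache shard directory
    for dir_path, stats in directory_stats.items():
        print(f"{dir_path}: {stats['file_count']} files, {format_size(stats['size'])}")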