diff --git a/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py b/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
--- a/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
+++ b/rhodecode/lib/rc_cache/archive_cache/fanout_cache.py
@@ -24,6 +24,7 @@ import logging
 import time
 import typing
 import zlib
+import sqlite3
 
 from rhodecode.lib.ext_json import json
 from .lock import GenerationLock
@@ -38,6 +39,72 @@ NO_VAL = -917
 MODE_BINARY = 'BINARY'
 
 
+EVICTION_POLICY = {
+    'none': {
+        'evict': None,
+    },
+    'least-recently-stored': {
+        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
+    },
+    'least-recently-used': {
+        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
+    },
+    'least-frequently-used': {
+        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
+    },
+}
+
+
+class DB:
+
+    def __init__(self):
+        self.connection = sqlite3.connect(':memory:')
+        self._init_db()
+
+    def _init_db(self):
+        qry = '''
+        CREATE TABLE IF NOT EXISTS archive_cache (
+            rowid INTEGER PRIMARY KEY,
+            key_file TEXT,
+            key_file_path TEXT,
+            filename TEXT,
+            full_path TEXT,
+            store_time REAL,
+            access_time REAL,
+            access_count INTEGER DEFAULT 0,
+            size INTEGER DEFAULT 0
+        )
+        '''
+
+        self.sql(qry)
+        self.connection.commit()
+
+    @property
+    def sql(self):
+        return self.connection.execute
+
+    def bulk_insert(self, rows):
+        qry = '''
+        INSERT INTO archive_cache (
+            rowid,
+            key_file,
+            key_file_path,
+            filename,
+            full_path,
+            store_time,
+            access_time,
+            access_count,
+            size
+        )
+        VALUES (
+            ?, ?, ?, ?, ?, ?, ?, ?, ?
+        )
+        '''
+        cursor = self.connection.cursor()
+        cursor.executemany(qry, rows)
+        self.connection.commit()
+
+
 class FileSystemCache:
 
     def __init__(self, index, directory, **settings):
@@ -225,9 +292,76 @@ class FanoutCache:
     def __contains__(self, item):
         return self.has_key(item)
 
-    def evict(self):
-        """Remove old items based on the conditions"""
-        # TODO: Implement this...
+    def evict(self, policy=None, size_limit=None):
+        """
+        Remove old items based on the given eviction policy and size limit.
+
+        How this works:
+        Iterate over each shard, and for each shard iterate over its .key files,
+        reading the metadata stored in them. This gives us a full list of keys and
+        cached archives together with their size, store time, access time and
+        access counts.
+
+        Store that into an in-memory sqlite DB so we can easily run different
+        sorting strategies; summing the sizes is a single SUM query.
+
+        Then run the sorting query selected by the eviction policy, iterate over
+        the sorted keys, and remove entries until the total size drops below the
+        overall limit.
+        """
+
+        policy = policy or self._eviction_policy
+        size_limit = size_limit or self._cache_size_limit
+
+        select_policy = EVICTION_POLICY[policy]['evict']
+
+        if select_policy is None:
+            return 0
+
+        db = DB()
+
+        data = []
+        cnt = 1
+        for shard in self._shards:
+            for key_file in os.listdir(shard._directory):
+                if key_file.endswith('.key'):
+                    key_file_path = os.path.join(shard._directory, key_file)
+                    with open(key_file_path, 'rb') as f:
+                        metadata = json.loads(f.read())
+                    # in case the metadata doesn't carry the size, re-calculate it from the archive file
+                    size = metadata.get('size')
+                    if not size:
+                        fn = metadata.get('full_path')
+                        size = os.stat(fn).st_size
+
+                    data.append([
+                        cnt,
+                        key_file,
+                        key_file_path,
+                        metadata.get('filename'),
+                        metadata.get('full_path'),
+                        metadata.get('store_time', 0),
+                        metadata.get('access_time', 0),
+                        metadata.get('access_count', 0),
+                        size,
+                    ])
+                    cnt += 1
+
+        # insert all collected rows in one go using executemany
+        db.bulk_insert(data)
+
+        ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
+
+        select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
+        sorted_keys = db.sql(select_policy_qry).fetchall()
+
+        for key, cached_file, size in sorted_keys:
+            if total_size <= size_limit:
+                # we are within the size limit, stop evicting
+                break
+
+            os.remove(cached_file)
+            os.remove(key)
+            total_size -= size
 
         return
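Illustrative sketch (not part of the patch): the eviction above boils down to "load the key-file metadata into an in-memory sqlite3 table, SUM the sizes, sort by the chosen policy, and drop entries until the total fits under the limit". The snippet below demonstrates that selection logic on made-up rows; the table layout mirrors the DB class in the patch, but the row values and size_limit are hypothetical and nothing is actually removed from disk.

    import sqlite3

    # mirrors the 'least-recently-stored' entry of EVICTION_POLICY in the patch
    select_policy = 'SELECT {fields} FROM archive_cache ORDER BY store_time'

    db = sqlite3.connect(':memory:')
    db.execute('''
        CREATE TABLE archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file_path TEXT,
            full_path TEXT,
            store_time REAL,
            size INTEGER DEFAULT 0
        )
    ''')

    # hypothetical cache entries: (rowid, key file, archive file, store time, size)
    rows = [
        (1, 'shard_0/abc.key', 'shard_0/abc.tar.gz', 100.0, 400),
        (2, 'shard_1/def.key', 'shard_1/def.tar.gz', 200.0, 300),
        (3, 'shard_0/ghi.key', 'shard_0/ghi.tar.gz', 300.0, 200),
    ]
    db.executemany('INSERT INTO archive_cache VALUES (?, ?, ?, ?, ?)', rows)

    ((total_size,),) = db.execute('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
    size_limit = 500  # hypothetical limit; the cache would use self._cache_size_limit

    qry = select_policy.format(fields='key_file_path, full_path, size')
    for key_file, archive, size in db.execute(qry).fetchall():
        if total_size <= size_limit:
            break  # already within the limit, stop evicting
        # the real cache calls os.remove(archive) and os.remove(key_file) here
        print(f'evict {archive} ({size} bytes)')
        total_size -= size

    print(f'cache size after eviction: {total_size} bytes')

With the oldest entry stored first, the run evicts only the 400-byte archive and stops once the remaining 500 bytes fit under the limit, which is exactly the walk the patched evict() performs over the real shard directories.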