# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import functools
import logging
import typing
import time
import zlib

from ...ext_json import json
from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
from ..lock import GenerationLock

log = logging.getLogger(__name__)


class BaseShard:
    storage_type: str = ''
    fs = None

    @classmethod
    def hash(cls, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value
        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def _write_file(self, full_path, read_iterator, mode):
        raise NotImplementedError

    def _get_keyfile(self, key):
        raise NotImplementedError

    def random_filename(self):
        raise NotImplementedError

    def store(self, *args, **kwargs):
        raise NotImplementedError

    def _store(self, key, value_reader, metadata, mode):
        (filename,  # hash-name
         full_path  # full-path/hash-name
         ) = self.random_filename()

        key_file, key_file_path = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "key_file": key_file,  # this is the .key.json file storing meta
            "key_file_path": key_file_path,  # full path to key_file
            "archive_key": key,  # original name we stored archive under, e.g my-archive.zip
            "archive_filename": filename,  # the actual filename we stored that file under
            "archive_full_path": full_path,
            "store_time": time.time(),
            "access_count": 0,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
        size, sha256 = self._write_file(full_path, read_iterator, mode)
        _metadata['size'] = size
        _metadata['sha256'] = sha256

        # after archive is finished, we create a key to save the presence of the binary file
        with self.fs.open(key_file_path, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, filename, size, _metadata

    def fetch(self, *args, **kwargs):
        raise NotImplementedError

    def _fetch(self, key, retry, retry_attempts, retry_backoff,
               presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
        if retry is NOT_GIVEN:
            retry = False
        if retry_attempts is NOT_GIVEN:
            retry_attempts = 0

        if retry and retry_attempts > 0:
            for attempt in range(1, retry_attempts + 1):
                if key in self:
                    break
                # we didn't find the key, wait retry_backoff N seconds, and re-check
                time.sleep(retry_backoff)

        if key not in self:
            log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
            raise KeyError(key)

        key_file, key_file_path = self._get_keyfile(key)
        with self.fs.open(key_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        archive_path = metadata['archive_full_path']
        if presigned_url_expires and presigned_url_expires > 0:
            metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)

        try:
            return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
        finally:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()
            log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
                      key_file, metadata['access_count'], metadata['access_time'])
            with self.fs.open(key_file_path, 'wb') as f:
                f.write(json.dumps(metadata))

    def remove(self, *args, **kwargs):
        raise NotImplementedError

    def _remove(self, key):
        if key not in self:
            log.exception(f'requested key={key} not found in {self}')
            raise KeyError(key)

        key_file, key_file_path = self._get_keyfile(key)
        with self.fs.open(key_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        archive_path = metadata['archive_full_path']
        self.fs.rm(archive_path)
        self.fs.rm(key_file_path)
        return 1

    @property
    def storage_medium(self):
        return getattr(self, self.storage_type)

    @property
    def key_suffix(self):
        return 'key.json'

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        key_file, key_file_path = self._get_keyfile(key)
        return self.fs.exists(key_file_path)
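

# --- illustrative sketch, not part of the shipped API -------------------------
# A hypothetical helper showing how an archive key maps onto a shard. It mirrors
# ``BaseCache._get_shard`` below (``self._hash(key) % self._shard_count``) under
# the assumption that a concrete cache wires ``_hash`` to ``BaseShard.hash``;
# the default of 8 shards is a made-up example value.
def _example_shard_index(archive_key: str, shard_count: int = 8) -> int:
    """Return the shard index ``archive_key`` would land in (example only)."""
    return BaseShard.hash(archive_key) % shard_count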


class BaseCache:
    _locking_url: str = ''
    _storage_path: str = ''
    _config: dict = {}
    retry = False
    retry_attempts: int = 0
    retry_backoff: int | float = 1
    _shards = tuple()
    shard_cls = BaseShard
    # define the presigned url expiration, 0 == disabled
    presigned_url_expires: int = 0

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        return self.has_key(key)

    def __repr__(self):
        return f'<{self.__class__.__name__}(storage={self._storage_path})>'

    @classmethod
    def gb_to_bytes(cls, gb):
        return gb * (1024 ** 3)

    @property
    def storage_path(self):
        return self._storage_path

    @classmethod
    def get_stats_db(cls):
        return StatsDB()

    def get_conf(self, key, pop=False):
        if key not in self._config:
            raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
        val = self._config[key]
        if pop:
            del self._config[key]
        return val

    def _get_shard(self, key) -> shard_cls:
        index = self._hash(key) % self._shard_count
        shard = self._shards[index]
        return shard

    def _get_size(self, shard, archive_path):
        raise NotImplementedError

    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
        """
        Return a file handle corresponding to `key` from the specific shard cache.
        """
        if retry is NOT_GIVEN:
            retry = self.retry
        if retry_attempts is NOT_GIVEN:
            retry_attempts = self.retry_attempts
        retry_backoff = self.retry_backoff
        presigned_url_expires = self.presigned_url_expires

        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff,
                           presigned_url_expires=presigned_url_expires)

    def remove(self, key):
        shard = self._get_shard(key)
        return shard.remove(key)

    def has_key(self, archive_key):
        """Return `True` if `key` matching item is found in cache.

        :param archive_key: key for item, this is a unique archive name we want to
            store data under, e.g. my-archive-svn.zip
        :return: True if key is found
        """
        shard = self._get_shard(archive_key)
        return archive_key in shard

    def iter_keys(self):
        for shard in self._shards:
            if shard.fs.exists(shard.storage_medium):
                for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                    for key_file_path in _files:
                        if key_file_path.endswith(shard.key_suffix):
                            yield shard, key_file_path

    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def evict(self, policy=None, size_limit=None) -> dict:
        """
        Remove old items based on the configured conditions.

        Explanation of the algorithm:
        iterate over each shard, and for each shard iterate over its .key files,
        reading the metadata stored in them. This gives us a full list of keys and
        cached archives together with their size and access data: creation time,
        access time and access counts.

        Store that into an in-memory DB so we can easily run different sorting
        strategies; summing the size is a SQL sum query.

        Then run a sorting strategy based on the eviction policy and iterate over
        the sorted keys, removing entries until the total size drops below the limit.
        """
        removal_info = {
            "removed_items": 0,
            "removed_size": 0
        }
        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return removal_info

        db = self.get_stats_db()

        data = []
        cnt = 1

        for shard, key_file in self.iter_keys():
            with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
                metadata = json.loads(f.read())

                key_file_path = os.path.join(shard.storage_medium, key_file)

                archive_key = metadata['archive_key']
                archive_path = metadata['archive_full_path']
                size = metadata.get('size')
                if not size:
                    # in case we don't have size re-calc it...
                    size = self._get_size(shard, archive_path)

                data.append([
                    cnt,
                    key_file,
                    key_file_path,
                    archive_key,
                    archive_path,
                    metadata.get('store_time', 0),
                    metadata.get('access_time', 0),
                    metadata.get('access_count', 0),
                    size,
                ])
                cnt += 1

        # Insert bulk data using executemany
        db.bulk_insert(data)

        total_size = db.get_total_size()
        log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
                  len(data), format_size(total_size), format_size(size_limit))

        removed_items = 0
        removed_size = 0
        for key_file, archive_key, size in db.get_sorted_keys(select_policy):
            # simulate removal impact BEFORE removal
            total_size -= size

            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            self.remove(archive_key)
            removed_items += 1
            removed_size += size
        removal_info['removed_items'] = removed_items
        removal_info['removed_size'] = removed_size
        log.debug('Removed %s cache archives, and reduced size by: %s',
                  removed_items, format_size(removed_size))
        return removal_info
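
    # Usage sketch for ``evict`` above (illustration only, not executed here):
    # a concrete cache would typically be driven along the lines of
    #
    #   removal_info = d_cache.evict(size_limit=d_cache.gb_to_bytes(10))
    #   log.debug('eviction removed %s items', removal_info['removed_items'])
    #
    # where ``d_cache`` is a hypothetical concrete BaseCache subclass and the
    # 10 GB limit is a made-up example value; with no arguments the configured
    # ``_eviction_policy`` and ``_cache_size_limit`` are used.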

    def get_statistics(self):
        total_files = 0
        total_size = 0
        meta = {}

        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                total_files += 1
                metadata = json.loads(f.read())
                total_size += metadata['size']

        return total_files, total_size, meta
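

# --- intended usage sketch, illustration only ----------------------------------
# A concrete backend implementing ``BaseShard``/``BaseCache`` is expected to be
# driven roughly like this; ``d_cache``, the key name, the local path and the
# retry values are hypothetical examples:
#
#   with open('/tmp/my-archive.zip', 'rb') as value_reader:
#       d_cache.store('my-archive.zip', value_reader)
#
#   reader, metadata = d_cache.fetch('my-archive.zip', retry=True, retry_attempts=10)
#
#   if 'my-archive.zip' in d_cache:
#       d_cache.remove('my-archive.zip')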