# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import sqlite3

import s3fs.core

# Integer sentinel.
# NOTE(review): not referenced in this part of the file; presumably marks
# "argument not supplied" for callers elsewhere -- confirm before changing.
NOT_GIVEN = -917

# Maps an eviction policy name to the SELECT template used to list cache
# entries in eviction order. ``{fields}`` is filled in by
# ``StatsDB.get_sorted_keys``. ORDER BY is ascending, so rows with the smallest
# store_time / access_time / access_count come first. The 'none' policy has no
# query at all.
EVICTION_POLICY = {
    'none': {
        'evict': None,
    },
    'least-recently-stored': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
    },
    'least-recently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
    },
    'least-frequently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
    },
}


def archive_iterator(_reader, block_size: int = 4096 * 512):
    """
    Yield successive chunks read from ``_reader`` until it is exhausted.

    :param _reader: object exposing ``read(size)``; iteration stops at the
        first falsy (empty) result.
    :param block_size: size passed to every ``read`` call; the default
        4096 * 512 is 2 MiB.
    """
    while data := _reader.read(block_size):
        yield data


def format_size(size) -> str:
    """
    Convert a size in bytes to a human-readable string (e.g., KB, MB, GB).

    The value is divided by 1024 until it drops below 1024 and is rendered
    with two decimals. TB is the largest unit: sizes of 1024 TB or more are
    still reported in TB rather than falling through and returning ``None``.

    :param size: number of bytes (int or float).
    :return: formatted string such as ``"1.50 KB"``.
    """
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    # four divisions were applied above, so ``size`` is expressed in TB here
    return f"{size:.2f} TB"


class StatsDB:
    """
    In-memory SQLite database with a single ``archive_cache`` table that holds
    per-archive bookkeeping (paths, store/access times, access count, size).
    """

    def __init__(self) -> None:
        # ':memory:' gives every instance its own private, non-persistent DB
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self) -> None:
        """Create the ``archive_cache`` table if it does not exist yet."""
        qry = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            archive_key TEXT,
            archive_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
        '''

        self.sql(qry)
        self.connection.commit()

    @property
    def sql(self):
        """Shortcut for ``self.connection.execute``."""
        return self.connection.execute

    def bulk_insert(self, rows) -> None:
        """
        Insert many rows into ``archive_cache`` in one transaction.

        :param rows: iterable of 9-item sequences ordered as
            (rowid, key_file, key_file_path, archive_key, archive_path,
            store_time, access_time, access_count, size).
        :raises sqlite3.Error: when the insert fails; in that case the
            transaction is rolled back so no partial batch is left behind.
        """
        qry = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            archive_key,
            archive_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
        '''
        # the connection context manager commits on success and rolls back
        # on error, keeping the batch all-or-nothing
        with self.connection:
            self.connection.executemany(qry, rows)

    def get_total_size(self):
        """Return the sum of the ``size`` column, or 0 for an empty table."""
        qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache'
        # an aggregate without GROUP BY always produces exactly one row
        (total_size,) = self.sql(qry).fetchone()
        return total_size

    def get_sorted_keys(self, select_policy):
        """
        Return ``(key_file, archive_key, size)`` rows in eviction order.

        :param select_policy: SQL template containing a ``{fields}``
            placeholder, such as the ``'evict'`` values of ``EVICTION_POLICY``.
            It must be a string; the ``None`` used by the 'none' policy is not
            accepted here.
        :return: list of row tuples as produced by ``fetchall()``.
        """
        select_policy_qry = select_policy.format(fields='key_file, archive_key, size')
        return self.sql(select_policy_qry).fetchall()


class ShardFileReader:
    """
    Thin proxy around a file-like reader that forwards attribute access to it,
    and additionally provides ``name`` for ``s3fs.core.S3File`` objects.
    """

    def __init__(self, file_like_reader):
        self._file_like_reader = file_like_reader

    def __getattr__(self, item):
        # __getattr__ is only consulted when normal lookup fails, so being
        # asked for the wrapped reader itself means it was never set (instance
        # created without __init__, as copy/pickle do). Raise instead of
        # recursing without bound.
        if item == '_file_like_reader':
            raise AttributeError(item)

        reader = self._file_like_reader
        if item == 'name' and isinstance(reader, s3fs.core.S3File):
            # S3 FileWrapper doesn't support name attribute, and we use it
            return reader.full_name
        return getattr(reader, item)

    def __repr__(self):
        return f'<{self.__class__.__name__}={self._file_like_reader}>'