# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import functools
import logging
import typing
import time
import zlib

from ...ext_json import json
from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
from ..lock import GenerationLock

log = logging.getLogger(__name__)

class BaseShard:
    storage_type: str = ''
    fs = None

    @classmethod
    def hash(cls, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def _write_file(self, full_path, read_iterator, mode):
        raise NotImplementedError

    def _get_keyfile(self, key):
        raise NotImplementedError

    def random_filename(self):
        raise NotImplementedError

    def store(self, *args, **kwargs):
        raise NotImplementedError

    def _store(self, key, value_reader, metadata, mode):
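        """
        Write the archive read from `value_reader` to a randomly named file in this
        shard's storage, then persist a companion key file with metadata describing
        the stored archive (paths, size, sha256, store/access statistics).
        """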
        (filename,  # hash-name
         full_path  # full-path/hash-name
         ) = self.random_filename()

        key_file, key_file_path = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",

            "key_file": key_file,  # this is the .key.json file storing meta
            "key_file_path": key_file_path,  # full path to key_file
            "archive_key": key,  # original name we stored the archive under, e.g. my-archive.zip
            "archive_filename": filename,  # the actual filename we stored that file under
            "archive_full_path": full_path,

            "store_time": time.time(),
            "access_count": 0,
            "access_time": 0,

            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
        size, sha256 = self._write_file(full_path, read_iterator, mode)
        _metadata['size'] = size
        _metadata['sha256'] = sha256

        # after the archive is written, we create a key file to record the presence of the binary file
        with self.fs.open(key_file_path, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, filename, size, _metadata

    def fetch(self, *args, **kwargs):
        raise NotImplementedError

    def _fetch(self, key, retry, retry_attempts, retry_backoff,
               presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
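        """
        Open the archive stored under `key` and return a ShardFileReader together with
        its metadata. When retry is enabled, the key lookup is re-checked up to
        `retry_attempts` times, sleeping `retry_backoff` seconds between attempts.
        Access statistics in the key file are updated on every fetch, and a pre-signed
        URL is attached to the metadata when `presigned_url_expires` is set.
        """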
        if retry is NOT_GIVEN:
            retry = False
        if retry_attempts is NOT_GIVEN:
            retry_attempts = 0

        if retry and retry_attempts > 0:
            for attempt in range(1, retry_attempts + 1):
                if key in self:
                    break
                # we didn't find the key yet; wait retry_backoff seconds and re-check
                time.sleep(retry_backoff)

        if key not in self:
            log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
            raise KeyError(key)

        key_file, key_file_path = self._get_keyfile(key)
        with self.fs.open(key_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        archive_path = metadata['archive_full_path']
        if presigned_url_expires and presigned_url_expires > 0:
            metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)

        try:
            return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
        finally:
            # update usage stats: access count and last access time
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()
            log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
                      key_file, metadata['access_count'], metadata['access_time'])
            with self.fs.open(key_file_path, 'wb') as f:
                f.write(json.dumps(metadata))

    def remove(self, *args, **kwargs):
        raise NotImplementedError

    def _remove(self, key):
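        """
        Delete both the stored archive and its key file for `key`; raises KeyError
        if the key is not present in this shard.
        """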
        if key not in self:
            log.exception(f'requested key={key} not found in {self}')
            raise KeyError(key)

        key_file, key_file_path = self._get_keyfile(key)
        with self.fs.open(key_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        archive_path = metadata['archive_full_path']
        self.fs.rm(archive_path)
        self.fs.rm(key_file_path)
        return 1

    @property
    def storage_medium(self):
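        """Return the storage backend attribute selected by `storage_type`."""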
        return getattr(self, self.storage_type)

    @property
    def key_suffix(self):
        return 'key.json'

    def __contains__(self, key):
        """Return `True` if an item matching `key` is found in the cache.

        :param key: key of the item to look up
        :return: True if a matching item exists

        """
        key_file, key_file_path = self._get_keyfile(key)
        return self.fs.exists(key_file_path)


class BaseCache:
    _locking_url: str = ''
    _storage_path: str = ''
    _config: dict = {}
    retry = False
    retry_attempts: int = 0
    retry_backoff: int | float = 1
    _shards = tuple()
    shard_cls = BaseShard
    # define the presigned url expiration, 0 == disabled
    presigned_url_expires: int = 0

    def __contains__(self, key):
        """Return `True` if an item matching `key` is found in the cache.

        :param key: key of the item to look up
        :return: True if a matching item exists

        """
        return self.has_key(key)

    def __repr__(self):
        return f'<{self.__class__.__name__}(storage={self._storage_path})>'

    @classmethod
    def gb_to_bytes(cls, gb):
        return gb * (1024 ** 3)

    @property
    def storage_path(self):
        return self._storage_path

    @classmethod
    def get_stats_db(cls):
        return StatsDB()

    def get_conf(self, key, pop=False):
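        """Return the archive_cache configuration value for `key`, optionally removing it from the config."""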
        if key not in self._config:
            raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
        val = self._config[key]
        if pop:
            del self._config[key]
        return val

    def _get_shard(self, key) -> shard_cls:
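        """Map `key` to one of the configured shards via its hash modulo the shard count."""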
        index = self._hash(key) % self._shard_count
        shard = self._shards[index]
        return shard

    def _get_size(self, shard, archive_path):
        raise NotImplementedError

    def store(self, key, value_reader, metadata=None):
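        """Store data from `value_reader` under `key` in the shard selected for that key."""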
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
        """
        Return a file handle corresponding to `key` from the specific shard cache.
        """
        if retry is NOT_GIVEN:
            retry = self.retry
        if retry_attempts is NOT_GIVEN:
            retry_attempts = self.retry_attempts
        retry_backoff = self.retry_backoff
        presigned_url_expires = self.presigned_url_expires

        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry,
                           retry_attempts=retry_attempts,
                           retry_backoff=retry_backoff,
                           presigned_url_expires=presigned_url_expires)

    def remove(self, key):
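        """Remove the archive stored under `key` from the shard selected for that key."""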
        shard = self._get_shard(key)
        return shard.remove(key)

    def has_key(self, archive_key):
        """Return `True` if an item matching `archive_key` is found in the cache.

        :param archive_key: key for the item; a unique archive name we store the data under, e.g. my-archive-svn.zip
        :return: True if the key is found

        """
        shard = self._get_shard(archive_key)
        return archive_key in shard

    def iter_keys(self):
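        """Walk every shard's storage and yield `(shard, key_file_name)` for each key file found."""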
        for shard in self._shards:
            if shard.fs.exists(shard.storage_medium):
                for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                    for key_file_path in _files:
                        if key_file_path.endswith(shard.key_suffix):
                            yield shard, key_file_path

    def get_lock(self, lock_key):
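        """Return a GenerationLock for `lock_key`, bound to this cache's locking url."""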
        return GenerationLock(lock_key, self._locking_url)

    def evict(self, policy=None, size_limit=None) -> dict:
        """
        Remove old items based on the given conditions.

        Explanation of the algorithm:

        Iterate over each shard, and for each shard iterate over its .key files,
        reading the metadata stored in them. This gives us a full list of keys and
        cached archives, with their sizes, creation times, access times and access counts.

        Store that in an in-memory DB so that different sorting strategies can be run
        easily; summing the sizes is then a single SUM SQL query.

        Then run the sorting strategy selected by the eviction policy, iterate over the
        sorted keys, and remove entries until we get below the overall size limit.
        """
        removal_info = {
            "removed_items": 0,
            "removed_size": 0
        }
        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return removal_info

        db = self.get_stats_db()

        data = []
        cnt = 1

        for shard, key_file in self.iter_keys():
            with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
                metadata = json.loads(f.read())

            key_file_path = os.path.join(shard.storage_medium, key_file)

            archive_key = metadata['archive_key']
            archive_path = metadata['archive_full_path']

            size = metadata.get('size')
            if not size:
                # in case we don't have the size stored, re-calculate it...
                size = self._get_size(shard, archive_path)

            data.append([
                cnt,
                key_file,
                key_file_path,
                archive_key,
                archive_path,
                metadata.get('store_time', 0),
                metadata.get('access_time', 0),
                metadata.get('access_count', 0),
                size,
            ])
            cnt += 1

        # Insert bulk data using executemany
        db.bulk_insert(data)

        total_size = db.get_total_size()
        log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
                  len(data), format_size(total_size), format_size(size_limit))

        removed_items = 0
        removed_size = 0
        for key_file, archive_key, size in db.get_sorted_keys(select_policy):
            # simulate the removal impact BEFORE the actual removal
            total_size -= size

            if total_size <= size_limit:
                # we reached the target size, stop...
                break

            self.remove(archive_key)
            removed_items += 1
            removed_size += size
        removal_info['removed_items'] = removed_items
        removal_info['removed_size'] = removed_size
        log.debug('Removed %s cache archives, and reduced size by: %s',
                  removed_items, format_size(removed_size))
        return removal_info

    def get_statistics(self):
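        """Return `(total_files, total_size, meta)` computed by reading every key file in every shard."""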
        total_files = 0
        total_size = 0
        meta = {}

        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                total_files += 1
                metadata = json.loads(f.read())
                total_size += metadata['size']

        return total_files, total_size, meta