##// END OF EJS Templates
fixed import
fixed import

File last commit:

r5450:6c0bdadc default
r5659:822bcfab default
Show More
base.py
372 lines | 12.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import functools
import logging
import typing
import time
import zlib
from ...ext_json import json
from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
from ..lock import GenerationLock
log = logging.getLogger(__name__)
class BaseShard:
    """Base class for a single shard of the archive cache.

    A shard stores every archive as two objects on ``self.fs``: the archive
    payload itself (under a name produced by ``random_filename``) and a small
    JSON "key file" holding the metadata for that archive. The presence of the
    key file is what marks a key as cached (see ``__contains__``).

    Subclasses provide the backend specific pieces: ``_write_file``,
    ``_get_keyfile``, ``random_filename`` and the public ``store`` / ``fetch`` /
    ``remove`` entry points, which are expected to delegate to ``_store`` /
    ``_fetch`` / ``_remove``.
    """

    # name of the instance attribute that holds the storage location of this
    # shard; resolved dynamically by `storage_medium`
    storage_type: str = ''
    # filesystem object used for all I/O; this class calls open/exists/rm/url on it
    fs = None

    @classmethod
    def hash(cls, key):
        """Compute portable hash for `key`.

        :param key: key to hash (a ``str``, it gets utf-8 encoded)
        :return: adler32 checksum of the key masked to an unsigned 32-bit value
        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def _write_file(self, full_path, read_iterator, mode):
        """Write chunks from `read_iterator` to `full_path`.

        Must be implemented by subclasses; ``_store`` expects a
        ``(size, sha256)`` tuple as the return value.
        """
        raise NotImplementedError

    def _get_keyfile(self, key):
        """Return ``(key_file, key_file_path)`` for `key`; implemented by subclasses."""
        raise NotImplementedError

    def random_filename(self):
        """Return ``(filename, full_path)`` for a new archive; implemented by subclasses."""
        raise NotImplementedError

    def store(self, *args, **kwargs):
        """Public store entry point; implemented by subclasses."""
        raise NotImplementedError

    def _store(self, key, value_reader, metadata, mode):
        """Write the archive read from `value_reader` and its key file.

        :param key: original name the archive is stored under, e.g my-archive.zip
        :param value_reader: object exposing ``read(size)``; it is consumed in
            4 MiB chunks until it returns ``b''``
        :param metadata: optional dict merged over the default metadata
        :param mode: passed through unchanged to ``_write_file``
        :return: tuple of ``(key, filename, size, metadata)``
        """
        (filename,  # hash-name
         full_path  # full-path/hash-name
         ) = self.random_filename()

        key_file, key_file_path = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",

            "key_file": key_file,  # this is the .key.json file storing meta
            "key_file_path": key_file_path,  # full path to key_file
            "archive_key": key,  # original name we stored archive under, e.g my-archive.zip
            "archive_filename": filename,  # the actual filename we stored that file under
            "archive_full_path": full_path,

            "store_time": time.time(),
            "access_count": 0,
            "access_time": 0,

            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        # iter(callable, sentinel): keep calling read(4 MiB) until it yields b''
        read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
        size, sha256 = self._write_file(full_path, read_iterator, mode)
        _metadata['size'] = size
        _metadata['sha256'] = sha256

        # after archive is finished, we create a key to save the presence of the binary file
        with self.fs.open(key_file_path, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, filename, size, _metadata

    def fetch(self, *args, **kwargs):
        """Public fetch entry point; implemented by subclasses."""
        raise NotImplementedError

    def _fetch(self, key, retry, retry_attempts, retry_backoff,
               presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
        """Open the archive stored under `key` and return it with its metadata.

        :param key: archive key to look up
        :param retry: when truthy, re-check for the key before giving up;
            ``NOT_GIVEN`` is treated as ``False``
        :param retry_attempts: number of re-checks; ``NOT_GIVEN`` is treated as ``0``
        :param retry_backoff: seconds to sleep after each failed check
        :param presigned_url_expires: when > 0, ``metadata['url']`` is set from
            ``self.fs.url(...)`` with that expiration
        :return: tuple of ``(ShardFileReader, metadata)``
        :raises KeyError: when the key is still missing after all retries
        """
        if retry is NOT_GIVEN:
            retry = False

        if retry_attempts is NOT_GIVEN:
            retry_attempts = 0

        if retry and retry_attempts > 0:
            for _ in range(1, retry_attempts + 1):
                if key in self:
                    break
                # we didn't find the key, wait retry_backoff N seconds, and re-check
                time.sleep(retry_backoff)

        if key not in self:
            # NOTE: log.error, not log.exception - we are not inside an exception
            # handler here, so there is no traceback to attach to the record
            log.error('requested key=%s not found in %s retry=%s, attempts=%s',
                      key, self, retry, retry_attempts)
            raise KeyError(key)

        key_file, key_file_path = self._get_keyfile(key)
        with self.fs.open(key_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        archive_path = metadata['archive_full_path']
        if presigned_url_expires and presigned_url_expires > 0:
            # NOTE(review): 'url' ends up persisted in the key file by the
            # stats write below - confirm this is intended
            metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)

        try:
            return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
        finally:
            # update usage stats, count and accessed; runs after the return value
            # is built, and also when opening the archive raised
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()
            log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
                      key_file, metadata['access_count'], metadata['access_time'])
            with self.fs.open(key_file_path, 'wb') as f:
                f.write(json.dumps(metadata))

    def remove(self, *args, **kwargs):
        """Public remove entry point; implemented by subclasses."""
        raise NotImplementedError

    def _remove(self, key):
        """Delete the archive stored under `key` together with its key file.

        :param key: archive key to remove
        :return: ``1`` once both objects were removed
        :raises KeyError: when the key is not present in this shard
        """
        if key not in self:
            # not inside an exception handler, so log.error instead of log.exception
            log.error('requested key=%s not found in %s', key, self)
            raise KeyError(key)

        key_file, key_file_path = self._get_keyfile(key)
        with self.fs.open(key_file_path, 'rb') as f:
            metadata = json.loads(f.read())

        archive_path = metadata['archive_full_path']
        # remove the payload first, the key file last
        self.fs.rm(archive_path)
        self.fs.rm(key_file_path)
        return 1

    @property
    def storage_medium(self):
        """Value of the instance attribute named by ``storage_type``."""
        return getattr(self, self.storage_type)

    @property
    def key_suffix(self):
        """Suffix that identifies metadata key files."""
        return 'key.json'

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if the key file for `key` exists on ``self.fs``
        """
        key_file, key_file_path = self._get_keyfile(key)
        return self.fs.exists(key_file_path)
class BaseCache:
    """Base class for a sharded archive cache.

    Keys are routed to one of ``self._shards`` (see ``_get_shard``) and the
    store / fetch / remove operations are delegated to that shard. Subclasses
    are expected to provide ``_hash``, ``_shard_count``, ``_eviction_policy``,
    ``_cache_size_limit`` and ``_get_size``, none of which are defined here.
    """

    _locking_url: str = ''
    _storage_path: str = ''
    # NOTE: class-level dict is shared between instances unless a subclass
    # assigns its own; `get_conf(..., pop=True)` mutates it
    _config: dict = {}
    retry = False
    retry_attempts: int = 0
    retry_backoff: int | float = 1
    _shards = tuple()
    shard_cls = BaseShard
    # define the presigned url expiration, 0 == disabled
    presigned_url_expires: int = 0

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item
        """
        return self.has_key(key)

    def __repr__(self):
        return f'<{self.__class__.__name__}(storage={self._storage_path})>'

    @classmethod
    def gb_to_bytes(cls, gb):
        """Convert gibibytes (1024 ** 3 bytes) to bytes."""
        return gb * (1024 ** 3)

    @property
    def storage_path(self):
        """Configured storage path of this cache."""
        return self._storage_path

    @classmethod
    def get_stats_db(cls):
        """Return a fresh ``StatsDB`` used by ``evict`` to sort and sum cache entries."""
        return StatsDB()

    def get_conf(self, key, pop=False):
        """Return configuration value stored under `key`.

        :param key: configuration key
        :param pop: when True the key is removed from the config after reading
        :raises ValueError: when the key is not present in the configuration
        """
        if key not in self._config:
            raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
        val = self._config[key]
        if pop:
            del self._config[key]
        return val

    def _get_shard(self, key) -> shard_cls:
        """Pick the shard responsible for `key` (hash of the key modulo shard count)."""
        index = self._hash(key) % self._shard_count
        shard = self._shards[index]
        return shard

    def _get_size(self, shard, archive_path):
        """Return size of the archive at `archive_path`; implemented by subclasses."""
        raise NotImplementedError

    def store(self, key, value_reader, metadata=None):
        """Store the content of `value_reader` under `key` in the matching shard."""
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
        """
        Return file handle corresponding to `key` from specific shard cache.

        :param key: archive key
        :param retry: overrides ``self.retry`` when given
        :param retry_attempts: overrides ``self.retry_attempts`` when given
        :return: whatever the shard's ``fetch`` returns, a (reader, metadata) pair
        """
        if retry is NOT_GIVEN:
            retry = self.retry
        if retry_attempts is NOT_GIVEN:
            retry_attempts = self.retry_attempts
        retry_backoff = self.retry_backoff
        presigned_url_expires = self.presigned_url_expires

        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry,
                           retry_attempts=retry_attempts,
                           retry_backoff=retry_backoff,
                           presigned_url_expires=presigned_url_expires)

    def remove(self, key):
        """Remove `key` from the shard responsible for it."""
        shard = self._get_shard(key)
        return shard.remove(key)

    def has_key(self, archive_key):
        """Return `True` if `key` matching item is found in cache.

        :param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip
        :return: True if key is found
        """
        shard = self._get_shard(archive_key)
        return archive_key in shard

    def iter_keys(self):
        """Yield ``(shard, key_file_name)`` for every key file found in every shard.

        Only the file name is yielded, not the directory it was found in;
        callers join it with ``shard.storage_medium``.
        """
        for shard in self._shards:
            if shard.fs.exists(shard.storage_medium):
                for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                    for key_file_path in _files:
                        if key_file_path.endswith(shard.key_suffix):
                            yield shard, key_file_path

    def get_lock(self, lock_key):
        """Return a ``GenerationLock`` for `lock_key` bound to the configured locking url."""
        return GenerationLock(lock_key, self._locking_url)

    def evict(self, policy=None, size_limit=None) -> dict:
        """
        Remove old items based on the conditions

        explanation of this algo:
        iterate over each shard, then for each shard iterate over the .key files
        read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
        access data, time creation, and access counts.

        Store that into a memory DB in order we can run different sorting strategies easily.
        Summing the size is a sum sql query.

        Then we run a sorting strategy based on eviction policy.
        We iterate over sorted keys, and remove each until the total size is within the limit.

        :param policy: eviction policy name, defaults to ``self._eviction_policy``
        :param size_limit: target size, defaults to ``self._cache_size_limit``
        :return: dict with ``removed_items`` and ``removed_size``
        """
        removal_info = {
            "removed_items": 0,
            "removed_size": 0
        }
        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            # this policy does not evict anything
            return removal_info

        db = self.get_stats_db()

        data = []
        for cnt, (shard, key_file) in enumerate(self.iter_keys(), start=1):
            key_file_path = os.path.join(shard.storage_medium, key_file)
            with shard.fs.open(key_file_path, 'rb') as f:
                metadata = json.loads(f.read())

            archive_key = metadata['archive_key']
            archive_path = metadata['archive_full_path']

            size = metadata.get('size')
            if not size:
                # in case we don't have size re-calc it...
                size = self._get_size(shard, archive_path)

            data.append([
                cnt,
                key_file,
                key_file_path,
                archive_key,
                archive_path,
                metadata.get('store_time', 0),
                metadata.get('access_time', 0),
                metadata.get('access_count', 0),
                size,
            ])

        # Insert bulk data using executemany
        db.bulk_insert(data)

        total_size = db.get_total_size()
        log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
                  len(data), format_size(total_size), format_size(size_limit))

        removed_items = 0
        removed_size = 0
        for _key_file, archive_key, size in db.get_sorted_keys(select_policy):
            # check the limit BEFORE touching the next candidate; subtracting the
            # candidate's size first and then stopping would leave that candidate
            # in place and the cache above the limit
            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            self.remove(archive_key)
            total_size -= size
            removed_items += 1
            removed_size += size

        removal_info['removed_items'] = removed_items
        removal_info['removed_size'] = removed_size
        log.debug('Removed %s cache archives, and reduced size by: %s',
                  removed_items, format_size(removed_size))
        return removal_info

    def get_statistics(self):
        """Return ``(total_files, total_size, meta)`` for all cached archives.

        Sizes are read from the key files; when a key file carries no size the
        archive size is re-calculated via ``_get_size``, same as in ``evict``.
        ``meta`` is currently always an empty dict.
        """
        total_files = 0
        total_size = 0
        meta = {}

        for shard, key_file in self.iter_keys():
            json_key = f"{shard.storage_medium}/{key_file}"
            with shard.fs.open(json_key, 'rb') as f:
                total_files += 1
                metadata = json.loads(f.read())

            size = metadata.get('size')
            if size is None:
                # key file without size information, re-calc it
                size = self._get_size(shard, metadata['archive_full_path'])
            total_size += size

        return total_files, total_size, meta