base.py
348 lines
| 11.2 KiB
| text/x-python
|
PythonLexer
r1251 | # Copyright (C) 2015-2024 RhodeCode GmbH | |||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import functools | ||||
import logging | ||||
import typing | ||||
import time | ||||
import zlib | ||||
from ...ext_json import json | ||||
from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size | ||||
from ..lock import GenerationLock | ||||
log = logging.getLogger(__name__) | ||||
class BaseShard: | ||||
storage_type: str = '' | ||||
fs = None | ||||
@classmethod | ||||
def hash(cls, key): | ||||
"""Compute portable hash for `key`. | ||||
:param key: key to hash | ||||
:return: hash value | ||||
""" | ||||
mask = 0xFFFFFFFF | ||||
return zlib.adler32(key.encode('utf-8')) & mask # noqa | ||||
def _write_file(self, full_path, read_iterator, mode): | ||||
raise NotImplementedError | ||||
def _get_keyfile(self, key): | ||||
raise NotImplementedError | ||||
def random_filename(self): | ||||
raise NotImplementedError | ||||
def _store(self, key, value_reader, metadata, mode): | ||||
(filename, # hash-name | ||||
full_path # full-path/hash-name | ||||
) = self.random_filename() | ||||
key_file, key_file_path = self._get_keyfile(key) | ||||
# STORE METADATA | ||||
_metadata = { | ||||
"version": "v1", | ||||
"key_file": key_file, # this is the .key.json file storing meta | ||||
"key_file_path": key_file_path, # full path to key_file | ||||
"archive_key": key, # original name we stored archive under, e.g my-archive.zip | ||||
"archive_filename": filename, # the actual filename we stored that file under | ||||
"archive_full_path": full_path, | ||||
"store_time": time.time(), | ||||
"access_count": 0, | ||||
"access_time": 0, | ||||
"size": 0 | ||||
} | ||||
if metadata: | ||||
_metadata.update(metadata) | ||||
read_iterator = iter(functools.partial(value_reader.read, 2**22), b'') | ||||
size, sha256 = self._write_file(full_path, read_iterator, mode) | ||||
_metadata['size'] = size | ||||
_metadata['sha256'] = sha256 | ||||
# after archive is finished, we create a key to save the presence of the binary file | ||||
with self.fs.open(key_file_path, 'wb') as f: | ||||
f.write(json.dumps(_metadata)) | ||||
return key, filename, size, _metadata | ||||
def _fetch(self, key, retry, retry_attempts, retry_backoff): | ||||
if retry is NOT_GIVEN: | ||||
retry = False | ||||
if retry_attempts is NOT_GIVEN: | ||||
retry_attempts = 0 | ||||
if retry and retry_attempts > 0: | ||||
for attempt in range(1, retry_attempts + 1): | ||||
if key in self: | ||||
break | ||||
# we didn't find the key, wait retry_backoff N seconds, and re-check | ||||
time.sleep(retry_backoff) | ||||
if key not in self: | ||||
log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}') | ||||
raise KeyError(key) | ||||
key_file, key_file_path = self._get_keyfile(key) | ||||
with self.fs.open(key_file_path, 'rb') as f: | ||||
metadata = json.loads(f.read()) | ||||
archive_path = metadata['archive_full_path'] | ||||
try: | ||||
return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata | ||||
finally: | ||||
# update usage stats, count and accessed | ||||
metadata["access_count"] = metadata.get("access_count", 0) + 1 | ||||
metadata["access_time"] = time.time() | ||||
log.debug('Updated %s with access snapshot, access_count=%s access_time=%s', | ||||
key_file, metadata['access_count'], metadata['access_time']) | ||||
with self.fs.open(key_file_path, 'wb') as f: | ||||
f.write(json.dumps(metadata)) | ||||
def _remove(self, key): | ||||
if key not in self: | ||||
log.exception(f'requested key={key} not found in {self}') | ||||
raise KeyError(key) | ||||
key_file, key_file_path = self._get_keyfile(key) | ||||
with self.fs.open(key_file_path, 'rb') as f: | ||||
metadata = json.loads(f.read()) | ||||
archive_path = metadata['archive_full_path'] | ||||
self.fs.rm(archive_path) | ||||
self.fs.rm(key_file_path) | ||||
return 1 | ||||
@property | ||||
def storage_medium(self): | ||||
return getattr(self, self.storage_type) | ||||
@property | ||||
def key_suffix(self): | ||||
return 'key.json' | ||||
def __contains__(self, key): | ||||
"""Return `True` if `key` matching item is found in cache. | ||||
:param key: key matching item | ||||
:return: True if key matching item | ||||
""" | ||||
key_file, key_file_path = self._get_keyfile(key) | ||||
return self.fs.exists(key_file_path) | ||||
class BaseCache: | ||||
_locking_url: str = '' | ||||
_storage_path: str = '' | ||||
_config = {} | ||||
retry = False | ||||
retry_attempts = 0 | ||||
retry_backoff = 1 | ||||
_shards = tuple() | ||||
def __contains__(self, key): | ||||
"""Return `True` if `key` matching item is found in cache. | ||||
:param key: key matching item | ||||
:return: True if key matching item | ||||
""" | ||||
return self.has_key(key) | ||||
def __repr__(self): | ||||
return f'<{self.__class__.__name__}(storage={self._storage_path})>' | ||||
@classmethod | ||||
def gb_to_bytes(cls, gb): | ||||
return gb * (1024 ** 3) | ||||
@property | ||||
def storage_path(self): | ||||
return self._storage_path | ||||
@classmethod | ||||
def get_stats_db(cls): | ||||
return StatsDB() | ||||
def get_conf(self, key, pop=False): | ||||
if key not in self._config: | ||||
raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config") | ||||
val = self._config[key] | ||||
if pop: | ||||
del self._config[key] | ||||
return val | ||||
def _get_shard(self, key): | ||||
raise NotImplementedError | ||||
def _get_size(self, shard, archive_path): | ||||
raise NotImplementedError | ||||
def store(self, key, value_reader, metadata=None): | ||||
shard = self._get_shard(key) | ||||
return shard.store(key, value_reader, metadata) | ||||
def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]: | ||||
""" | ||||
Return file handle corresponding to `key` from specific shard cache. | ||||
""" | ||||
if retry is NOT_GIVEN: | ||||
retry = self.retry | ||||
if retry_attempts is NOT_GIVEN: | ||||
retry_attempts = self.retry_attempts | ||||
retry_backoff = self.retry_backoff | ||||
shard = self._get_shard(key) | ||||
return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff) | ||||
def remove(self, key): | ||||
shard = self._get_shard(key) | ||||
return shard.remove(key) | ||||
def has_key(self, archive_key): | ||||
"""Return `True` if `key` matching item is found in cache. | ||||
:param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip | ||||
:return: True if key is found | ||||
""" | ||||
shard = self._get_shard(archive_key) | ||||
return archive_key in shard | ||||
def iter_keys(self): | ||||
for shard in self._shards: | ||||
if shard.fs.exists(shard.storage_medium): | ||||
for path, _dirs, _files in shard.fs.walk(shard.storage_medium): | ||||
for key_file_path in _files: | ||||
if key_file_path.endswith(shard.key_suffix): | ||||
yield shard, key_file_path | ||||
def get_lock(self, lock_key): | ||||
return GenerationLock(lock_key, self._locking_url) | ||||
def evict(self, policy=None, size_limit=None) -> int: | ||||
""" | ||||
Remove old items based on the conditions | ||||
explanation of this algo: | ||||
iterate over each shard, then for each shard iterate over the .key files | ||||
read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and | ||||
access data, time creation, and access counts. | ||||
Store that into a memory DB so we can run different sorting strategies easily. | ||||
Summing the size is a sum sql query. | ||||
Then we run a sorting strategy based on eviction policy. | ||||
We iterate over sorted keys, and remove each checking if we hit the overall limit. | ||||
""" | ||||
policy = policy or self._eviction_policy | ||||
size_limit = size_limit or self._cache_size_limit | ||||
select_policy = EVICTION_POLICY[policy]['evict'] | ||||
log.debug('Running eviction policy \'%s\', and checking for size limit: %s', | ||||
policy, format_size(size_limit)) | ||||
if select_policy is None: | ||||
return 0 | ||||
db = self.get_stats_db() | ||||
data = [] | ||||
cnt = 1 | ||||
for shard, key_file in self.iter_keys(): | ||||
with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f: | ||||
metadata = json.loads(f.read()) | ||||
key_file_path = os.path.join(shard.storage_medium, key_file) | ||||
archive_key = metadata['archive_key'] | ||||
archive_path = metadata['archive_full_path'] | ||||
size = metadata.get('size') | ||||
if not size: | ||||
# in case we don't have size re-calc it... | ||||
size = self._get_size(shard, archive_path) | ||||
data.append([ | ||||
cnt, | ||||
key_file, | ||||
key_file_path, | ||||
archive_key, | ||||
archive_path, | ||||
metadata.get('store_time', 0), | ||||
metadata.get('access_time', 0), | ||||
metadata.get('access_count', 0), | ||||
size, | ||||
]) | ||||
cnt += 1 | ||||
# Insert bulk data using executemany | ||||
db.bulk_insert(data) | ||||
total_size = db.get_total_size() | ||||
log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s', | ||||
len(data), format_size(total_size), format_size(size_limit)) | ||||
removed_items = 0 | ||||
removed_size = 0 | ||||
for key_file, archive_key, size in db.get_sorted_keys(select_policy): | ||||
# simulate removal impact BEFORE removal | ||||
total_size -= size | ||||
if total_size <= size_limit: | ||||
# we obtained what we wanted... | ||||
break | ||||
self.remove(archive_key) | ||||
removed_items += 1 | ||||
removed_size += size | ||||
log.debug('Removed %s cache archives, and reduced size by: %s', | ||||
removed_items, format_size(removed_size)) | ||||
return removed_items | ||||
def get_statistics(self): | ||||
total_files = 0 | ||||
total_size = 0 | ||||
meta = {} | ||||
for shard, key_file in self.iter_keys(): | ||||
json_key = f"{shard.storage_medium}/{key_file}" | ||||
with shard.fs.open(json_key, 'rb') as f: | ||||
total_files += 1 | ||||
metadata = json.loads(f.read()) | ||||
total_size += metadata['size'] | ||||
return total_files, total_size, meta | ||||