archive-cache: synced with ce
super-admin
r1252:acea161c default
@@ -1,348 +1,352 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
 import os
 import functools
 import logging
 import typing
 import time
 import zlib
 
 from ...ext_json import json
 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
 from ..lock import GenerationLock
 
 log = logging.getLogger(__name__)
 
 
 class BaseShard:
     storage_type: str = ''
     fs = None
 
     @classmethod
     def hash(cls, key):
         """Compute portable hash for `key`.
 
         :param key: key to hash
         :return: hash value
 
         """
         mask = 0xFFFFFFFF
         return zlib.adler32(key.encode('utf-8')) & mask  # noqa
 
     def _write_file(self, full_path, read_iterator, mode):
         raise NotImplementedError
 
     def _get_keyfile(self, key):
         raise NotImplementedError
 
     def random_filename(self):
         raise NotImplementedError
 
     def _store(self, key, value_reader, metadata, mode):
         (filename,  # hash-name
          full_path  # full-path/hash-name
          ) = self.random_filename()
 
         key_file, key_file_path = self._get_keyfile(key)
 
         # STORE METADATA
         _metadata = {
             "version": "v1",
 
             "key_file": key_file,  # this is the .key.json file storing meta
             "key_file_path": key_file_path,  # full path to key_file
             "archive_key": key,  # original name we stored archive under, e.g my-archive.zip
             "archive_filename": filename,  # the actual filename we stored that file under
             "archive_full_path": full_path,
 
             "store_time": time.time(),
             "access_count": 0,
             "access_time": 0,
 
             "size": 0
         }
         if metadata:
             _metadata.update(metadata)
 
         read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
         size, sha256 = self._write_file(full_path, read_iterator, mode)
         _metadata['size'] = size
         _metadata['sha256'] = sha256
 
         # after archive is finished, we create a key to save the presence of the binary file
         with self.fs.open(key_file_path, 'wb') as f:
             f.write(json.dumps(_metadata))
 
         return key, filename, size, _metadata
 
     def _fetch(self, key, retry, retry_attempts, retry_backoff):
         if retry is NOT_GIVEN:
             retry = False
         if retry_attempts is NOT_GIVEN:
             retry_attempts = 0
 
         if retry and retry_attempts > 0:
             for attempt in range(1, retry_attempts + 1):
                 if key in self:
                     break
                 # we didn't find the key, wait retry_backoff N seconds, and re-check
                 time.sleep(retry_backoff)
 
         if key not in self:
             log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
             raise KeyError(key)
 
         key_file, key_file_path = self._get_keyfile(key)
         with self.fs.open(key_file_path, 'rb') as f:
             metadata = json.loads(f.read())
 
         archive_path = metadata['archive_full_path']
 
         try:
             return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
         finally:
             # update usage stats, count and accessed
             metadata["access_count"] = metadata.get("access_count", 0) + 1
             metadata["access_time"] = time.time()
             log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
                       key_file, metadata['access_count'], metadata['access_time'])
             with self.fs.open(key_file_path, 'wb') as f:
                 f.write(json.dumps(metadata))
 
     def _remove(self, key):
         if key not in self:
             log.exception(f'requested key={key} not found in {self}')
             raise KeyError(key)
 
         key_file, key_file_path = self._get_keyfile(key)
         with self.fs.open(key_file_path, 'rb') as f:
             metadata = json.loads(f.read())
 
         archive_path = metadata['archive_full_path']
         self.fs.rm(archive_path)
         self.fs.rm(key_file_path)
         return 1
 
     @property
     def storage_medium(self):
         return getattr(self, self.storage_type)
 
     @property
     def key_suffix(self):
         return 'key.json'
 
     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.
 
         :param key: key matching item
         :return: True if key matching item
 
         """
         key_file, key_file_path = self._get_keyfile(key)
         return self.fs.exists(key_file_path)
 
 
 class BaseCache:
     _locking_url: str = ''
     _storage_path: str = ''
     _config = {}
     retry = False
     retry_attempts = 0
     retry_backoff = 1
     _shards = tuple()
 
     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.
 
         :param key: key matching item
         :return: True if key matching item
 
         """
         return self.has_key(key)
 
     def __repr__(self):
         return f'<{self.__class__.__name__}(storage={self._storage_path})>'
 
     @classmethod
     def gb_to_bytes(cls, gb):
         return gb * (1024 ** 3)
 
     @property
     def storage_path(self):
         return self._storage_path
 
     @classmethod
     def get_stats_db(cls):
         return StatsDB()
 
     def get_conf(self, key, pop=False):
         if key not in self._config:
             raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
         val = self._config[key]
         if pop:
             del self._config[key]
         return val
 
     def _get_shard(self, key):
         raise NotImplementedError
 
     def _get_size(self, shard, archive_path):
         raise NotImplementedError
 
     def store(self, key, value_reader, metadata=None):
         shard = self._get_shard(key)
         return shard.store(key, value_reader, metadata)
 
     def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
         """
         Return file handle corresponding to `key` from specific shard cache.
         """
         if retry is NOT_GIVEN:
             retry = self.retry
         if retry_attempts is NOT_GIVEN:
             retry_attempts = self.retry_attempts
         retry_backoff = self.retry_backoff
 
         shard = self._get_shard(key)
         return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
 
     def remove(self, key):
         shard = self._get_shard(key)
         return shard.remove(key)
 
     def has_key(self, archive_key):
         """Return `True` if `key` matching item is found in cache.
 
         :param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip
         :return: True if key is found
 
         """
         shard = self._get_shard(archive_key)
         return archive_key in shard
 
     def iter_keys(self):
         for shard in self._shards:
             if shard.fs.exists(shard.storage_medium):
                 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                     for key_file_path in _files:
                         if key_file_path.endswith(shard.key_suffix):
                             yield shard, key_file_path
 
     def get_lock(self, lock_key):
         return GenerationLock(lock_key, self._locking_url)
 
-    def evict(self, policy=None, size_limit=None) -> int:
+    def evict(self, policy=None, size_limit=None) -> dict:
         """
         Remove old items based on the conditions
 
 
         explanation of this algo:
         iterate over each shard, then for each shard iterate over the .key files
         read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
         access data, time creation, and access counts.
 
-        Store that into a memory DB so we can run different sorting strategies easily.
+        Store that into a memory DB in order we can run different sorting strategies easily.
         Summing the size is a sum sql query.
 
         Then we run a sorting strategy based on eviction policy.
         We iterate over sorted keys, and remove each checking if we hit the overall limit.
         """
-
+        removal_info = {
+            "removed_items": 0,
+            "removed_size": 0
+        }
         policy = policy or self._eviction_policy
         size_limit = size_limit or self._cache_size_limit
 
         select_policy = EVICTION_POLICY[policy]['evict']
 
         log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                   policy, format_size(size_limit))
 
         if select_policy is None:
-            return 0
+            return removal_info
 
         db = self.get_stats_db()
 
         data = []
         cnt = 1
 
         for shard, key_file in self.iter_keys():
             with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
                 metadata = json.loads(f.read())
 
             key_file_path = os.path.join(shard.storage_medium, key_file)
 
             archive_key = metadata['archive_key']
             archive_path = metadata['archive_full_path']
 
             size = metadata.get('size')
             if not size:
                 # in case we don't have size re-calc it...
                 size = self._get_size(shard, archive_path)
 
             data.append([
                 cnt,
                 key_file,
                 key_file_path,
                 archive_key,
                 archive_path,
                 metadata.get('store_time', 0),
                 metadata.get('access_time', 0),
                 metadata.get('access_count', 0),
                 size,
             ])
             cnt += 1
 
         # Insert bulk data using executemany
         db.bulk_insert(data)
 
         total_size = db.get_total_size()
         log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
                   len(data), format_size(total_size), format_size(size_limit))
 
         removed_items = 0
         removed_size = 0
         for key_file, archive_key, size in db.get_sorted_keys(select_policy):
             # simulate removal impact BEFORE removal
             total_size -= size
 
             if total_size <= size_limit:
                 # we obtained what we wanted...
                 break
 
             self.remove(archive_key)
             removed_items += 1
             removed_size += size
-
+        removal_info['removed_items'] = removed_items
+        removal_info['removed_size'] = removed_size
         log.debug('Removed %s cache archives, and reduced size by: %s',
                   removed_items, format_size(removed_size))
-        return removed_items
+        return removal_info
 
     def get_statistics(self):
         total_files = 0
         total_size = 0
         meta = {}
 
         for shard, key_file in self.iter_keys():
             json_key = f"{shard.storage_medium}/{key_file}"
             with shard.fs.open(json_key, 'rb') as f:
                 total_files += 1
                 metadata = json.loads(f.read())
                 total_size += metadata['size']
 
         return total_files, total_size, meta
 