archive-cache: synced with ce
super-admin
r1252:acea161c default
@@ -1,348 +1,352 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import os
20 20 import functools
21 21 import logging
22 22 import typing
23 23 import time
24 24 import zlib
25 25
26 26 from ...ext_json import json
27 27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 28 from ..lock import GenerationLock
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class BaseShard:
34 34 storage_type: str = ''
35 35 fs = None
36 36
37 37 @classmethod
38 38 def hash(cls, key):
39 39 """Compute portable hash for `key`.
40 40
41 41 :param key: key to hash
42 42 :return: hash value
43 43
44 44 """
45 45 mask = 0xFFFFFFFF
46 46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47 47
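# Illustrative note: hash() uses zlib.adler32 instead of Python's built-in
# hash() so the value is stable across processes and platforms (built-in str
# hashing is randomized per process). A concrete cache could, for example,
# map a key to a shard with something like:
#
#   shard_index = BaseShard.hash(archive_key) % num_shards   # num_shards is hypothetical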
48 48 def _write_file(self, full_path, read_iterator, mode):
49 49 raise NotImplementedError
50 50
51 51 def _get_keyfile(self, key):
52 52 raise NotImplementedError
53 53
54 54 def random_filename(self):
55 55 raise NotImplementedError
56 56
57 57 def _store(self, key, value_reader, metadata, mode):
58 58 (filename, # hash-name
59 59 full_path # full-path/hash-name
60 60 ) = self.random_filename()
61 61
62 62 key_file, key_file_path = self._get_keyfile(key)
63 63
64 64 # STORE METADATA
65 65 _metadata = {
66 66 "version": "v1",
67 67
68 68 "key_file": key_file, # this is the .key.json file storing meta
69 69 "key_file_path": key_file_path, # full path to key_file
70 70 "archive_key": key, # original name we stored archive under, e.g my-archive.zip
71 71 "archive_filename": filename, # the actual filename we stored that file under
72 72 "archive_full_path": full_path,
73 73
74 74 "store_time": time.time(),
75 75 "access_count": 0,
76 76 "access_time": 0,
77 77
78 78 "size": 0
79 79 }
80 80 if metadata:
81 81 _metadata.update(metadata)
82 82
83 83 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')  # read in 4 MB chunks until EOF
84 84 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 85 _metadata['size'] = size
86 86 _metadata['sha256'] = sha256
87 87
88 88 # after the archive is written, we create a key file to record the presence of the binary file
89 89 with self.fs.open(key_file_path, 'wb') as f:
90 90 f.write(json.dumps(_metadata))
91 91
92 92 return key, filename, size, _metadata
93 93
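# For illustration, the .key.json file written by _store() ends up holding
# JSON along these lines (every value below is a made-up example, and the
# /cache/shard_0 layout is only an assumption about a concrete shard):
#
#   {
#     "version": "v1",
#     "key_file": "my-archive.zip.key.json",
#     "key_file_path": "/cache/shard_0/my-archive.zip.key.json",
#     "archive_key": "my-archive.zip",
#     "archive_filename": "f3a9c0d1b2e4.archive_cache",
#     "archive_full_path": "/cache/shard_0/f3a9c0d1b2e4.archive_cache",
#     "store_time": 1700000000.0,
#     "access_count": 2,
#     "access_time": 1700000100.0,
#     "size": 1048576,
#     "sha256": "<hex digest of the stored archive>"
#   }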
94 94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
95 95 if retry is NOT_GIVEN:
96 96 retry = False
97 97 if retry_attempts is NOT_GIVEN:
98 98 retry_attempts = 0
99 99
100 100 if retry and retry_attempts > 0:
101 101 for attempt in range(1, retry_attempts + 1):
102 102 if key in self:
103 103 break
104 104 # we didn't find the key; wait retry_backoff seconds, then re-check
105 105 time.sleep(retry_backoff)
106 106
107 107 if key not in self:
108 108 log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 109 raise KeyError(key)
110 110
111 111 key_file, key_file_path = self._get_keyfile(key)
112 112 with self.fs.open(key_file_path, 'rb') as f:
113 113 metadata = json.loads(f.read())
114 114
115 115 archive_path = metadata['archive_full_path']
116 116
117 117 try:
118 118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 119 finally:
120 120 # update usage stats: access count and last access time
121 121 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 122 metadata["access_time"] = time.time()
123 123 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 124 key_file, metadata['access_count'], metadata['access_time'])
125 125 with self.fs.open(key_file_path, 'wb') as f:
126 126 f.write(json.dumps(metadata))
127 127
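# Illustrative retry behaviour of _fetch(): with retry=True, retry_attempts=5
# and retry_backoff=2, the key presence is re-checked up to 5 times with a
# 2 second sleep after each miss (roughly 10 seconds in total), e.g. to give
# a concurrent writer time to finish storing the archive before KeyError is
# raised.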
128 128 def _remove(self, key):
129 129 if key not in self:
130 130 log.exception(f'requested key={key} not found in {self}')
131 131 raise KeyError(key)
132 132
133 133 key_file, key_file_path = self._get_keyfile(key)
134 134 with self.fs.open(key_file_path, 'rb') as f:
135 135 metadata = json.loads(f.read())
136 136
137 137 archive_path = metadata['archive_full_path']
138 138 self.fs.rm(archive_path)
139 139 self.fs.rm(key_file_path)
140 140 return 1
141 141
142 142 @property
143 143 def storage_medium(self):
144 144 return getattr(self, self.storage_type)
145 145
146 146 @property
147 147 def key_suffix(self):
148 148 return 'key.json'
149 149
150 150 def __contains__(self, key):
151 151 """Return `True` if `key` matching item is found in cache.
152 152
153 153 :param key: key matching item
154 154 :return: True if key matching item
155 155
156 156 """
157 157 key_file, key_file_path = self._get_keyfile(key)
158 158 return self.fs.exists(key_file_path)
159 159
160 160
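# A minimal sketch of a concrete shard on top of a local fsspec-style
# filesystem, to illustrate the hooks BaseShard leaves abstract
# (random_filename, _get_keyfile, _write_file) and the thin public wrappers
# that BaseCache calls (store/fetch/remove). The class name LocalFSShard, the
# `directory` attribute and the filename scheme are hypothetical, not the
# shipped file-system or object-store implementations.

import hashlib
import secrets

import fsspec


class LocalFSShard(BaseShard):
    storage_type = 'directory'   # storage_medium property resolves to self.directory

    def __init__(self, directory):
        self.directory = directory
        self.fs = fsspec.filesystem('file')   # local filesystem backend
        self.fs.makedirs(directory, exist_ok=True)

    def random_filename(self):
        # hash-name and full-path/hash-name, as expected by _store()
        filename = secrets.token_hex(16) + '.archive_cache'
        return filename, os.path.join(self.directory, filename)

    def _get_keyfile(self, key):
        # the .key.json file that records the presence of the stored archive
        key_file = f'{key}.{self.key_suffix}'
        return key_file, os.path.join(self.directory, key_file)

    def _write_file(self, full_path, read_iterator, mode):
        # stream the chunks to disk, tracking total size and sha256 on the fly
        digest = hashlib.sha256()
        size = 0
        with self.fs.open(full_path, mode) as f:
            for chunk in read_iterator:
                f.write(chunk)
                size += len(chunk)
                digest.update(chunk)
        return size, digest.hexdigest()

    # public wrappers used by BaseCache.store/fetch/remove
    def store(self, key, value_reader, metadata=None, mode='wb'):
        return self._store(key, value_reader, metadata, mode)

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1):
        return self._fetch(key, retry, retry_attempts, retry_backoff)

    def remove(self, key):
        return self._remove(key)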
161 161 class BaseCache:
162 162 _locking_url: str = ''
163 163 _storage_path: str = ''
164 164 _config = {}
165 165 retry = False
166 166 retry_attempts = 0
167 167 retry_backoff = 1
168 168 _shards = tuple()
169 169
170 170 def __contains__(self, key):
171 171 """Return `True` if `key` matching item is found in cache.
172 172
173 173 :param key: key matching item
174 174 :return: True if key matching item
175 175
176 176 """
177 177 return self.has_key(key)
178 178
179 179 def __repr__(self):
180 180 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
181 181
182 182 @classmethod
183 183 def gb_to_bytes(cls, gb):
184 184 return gb * (1024 ** 3)
185 185
186 186 @property
187 187 def storage_path(self):
188 188 return self._storage_path
189 189
190 190 @classmethod
191 191 def get_stats_db(cls):
192 192 return StatsDB()
193 193
194 194 def get_conf(self, key, pop=False):
195 195 if key not in self._config:
196 196 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
197 197 val = self._config[key]
198 198 if pop:
199 199 del self._config[key]
200 200 return val
201 201
202 202 def _get_shard(self, key):
203 203 raise NotImplementedError
204 204
205 205 def _get_size(self, shard, archive_path):
206 206 raise NotImplementedError
207 207
208 208 def store(self, key, value_reader, metadata=None):
209 209 shard = self._get_shard(key)
210 210 return shard.store(key, value_reader, metadata)
211 211
212 212 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
213 213 """
214 214 Return a file handle corresponding to `key` from the appropriate shard cache.
215 215 """
216 216 if retry is NOT_GIVEN:
217 217 retry = self.retry
218 218 if retry_attempts is NOT_GIVEN:
219 219 retry_attempts = self.retry_attempts
220 220 retry_backoff = self.retry_backoff
221 221
222 222 shard = self._get_shard(key)
223 223 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
224 224
225 225 def remove(self, key):
226 226 shard = self._get_shard(key)
227 227 return shard.remove(key)
228 228
229 229 def has_key(self, archive_key):
230 230 """Return `True` if `key` matching item is found in cache.
231 231
232 232 :param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip
233 233 :return: True if key is found
234 234
235 235 """
236 236 shard = self._get_shard(archive_key)
237 237 return archive_key in shard
238 238
239 239 def iter_keys(self):
240 240 for shard in self._shards:
241 241 if shard.fs.exists(shard.storage_medium):
242 242 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
243 243 for key_file_path in _files:
244 244 if key_file_path.endswith(shard.key_suffix):
245 245 yield shard, key_file_path
246 246
247 247 def get_lock(self, lock_key):
248 248 return GenerationLock(lock_key, self._locking_url)
249 249
250 def evict(self, policy=None, size_limit=None) -> int:
250 def evict(self, policy=None, size_limit=None) -> dict:
251 251 """
252 252 Remove old items based on the conditions
253 253
254 254
255 255 explanation of this algorithm:
256 256 iterate over each shard, then for each shard iterate over its .key files and
257 257 read the metadata stored there. This gives us a full list of keys and cached archives,
258 258 together with their size, creation time, and access counts.
259 259
260 Store that into a memory DB so we can run different sorting strategies easily.
260 Store that into an in-memory DB so that we can run different sorting strategies easily.
261 261 Summing the sizes is a single SQL SUM query.
262 262
263 263 Then we run a sorting strategy based on the eviction policy.
264 264 We iterate over the sorted keys and remove them one by one, checking after each removal whether we are back under the overall size limit.
265 265 """
266
266 removal_info = {
267 "removed_items": 0,
268 "removed_size": 0
269 }
267 270 policy = policy or self._eviction_policy
268 271 size_limit = size_limit or self._cache_size_limit
269 272
270 273 select_policy = EVICTION_POLICY[policy]['evict']
271 274
272 275 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
273 276 policy, format_size(size_limit))
274 277
275 278 if select_policy is None:
276 return 0
279 return removal_info
277 280
278 281 db = self.get_stats_db()
279 282
280 283 data = []
281 284 cnt = 1
282 285
283 286 for shard, key_file in self.iter_keys():
284 287 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
285 288 metadata = json.loads(f.read())
286 289
287 290 key_file_path = os.path.join(shard.storage_medium, key_file)
288 291
289 292 archive_key = metadata['archive_key']
290 293 archive_path = metadata['archive_full_path']
291 294
292 295 size = metadata.get('size')
293 296 if not size:
294 297 # in case we don't have the size stored, re-calculate it
295 298 size = self._get_size(shard, archive_path)
296 299
297 300 data.append([
298 301 cnt,
299 302 key_file,
300 303 key_file_path,
301 304 archive_key,
302 305 archive_path,
303 306 metadata.get('store_time', 0),
304 307 metadata.get('access_time', 0),
305 308 metadata.get('access_count', 0),
306 309 size,
307 310 ])
308 311 cnt += 1
309 312
310 313 # Insert bulk data using executemany
311 314 db.bulk_insert(data)
312 315
313 316 total_size = db.get_total_size()
314 317 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
315 318 len(data), format_size(total_size), format_size(size_limit))
316 319
317 320 removed_items = 0
318 321 removed_size = 0
319 322 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
320 323 # simulate removal impact BEFORE removal
321 324 total_size -= size
322 325
323 326 if total_size <= size_limit:
324 327 # we are back under the size limit, stop evicting
325 328 break
326 329
327 330 self.remove(archive_key)
328 331 removed_items += 1
329 332 removed_size += size
330
333 removal_info['removed_items'] = removed_items
334 removal_info['removed_size'] = removed_size
331 335 log.debug('Removed %s cache archives, and reduced size by: %s',
332 336 removed_items, format_size(removed_size))
333 return removed_items
337 return removal_info
334 338
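# Illustrative eviction call; the lock key name is an example and the
# GenerationLock context-manager usage is an assumption. The concrete cache
# is expected to define _eviction_policy and _cache_size_limit, which evict()
# falls back to when no arguments are given:
#
#   with cache.get_lock('archive-cache-eviction-lock'):
#       removal_info = cache.evict()
#       # -> {"removed_items": <number of archives removed>,
#       #     "removed_size": <bytes freed>}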
335 339 def get_statistics(self):
336 340 total_files = 0
337 341 total_size = 0
338 342 meta = {}
339 343
340 344 for shard, key_file in self.iter_keys():
341 345 json_key = f"{shard.storage_medium}/{key_file}"
342 346 with shard.fs.open(json_key, 'rb') as f:
343 347 total_files += 1
344 348 metadata = json.loads(f.read())
345 349 total_size += metadata['size']
346 350
347 351 return total_files, total_size, meta
348 352
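# Usage sketch (illustrative only): driving a concrete cache built on these
# base classes. `FileSystemArchiveCache` and its constructor arguments are
# hypothetical stand-ins for whichever BaseCache subclass is configured.

def example_usage():
    cache = FileSystemArchiveCache(       # hypothetical concrete BaseCache subclass
        storage_path='/var/cache/rc-archives',
        locking_url='redis://localhost:6379/0',
    )

    archive_key = 'my-archive.zip'

    # store an archive from an open binary file object
    with open('/tmp/my-archive.zip', 'rb') as value_reader:
        cache.store(archive_key, value_reader, metadata={'repo_name': 'my-repo'})

    # presence check goes through the shard's .key.json file
    assert archive_key in cache

    # fetch returns a binary reader plus the stored metadata, and bumps
    # access_count/access_time on the key file as a side effect
    reader, metadata = cache.fetch(archive_key, retry=True, retry_attempts=5)
    data = reader.read()

    total_files, total_size, _meta = cache.get_statistics()
    log.debug('cache holds %s files occupying %s', total_files, format_size(total_size))
    return data, metadata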