##// END OF EJS Templates
feat(archive-cache): re-calculate size better if we miss it in stats
super-admin -
r5425:9c658c9d default
parent child Browse files
Show More
@@ -1,423 +1,427 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import contextlib
21 21 import functools
22 22 import os
23 23 import logging
24 24 import time
25 25 import typing
26 26 import zlib
27 27 import sqlite3
28 28
29 29 from ...ext_json import json
30 30 from .lock import GenerationLock
31 31 from .utils import format_size
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35 cache_meta = None
36 36
37 37 UNKNOWN = -241
38 38 NO_VAL = -917
39 39
40 40 MODE_BINARY = 'BINARY'
41 41
42 42
43 43 EVICTION_POLICY = {
44 44 'none': {
45 45 'evict': None,
46 46 },
47 47 'least-recently-stored': {
48 48 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
49 49 },
50 50 'least-recently-used': {
51 51 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
52 52 },
53 53 'least-frequently-used': {
54 54 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
55 55 },
56 56 }
57 57
58 58
59 59 class DB:
60 60
61 61 def __init__(self):
62 62 self.connection = sqlite3.connect(':memory:')
63 63 self._init_db()
64 64
65 65 def _init_db(self):
66 66 qry = '''
67 67 CREATE TABLE IF NOT EXISTS archive_cache (
68 68 rowid INTEGER PRIMARY KEY,
69 69 key_file TEXT,
70 70 key_file_path TEXT,
71 71 filename TEXT,
72 72 full_path TEXT,
73 73 store_time REAL,
74 74 access_time REAL,
75 75 access_count INTEGER DEFAULT 0,
76 76 size INTEGER DEFAULT 0
77 77 )
78 78 '''
79 79
80 80 self.sql(qry)
81 81 self.connection.commit()
82 82
83 83 @property
84 84 def sql(self):
85 85 return self.connection.execute
86 86
87 87 def bulk_insert(self, rows):
88 88 qry = '''
89 89 INSERT INTO archive_cache (
90 90 rowid,
91 91 key_file,
92 92 key_file_path,
93 93 filename,
94 94 full_path,
95 95 store_time,
96 96 access_time,
97 97 access_count,
98 98 size
99 99 )
100 100 VALUES (
101 101 ?, ?, ?, ?, ?, ?, ?, ?, ?
102 102 )
103 103 '''
104 104 cursor = self.connection.cursor()
105 105 cursor.executemany(qry, rows)
106 106 self.connection.commit()
107 107
108 108
109 109 class FileSystemCache:
110 110
111 111 def __init__(self, index, directory, **settings):
112 112 self._index = index
113 113 self._directory = directory
114 114
115 115 def _write_file(self, full_path, iterator, mode, encoding=None):
116 116 full_dir, _ = os.path.split(full_path)
117 117
118 118 for count in range(1, 11):
119 119 with contextlib.suppress(OSError):
120 120 os.makedirs(full_dir)
121 121
122 122 try:
123 123 # Another cache may have deleted the directory before
124 124 # the file could be opened.
125 125 writer = open(full_path, mode, encoding=encoding)
126 126 except OSError:
127 127 if count == 10:
128 128 # Give up after 10 tries to open the file.
129 129 raise
130 130 continue
131 131
132 132 with writer:
133 133 size = 0
134 134 for chunk in iterator:
135 135 size += len(chunk)
136 136 writer.write(chunk)
137 137 return size
138 138
139 139 def _get_keyfile(self, key):
140 140 return os.path.join(self._directory, f'{key}.key')
141 141
142 142 def store(self, key, value_reader, metadata):
143 143 filename, full_path = self.random_filename()
144 144 key_file = self._get_keyfile(key)
145 145
146 146 # STORE METADATA
147 147 _metadata = {
148 148 "version": "v1",
149 149 "filename": filename,
150 150 "full_path": full_path,
151 151 "key_file": key_file,
152 152 "store_time": time.time(),
153 153 "access_count": 1,
154 154 "access_time": 0,
155 155 "size": 0
156 156 }
157 157 if metadata:
158 158 _metadata.update(metadata)
159 159
160 160 reader = functools.partial(value_reader.read, 2**22)
161 161
162 162 iterator = iter(reader, b'')
163 163 size = self._write_file(full_path, iterator, 'xb')
164 164 metadata['size'] = size
165 165
166 166 # after archive is finished, we create a key to save the presence of the binary file
167 167 with open(key_file, 'wb') as f:
168 168 f.write(json.dumps(_metadata))
169 169
170 170 return key, size, MODE_BINARY, filename, _metadata
171 171
172 172 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
173 173 if key not in self:
174 174 raise KeyError(key)
175 175
176 176 key_file = self._get_keyfile(key)
177 177 with open(key_file, 'rb') as f:
178 178 metadata = json.loads(f.read())
179 179
180 180 filename = metadata['filename']
181 181
182 182 try:
183 183 return open(os.path.join(self._directory, filename), 'rb'), metadata
184 184 finally:
185 185 # update usage stats, count and accessed
186 186 metadata["access_count"] = metadata.get("access_count", 0) + 1
187 187 metadata["access_time"] = time.time()
188 188
189 189 with open(key_file, 'wb') as f:
190 190 f.write(json.dumps(metadata))
191 191
192 192 def random_filename(self):
193 193 """Return filename and full-path tuple for file storage.
194 194
195 195 Filename will be a randomly generated 28 character hexadecimal string
196 196 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
197 197 reduce the size of directories. On older filesystems, lookups in
198 198 directories with many files may be slow.
199 199 """
200 200
201 201 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
202 202 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
203 203 name = hex_name[4:] + '.archive_cache'
204 204 filename = os.path.join(sub_dir, name)
205 205 full_path = os.path.join(self._directory, filename)
206 206 return filename, full_path
207 207
208 208 def hash(self, key):
209 209 """Compute portable hash for `key`.
210 210
211 211 :param key: key to hash
212 212 :return: hash value
213 213
214 214 """
215 215 mask = 0xFFFFFFFF
216 216 return zlib.adler32(key.encode('utf-8')) & mask # noqa
217 217
218 218 def __contains__(self, key):
219 219 """Return `True` if `key` matching item is found in cache.
220 220
221 221 :param key: key matching item
222 222 :return: True if key matching item
223 223
224 224 """
225 225 key_file = self._get_keyfile(key)
226 226 return os.path.exists(key_file)
227 227
228 228
229 229 class FanoutCache:
230 230 """Cache that shards keys and values."""
231 231
232 232 def __init__(
233 233 self, directory=None, **settings
234 234 ):
235 235 """Initialize cache instance.
236 236
237 237 :param str directory: cache directory
238 238 :param settings: settings dict
239 239
240 240 """
241 241 if directory is None:
242 242 raise ValueError('directory cannot be None')
243 243
244 244 directory = str(directory)
245 245 directory = os.path.expanduser(directory)
246 246 directory = os.path.expandvars(directory)
247 247 self._directory = directory
248 248
249 249 self._count = settings.pop('cache_shards')
250 250 self._locking_url = settings.pop('locking_url')
251 251
252 252 self._eviction_policy = settings['cache_eviction_policy']
253 253 self._cache_size_limit = settings['cache_size_limit']
254 254
255 255 self._shards = tuple(
256 256 FileSystemCache(
257 257 index=num,
258 258 directory=os.path.join(directory, 'shard_%03d' % num),
259 259 **settings,
260 260 )
261 261 for num in range(self._count)
262 262 )
263 263 self._hash = self._shards[0].hash
264 264
265 265 def get_lock(self, lock_key):
266 266 return GenerationLock(lock_key, self._locking_url)
267 267
268 268 def _get_shard(self, key) -> FileSystemCache:
269 269 index = self._hash(key) % self._count
270 270 shard = self._shards[index]
271 271 return shard
272 272
273 273 def store(self, key, value_reader, metadata=None):
274 274 shard = self._get_shard(key)
275 275 return shard.store(key, value_reader, metadata)
276 276
277 277 def fetch(self, key):
278 278 """Return file handle corresponding to `key` from cache.
279 279 """
280 280 shard = self._get_shard(key)
281 281 return shard.fetch(key)
282 282
283 283 def has_key(self, key):
284 284 """Return `True` if `key` matching item is found in cache.
285 285
286 286 :param key: key for item
287 287 :return: True if key is found
288 288
289 289 """
290 290 shard = self._get_shard(key)
291 291 return key in shard
292 292
293 293 def __contains__(self, item):
294 294 return self.has_key(item)
295 295
296 296 def evict(self, policy=None, size_limit=None):
297 297 """
298 298 Remove old items based on the conditions
299 299
300 300
301 301 explanation of this algo:
302 302 iterate over each shard, then for each shard iterate over the .key files
303 303 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
304 304 access data, time creation, and access counts.
305 305
306 306 Store that into a memory DB so we can run different sorting strategies easily.
307 307 Summing the size is a sum sql query.
308 308
309 309 Then we run a sorting strategy based on eviction policy.
310 310 We iterate over sorted keys, and remove each checking if we hit the overall limit.
311 311 """
312 312
313 313 policy = policy or self._eviction_policy
314 314 size_limit = size_limit or self._cache_size_limit
315 315
316 316 select_policy = EVICTION_POLICY[policy]['evict']
317 317
318 318 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
319 319 policy, format_size(size_limit))
320 320
321 321 if select_policy is None:
322 322 return 0
323 323
324 324 db = DB()
325 325
326 326 data = []
327 327 cnt = 1
328 328 for shard in self._shards:
329 329 for key_file in os.listdir(shard._directory):
330 330 if key_file.endswith('.key'):
331 331 key_file_path = os.path.join(shard._directory, key_file)
332 332 with open(key_file_path, 'rb') as f:
333 333 metadata = json.loads(f.read())
334
335 size = metadata.get('size')
336 filename = metadata.get('filename')
337 full_path = metadata.get('full_path')
338
339 if not size:
334 340 # in case we don't have size re-calc it...
335 if not metadata.get('size'):
336 fn = metadata.get('full_path')
337 size = os.stat(fn).st_size
341 size = os.stat(full_path).st_size
338 342
339 343 data.append([
340 344 cnt,
341 345 key_file,
342 346 key_file_path,
343 metadata.get('filename'),
344 metadata.get('full_path'),
347 filename,
348 full_path,
345 349 metadata.get('store_time', 0),
346 350 metadata.get('access_time', 0),
347 351 metadata.get('access_count', 0),
348 metadata.get('size', size),
352 size,
349 353 ])
350 354 cnt += 1
351 355
352 356 # Insert bulk data using executemany
353 357 db.bulk_insert(data)
354 358
355 359 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
356 360 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
357 361 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
358 362 sorted_keys = db.sql(select_policy_qry).fetchall()
359 363
360 364 removed_items = 0
361 365 removed_size = 0
362 366 for key, cached_file, size in sorted_keys:
363 367 # simulate removal impact BEFORE removal
364 368 total_size -= size
365 369
366 370 if total_size <= size_limit:
367 371 # we obtained what we wanted...
368 372 break
369 373
370 374 os.remove(cached_file)
371 375 os.remove(key)
372 376 removed_items += 1
373 377 removed_size += size
374 378
375 379 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
376 380 return removed_items
377 381
378 382
379 383 def get_archival_config(config):
380 384
381 385 final_config = {
382 386
383 387 }
384 388
385 389 for k, v in config.items():
386 390 if k.startswith('archive_cache'):
387 391 final_config[k] = v
388 392
389 393 return final_config
390 394
391 395
392 396 def get_archival_cache_store(config):
393 397
394 398 global cache_meta
395 399 if cache_meta is not None:
396 400 return cache_meta
397 401
398 402 config = get_archival_config(config)
399 403 backend = config['archive_cache.backend.type']
400 404 if backend != 'filesystem':
401 405 raise ValueError('archive_cache.backend.type only supports "filesystem"')
402 406
403 407 archive_cache_locking_url = config['archive_cache.locking.url']
404 408 archive_cache_dir = config['archive_cache.filesystem.store_dir']
405 409 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
406 410 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
407 411 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
408 412
409 413 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
410 414
411 415 # check if it's ok to write, and re-create the archive cache
412 416 if not os.path.isdir(archive_cache_dir):
413 417 os.makedirs(archive_cache_dir, exist_ok=True)
414 418
415 419 d_cache = FanoutCache(
416 420 archive_cache_dir,
417 421 locking_url=archive_cache_locking_url,
418 422 cache_shards=archive_cache_shards,
419 423 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
420 424 cache_eviction_policy=archive_cache_eviction_policy
421 425 )
422 426 cache_meta = d_cache
423 427 return cache_meta
General Comments 0
You need to be logged in to leave comments. Login now