archive-cache: synced with ce
super-admin | r1247:35680f51 | default
@@ -1,448 +1,455 @@
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import codecs
19 19 import contextlib
20 20 import functools
21 21 import os
22 22 import logging
23 23 import time
24 24 import typing
25 25 import zlib
26 26 import sqlite3
27 27
28 28 from ...ext_json import json
29 29 from .lock import GenerationLock
30 30 from .utils import format_size
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34 cache_meta = None
35 35
36 36 UNKNOWN = -241
37 37 NO_VAL = -917
38 38
39 39 MODE_BINARY = 'BINARY'
40 40
41 41
42 42 EVICTION_POLICY = {
43 43 'none': {
44 44 'evict': None,
45 45 },
46 46 'least-recently-stored': {
47 47 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
48 48 },
49 49 'least-recently-used': {
50 50 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
51 51 },
52 52 'least-frequently-used': {
53 53 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
54 54 },
55 55 }
56 56
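# Illustrative sketch: how an eviction policy entry is turned into a concrete query.
# The ``fields`` placeholder is filled in by FanoutCache.evict() further down with the
# columns it needs; the helper name here is only an example.
def _example_policy_query(policy='least-recently-stored'):
    template = EVICTION_POLICY[policy]['evict']
    # e.g. 'SELECT key_file_path, full_path, size FROM archive_cache ORDER BY store_time'
    return template.format(fields='key_file_path, full_path, size')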
57 57
58 58 class DB:
59 59
60 60 def __init__(self):
61 61 self.connection = sqlite3.connect(':memory:')
62 62 self._init_db()
63 63
64 64 def _init_db(self):
65 65 qry = '''
66 66 CREATE TABLE IF NOT EXISTS archive_cache (
67 67 rowid INTEGER PRIMARY KEY,
68 68 key_file TEXT,
69 69 key_file_path TEXT,
70 70 filename TEXT,
71 71 full_path TEXT,
72 72 store_time REAL,
73 73 access_time REAL,
74 74 access_count INTEGER DEFAULT 0,
75 75 size INTEGER DEFAULT 0
76 76 )
77 77 '''
78 78
79 79 self.sql(qry)
80 80 self.connection.commit()
81 81
82 82 @property
83 83 def sql(self):
84 84 return self.connection.execute
85 85
86 86 def bulk_insert(self, rows):
87 87 qry = '''
88 88 INSERT INTO archive_cache (
89 89 rowid,
90 90 key_file,
91 91 key_file_path,
92 92 filename,
93 93 full_path,
94 94 store_time,
95 95 access_time,
96 96 access_count,
97 97 size
98 98 )
99 99 VALUES (
100 100 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 101 )
102 102 '''
103 103 cursor = self.connection.cursor()
104 104 cursor.executemany(qry, rows)
105 105 self.connection.commit()
106 106
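# Illustrative sketch of how the in-memory DB above is fed and queried; evict() further
# down does the same thing with real key-file metadata. The helper name, paths and row
# values here are made-up examples following the column order of the bulk_insert query.
def _example_db_usage():
    db = DB()
    rows = [
        # rowid, key_file, key_file_path, filename, full_path,
        # store_time, access_time, access_count, size
        (1, 'abc.key', '/cache/shard_000/abc.key',
         'aa/bb/ccdd.archive_cache', '/cache/shard_000/aa/bb/ccdd.archive_cache',
         1700000000.0, 1700000100.0, 3, 1024),
    ]
    db.bulk_insert(rows)
    ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
    return total_size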
107 107
108 108 class FileSystemCache:
109 109
110 110 def __init__(self, index, directory, **settings):
111 111 self._index = index
112 112 self._directory = directory
113 113
114 114 @property
115 115 def directory(self):
116 116 """Cache directory."""
117 117 return self._directory
118 118
119 119 def _write_file(self, full_path, iterator, mode, encoding=None):
120 120 full_dir, _ = os.path.split(full_path)
121 121
122 122 for count in range(1, 11):
123 123 with contextlib.suppress(OSError):
124 124 os.makedirs(full_dir)
125 125
126 126 try:
127 127 # Another cache may have deleted the directory before
128 128 # the file could be opened.
129 129 writer = open(full_path, mode, encoding=encoding)
130 130 except OSError:
131 131 if count == 10:
132 132 # Give up after 10 tries to open the file.
133 133 raise
134 134 continue
135 135
136 136 with writer:
137 137 size = 0
138 138 for chunk in iterator:
139 139 size += len(chunk)
140 140 writer.write(chunk)
141 writer.flush()
142 # Get the file descriptor
143 fd = writer.fileno()
144
145 # Sync the file descriptor to disk, helps with NFS cases...
146 os.fsync(fd)
147 log.debug('written new archive cache under %s', full_path)
141 148 return size
142 149
143 150 def _get_keyfile(self, key):
144 151 return os.path.join(self._directory, f'{key}.key')
145 152
146 153 def store(self, key, value_reader, metadata):
147 154 filename, full_path = self.random_filename()
148 155 key_file = self._get_keyfile(key)
149 156
150 157 # STORE METADATA
151 158 _metadata = {
152 159 "version": "v1",
153 160 "filename": filename,
154 161 "full_path": full_path,
155 162 "key_file": key_file,
156 163 "store_time": time.time(),
157 164 "access_count": 1,
158 165 "access_time": 0,
159 166 "size": 0
160 167 }
161 168 if metadata:
162 169 _metadata.update(metadata)
163 170
164 171 reader = functools.partial(value_reader.read, 2**22)
165 172
166 173 iterator = iter(reader, b'')
167 174 size = self._write_file(full_path, iterator, 'xb')
168 175 _metadata['size'] = size
169 176
170 177 # once the archive is fully written, we create a key file to record the presence of the binary file
171 178 with open(key_file, 'wb') as f:
172 179 f.write(json.dumps(_metadata))
173 180
174 181 return key, size, MODE_BINARY, filename, _metadata
175 182
176 183 def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
177 184
178 185 if retry:
179 186 for attempt in range(retry_attempts):
180 187 if key in self:
181 188 break
182 189 # we didn't find the key, wait 1s, and re-check
183 190 time.sleep(1)
184 191
185 192 if key not in self:
186 193 log.exception('requested key %s not found in %s', key, self)
187 194 raise KeyError(key)
188 195
189 196 key_file = self._get_keyfile(key)
190 197 with open(key_file, 'rb') as f:
191 198 metadata = json.loads(f.read())
192 199
193 200 filename = metadata['filename']
194 201
195 202 try:
196 203 return open(os.path.join(self.directory, filename), 'rb'), metadata
197 204 finally:
198 205 # update usage stats, count and accessed
199 206 metadata["access_count"] = metadata.get("access_count", 0) + 1
200 207 metadata["access_time"] = time.time()
201 208
202 209 with open(key_file, 'wb') as f:
203 210 f.write(json.dumps(metadata))
204 211
205 212 def random_filename(self):
206 213 """Return filename and full-path tuple for file storage.
207 214
208 215 Filename will be a randomly generated 28 character hexadecimal string
209 216 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
210 217 reduce the size of directories. On older filesystems, lookups in
211 218 directories with many files may be slow.
212 219 """
213 220
214 221 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
215 222 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
216 223 name = hex_name[4:] + '.archive_cache'
217 224 filename = os.path.join(sub_dir, name)
218 225 full_path = os.path.join(self.directory, filename)
219 226 return filename, full_path
220 227
221 228 def hash(self, key):
222 229 """Compute portable hash for `key`.
223 230
224 231 :param key: key to hash
225 232 :return: hash value
226 233
227 234 """
228 235 mask = 0xFFFFFFFF
229 236 return zlib.adler32(key.encode('utf-8')) & mask # noqa
230 237
231 238 def __contains__(self, key):
232 239 """Return `True` if `key` matching item is found in cache.
233 240
234 241 :param key: key matching item
235 242 :return: True if key matching item
236 243
237 244 """
238 245 key_file = self._get_keyfile(key)
239 246 return os.path.exists(key_file)
240 247
241 248 def __repr__(self):
242 249 return f'FileSystemCache(index={self._index}, dir={self.directory})'
243 250
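# Illustrative sketch of a store/fetch round trip through a single shard. The directory,
# key and metadata below are made-up examples, and io.BytesIO stands in for the archive
# stream normally passed in as value_reader.
def _example_shard_round_trip():
    import io

    shard = FileSystemCache(index=0, directory='/tmp/archive_cache_example/shard_000')
    key = 'example-archive-key'

    # store() drains the reader in 4MB chunks and writes a sibling <key>.key file
    shard.store(key, io.BytesIO(b'fake archive bytes'), metadata={'user': 'example'})

    reader, metadata = shard.fetch(key)
    with reader:
        data = reader.read()
    return data, metadata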
244 251
245 252 class FanoutCache:
246 253 """Cache that shards keys and values."""
247 254
248 255 def __init__(
249 256 self, directory=None, **settings
250 257 ):
251 258 """Initialize cache instance.
252 259
253 260 :param str directory: cache directory
254 261 :param settings: settings dict
255 262
256 263 """
257 264 if directory is None:
258 265 raise ValueError('directory cannot be None')
259 266
260 267 directory = str(directory)
261 268 directory = os.path.expanduser(directory)
262 269 directory = os.path.expandvars(directory)
263 270 self._directory = directory
264 271
265 272 self._count = settings.pop('cache_shards')
266 273 self._locking_url = settings.pop('locking_url')
267 274
268 275 self._eviction_policy = settings['cache_eviction_policy']
269 276 self._cache_size_limit = settings['cache_size_limit']
270 277
271 278 self._shards = tuple(
272 279 FileSystemCache(
273 280 index=num,
274 281 directory=os.path.join(directory, 'shard_%03d' % num),
275 282 **settings,
276 283 )
277 284 for num in range(self._count)
278 285 )
279 286 self._hash = self._shards[0].hash
280 287
281 288 @property
282 289 def directory(self):
283 290 """Cache directory."""
284 291 return self._directory
285 292
286 293 def get_lock(self, lock_key):
287 294 return GenerationLock(lock_key, self._locking_url)
288 295
289 296 def _get_shard(self, key) -> FileSystemCache:
290 297 index = self._hash(key) % self._count
291 298 shard = self._shards[index]
292 299 return shard
293 300
294 301 def store(self, key, value_reader, metadata=None):
295 302 shard = self._get_shard(key)
296 303 return shard.store(key, value_reader, metadata)
297 304
298 305 def fetch(self, key, retry=False, retry_attempts=10):
299 306 """Return file handle corresponding to `key` from cache.
300 307 """
301 308 shard = self._get_shard(key)
302 309 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)
303 310
304 311 def has_key(self, key):
305 312 """Return `True` if `key` matching item is found in cache.
306 313
307 314 :param key: key for item
308 315 :return: True if key is found
309 316
310 317 """
311 318 shard = self._get_shard(key)
312 319 return key in shard
313 320
314 321 def __contains__(self, item):
315 322 return self.has_key(item)
316 323
317 324 def evict(self, policy=None, size_limit=None):
318 325 """
319 326 Remove old items based on the given conditions.
320 327
321 328
322 329 Outline of the algorithm:
323 330 iterate over each shard, and for each shard iterate over its .key files,
324 331 reading the metadata stored in each. This gives the full list of cached archives
325 332 together with their size, creation time, access time and access count.
326 333
327 334 That data is loaded into an in-memory DB so different sorting strategies can be
328 335 run easily; the total size is a simple SUM query.
329 336
330 337 A sorting strategy is then applied based on the eviction policy, and the sorted
331 338 keys are removed one by one until the total size drops under the limit.
332 339 """
333 340
334 341 policy = policy or self._eviction_policy
335 342 size_limit = size_limit or self._cache_size_limit
336 343
337 344 select_policy = EVICTION_POLICY[policy]['evict']
338 345
339 346 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
340 347 policy, format_size(size_limit))
341 348
342 349 if select_policy is None:
343 350 return 0
344 351
345 352 db = DB()
346 353
347 354 data = []
348 355 cnt = 1
349 356 for shard in self._shards:
350 357 for key_file in os.listdir(shard.directory):
351 358 if key_file.endswith('.key'):
352 359 key_file_path = os.path.join(shard.directory, key_file)
353 360 with open(key_file_path, 'rb') as f:
354 361 metadata = json.loads(f.read())
355 362
356 363 size = metadata.get('size')
357 364 filename = metadata.get('filename')
358 365 full_path = metadata.get('full_path')
359 366
360 367 if not size:
361 368 # in case we don't have the size stored, re-calculate it...
362 369 size = os.stat(full_path).st_size
363 370
364 371 data.append([
365 372 cnt,
366 373 key_file,
367 374 key_file_path,
368 375 filename,
369 376 full_path,
370 377 metadata.get('store_time', 0),
371 378 metadata.get('access_time', 0),
372 379 metadata.get('access_count', 0),
373 380 size,
374 381 ])
375 382 cnt += 1
376 383
377 384 # Insert bulk data using executemany
378 385 db.bulk_insert(data)
379 386
380 387 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
381 388 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
382 389 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
383 390 sorted_keys = db.sql(select_policy_qry).fetchall()
384 391
385 392 removed_items = 0
386 393 removed_size = 0
387 394 for key, cached_file, size in sorted_keys:
388 395 # simulate removal impact BEFORE removal
389 396 total_size -= size
390 397
391 398 if total_size <= size_limit:
392 399 # we obtained what we wanted...
393 400 break
394 401
395 402 os.remove(cached_file)
396 403 os.remove(key)
397 404 removed_items += 1
398 405 removed_size += size
399 406
400 407 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
401 408 return removed_items
402 409
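# Illustrative end-to-end sketch for FanoutCache: the directory, locking URL, key and
# payload are made-up examples; in RhodeCode the equivalent values come from the
# archive_cache.* settings handled below. A single shard is used so the example stays
# self-contained.
def _example_fanout_usage():
    import io

    cache = FanoutCache(
        '/tmp/archive_cache_example',
        locking_url='redis://localhost:6379/0',
        cache_shards=1,
        cache_eviction_policy='least-recently-stored',
        cache_size_limit=1 * 1024 * 1024 * 1024,  # 1GB
    )
    key = 'repo1-abcdef12.tar.gz'
    cache.store(key, io.BytesIO(b'fake archive bytes'), metadata={'archive_name': key})
    if key in cache:
        reader, metadata = cache.fetch(key)
        reader.close()
    # trim the store back under the configured size limit, using the configured policy
    cache.evict()
    return cache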
403 410
404 411 def get_archival_config(config):
405 412
406 413 final_config = {
407 414
408 415 }
409 416
410 417 for k, v in config.items():
411 418 if k.startswith('archive_cache'):
412 419 final_config[k] = v
413 420
414 421 return final_config
415 422
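# Illustrative input for get_archival_config(): only keys prefixed with 'archive_cache'
# are kept. The values are made-up examples matching the keys read by
# get_archival_cache_store() below.
_EXAMPLE_CONFIG = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://localhost:6379/0',
    'archive_cache.filesystem.store_dir': '/tmp/archive_cache_example',
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
    'some.other.setting': 'dropped by get_archival_config',
}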
416 423
417 424 def get_archival_cache_store(config):
418 425
419 426 global cache_meta
420 427 if cache_meta is not None:
421 428 return cache_meta
422 429
423 430 config = get_archival_config(config)
424 431 backend = config['archive_cache.backend.type']
425 432 if backend != 'filesystem':
426 433 raise ValueError('archive_cache.backend.type only supports "filesystem"')
427 434
428 435 archive_cache_locking_url = config['archive_cache.locking.url']
429 436 archive_cache_dir = config['archive_cache.filesystem.store_dir']
430 437 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
431 438 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
432 439 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
433 440
434 441 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
435 442
436 443 # make sure the archive cache directory exists, so it's ok to write into it
437 444 if not os.path.isdir(archive_cache_dir):
438 445 os.makedirs(archive_cache_dir, exist_ok=True)
439 446
440 447 d_cache = FanoutCache(
441 448 archive_cache_dir,
442 449 locking_url=archive_cache_locking_url,
443 450 cache_shards=archive_cache_shards,
444 451 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
445 452 cache_eviction_policy=archive_cache_eviction_policy
446 453 )
447 454 cache_meta = d_cache
448 455 return cache_meta
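# Illustrative sketch of the module entry point: repeated calls return the same instance
# via the module-level cache_meta. _EXAMPLE_CONFIG above is a stand-in for the application
# settings normally passed in.
def _example_get_store():
    d_cache = get_archival_cache_store(_EXAMPLE_CONFIG)
    assert get_archival_cache_store(_EXAMPLE_CONFIG) is d_cache
    return d_cache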