archive-cache: synced with ce changes
super-admin
r1245:3da621d7 default
@@ -1,31 +1,31 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 from .fanout_cache import get_archival_cache_store
19 19 from .fanout_cache import get_archival_config
20 20
21 21 from .utils import archive_iterator
22 from .utils import ArchiveCacheLock
22 from .utils import ArchiveCacheGenerationLock
23 23
24 24
25 25 def includeme(config):
26 26 # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode
27 27 return
28 28
29 29 # init our cache at start
30 30 settings = config.get_settings()
31 31 get_archival_cache_store(settings)
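For context, a rough sketch of how this lazy initialization might be driven once RhodeCode pushes its settings to the vcsserver. The setting keys mirror the ones read by get_archival_cache_store() later in this commit; the concrete values (paths, sizes, the Redis URL) are illustrative only:

# hypothetical settings dict as it might arrive from RhodeCode
settings = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://localhost:6379/0',
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

# the first call builds the FanoutCache and stores it in the module-level
# cache_meta; subsequent calls return the same instance
d_cache = get_archival_cache_store(settings)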
@@ -1,426 +1,448 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import codecs
19 19 import contextlib
20 20 import functools
21 21 import os
22 22 import logging
23 23 import time
24 24 import typing
25 25 import zlib
26 26 import sqlite3
27 27
28 28 from ...ext_json import json
29 29 from .lock import GenerationLock
30 30 from .utils import format_size
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34 cache_meta = None
35 35
36 36 UNKNOWN = -241
37 37 NO_VAL = -917
38 38
39 39 MODE_BINARY = 'BINARY'
40 40
41 41
42 42 EVICTION_POLICY = {
43 43 'none': {
44 44 'evict': None,
45 45 },
46 46 'least-recently-stored': {
47 47 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
48 48 },
49 49 'least-recently-used': {
50 50 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
51 51 },
52 52 'least-frequently-used': {
53 53 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
54 54 },
55 55 }
56 56
57 57
58 58 class DB:
59 59
60 60 def __init__(self):
61 61 self.connection = sqlite3.connect(':memory:')
62 62 self._init_db()
63 63
64 64 def _init_db(self):
65 65 qry = '''
66 66 CREATE TABLE IF NOT EXISTS archive_cache (
67 67 rowid INTEGER PRIMARY KEY,
68 68 key_file TEXT,
69 69 key_file_path TEXT,
70 70 filename TEXT,
71 71 full_path TEXT,
72 72 store_time REAL,
73 73 access_time REAL,
74 74 access_count INTEGER DEFAULT 0,
75 75 size INTEGER DEFAULT 0
76 76 )
77 77 '''
78 78
79 79 self.sql(qry)
80 80 self.connection.commit()
81 81
82 82 @property
83 83 def sql(self):
84 84 return self.connection.execute
85 85
86 86 def bulk_insert(self, rows):
87 87 qry = '''
88 88 INSERT INTO archive_cache (
89 89 rowid,
90 90 key_file,
91 91 key_file_path,
92 92 filename,
93 93 full_path,
94 94 store_time,
95 95 access_time,
96 96 access_count,
97 97 size
98 98 )
99 99 VALUES (
100 100 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 101 )
102 102 '''
103 103 cursor = self.connection.cursor()
104 104 cursor.executemany(qry, rows)
105 105 self.connection.commit()
106 106
107 107
108 108 class FileSystemCache:
109 109
110 110 def __init__(self, index, directory, **settings):
111 111 self._index = index
112 112 self._directory = directory
113 113
114 @property
115 def directory(self):
116 """Cache directory."""
117 return self._directory
118
114 119 def _write_file(self, full_path, iterator, mode, encoding=None):
115 120 full_dir, _ = os.path.split(full_path)
116 121
117 122 for count in range(1, 11):
118 123 with contextlib.suppress(OSError):
119 124 os.makedirs(full_dir)
120 125
121 126 try:
122 127 # Another cache may have deleted the directory before
123 128 # the file could be opened.
124 129 writer = open(full_path, mode, encoding=encoding)
125 130 except OSError:
126 131 if count == 10:
127 132 # Give up after 10 tries to open the file.
128 133 raise
129 134 continue
130 135
131 136 with writer:
132 137 size = 0
133 138 for chunk in iterator:
134 139 size += len(chunk)
135 140 writer.write(chunk)
136 141 return size
137 142
138 143 def _get_keyfile(self, key):
139 144 return os.path.join(self._directory, f'{key}.key')
140 145
141 146 def store(self, key, value_reader, metadata):
142 147 filename, full_path = self.random_filename()
143 148 key_file = self._get_keyfile(key)
144 149
145 150 # STORE METADATA
146 151 _metadata = {
147 152 "version": "v1",
148 153 "filename": filename,
149 154 "full_path": full_path,
150 155 "key_file": key_file,
151 156 "store_time": time.time(),
152 157 "access_count": 1,
153 158 "access_time": 0,
154 159 "size": 0
155 160 }
156 161 if metadata:
157 162 _metadata.update(metadata)
158 163
159 164 reader = functools.partial(value_reader.read, 2**22)
160 165
161 166 iterator = iter(reader, b'')
162 167 size = self._write_file(full_path, iterator, 'xb')
163 168 _metadata['size'] = size
164 169
165 170 # after the archive is written, we create a key file to record the presence of the binary file
166 171 with open(key_file, 'wb') as f:
167 172 f.write(json.dumps(_metadata))
168 173
169 174 return key, size, MODE_BINARY, filename, _metadata
170 175
171 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
176 def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
177
178 if retry:
179 for attempt in range(retry_attempts):
180 if key in self:
181 break
182 # we didn't find the key, wait 1s, and re-check
183 time.sleep(1)
184
172 185 if key not in self:
186 log.error('requested archive with key=%s not found in %s', key, self)
173 187 raise KeyError(key)
174 188
175 189 key_file = self._get_keyfile(key)
176 190 with open(key_file, 'rb') as f:
177 191 metadata = json.loads(f.read())
178 192
179 193 filename = metadata['filename']
180 194
181 195 try:
182 return open(os.path.join(self._directory, filename), 'rb'), metadata
196 return open(os.path.join(self.directory, filename), 'rb'), metadata
183 197 finally:
184 198 # update usage stats, count and accessed
185 199 metadata["access_count"] = metadata.get("access_count", 0) + 1
186 200 metadata["access_time"] = time.time()
187 201
188 202 with open(key_file, 'wb') as f:
189 203 f.write(json.dumps(metadata))
190 204
191 205 def random_filename(self):
192 206 """Return filename and full-path tuple for file storage.
193 207
194 208 Filename will be a randomly generated 28 character hexadecimal string
195 209 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
196 210 reduce the size of directories. On older filesystems, lookups in
197 211 directories with many files may be slow.
198 212 """
199 213
200 214 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
201 215 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
202 216 name = hex_name[4:] + '.archive_cache'
203 217 filename = os.path.join(sub_dir, name)
204 full_path = os.path.join(self._directory, filename)
218 full_path = os.path.join(self.directory, filename)
205 219 return filename, full_path
206 220
207 221 def hash(self, key):
208 222 """Compute portable hash for `key`.
209 223
210 224 :param key: key to hash
211 225 :return: hash value
212 226
213 227 """
214 228 mask = 0xFFFFFFFF
215 229 return zlib.adler32(key.encode('utf-8')) & mask # noqa
216 230
217 231 def __contains__(self, key):
218 232 """Return `True` if `key` matching item is found in cache.
219 233
220 234 :param key: key matching item
221 235 :return: True if key matching item
222 236
223 237 """
224 238 key_file = self._get_keyfile(key)
225 239 return os.path.exists(key_file)
226 240
241 def __repr__(self):
242 return f'FileSystemCache(index={self._index}, dir={self.directory})'
243
227 244
228 245 class FanoutCache:
229 246 """Cache that shards keys and values."""
230 247
231 248 def __init__(
232 249 self, directory=None, **settings
233 250 ):
234 251 """Initialize cache instance.
235 252
236 253 :param str directory: cache directory
237 254 :param settings: settings dict
238 255
239 256 """
240 257 if directory is None:
241 258 raise ValueError('directory cannot be None')
242 259
243 260 directory = str(directory)
244 261 directory = os.path.expanduser(directory)
245 262 directory = os.path.expandvars(directory)
246 263 self._directory = directory
247 264
248 265 self._count = settings.pop('cache_shards')
249 266 self._locking_url = settings.pop('locking_url')
250 267
251 268 self._eviction_policy = settings['cache_eviction_policy']
252 269 self._cache_size_limit = settings['cache_size_limit']
253 270
254 271 self._shards = tuple(
255 272 FileSystemCache(
256 273 index=num,
257 274 directory=os.path.join(directory, 'shard_%03d' % num),
258 275 **settings,
259 276 )
260 277 for num in range(self._count)
261 278 )
262 279 self._hash = self._shards[0].hash
263 280
281 @property
282 def directory(self):
283 """Cache directory."""
284 return self._directory
285
264 286 def get_lock(self, lock_key):
265 287 return GenerationLock(lock_key, self._locking_url)
266 288
267 289 def _get_shard(self, key) -> FileSystemCache:
268 290 index = self._hash(key) % self._count
269 291 shard = self._shards[index]
270 292 return shard
271 293
272 294 def store(self, key, value_reader, metadata=None):
273 295 shard = self._get_shard(key)
274 296 return shard.store(key, value_reader, metadata)
275 297
276 def fetch(self, key):
298 def fetch(self, key, retry=False, retry_attempts=10):
277 299 """Return file handle corresponding to `key` from cache.
278 300 """
279 301 shard = self._get_shard(key)
280 return shard.fetch(key)
302 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)
281 303
282 304 def has_key(self, key):
283 305 """Return `True` if `key` matching item is found in cache.
284 306
285 307 :param key: key for item
286 308 :return: True if key is found
287 309
288 310 """
289 311 shard = self._get_shard(key)
290 312 return key in shard
291 313
292 314 def __contains__(self, item):
293 315 return self.has_key(item)
294 316
295 317 def evict(self, policy=None, size_limit=None):
296 318 """
297 319 Remove old items based on the configured eviction policy and size limit.
298 320
299 321
300 322 Explanation of this algorithm:
301 323 iterate over each shard, then for each shard iterate over its .key files and
302 324 read the metadata stored in them. This gives us the full list of keys, cached archives, their sizes,
303 325 access data, creation times, and access counts.
304 326
305 327 Store that in an in-memory DB so we can run different sorting strategies easily.
306 328 Summing the sizes is a single SQL SUM query.
307 329
308 330 Then we run a sorting strategy based on the eviction policy.
309 331 We iterate over the sorted keys and remove each one, checking whether we have hit the overall limit.
310 332 """
311 333
312 334 policy = policy or self._eviction_policy
313 335 size_limit = size_limit or self._cache_size_limit
314 336
315 337 select_policy = EVICTION_POLICY[policy]['evict']
316 338
317 339 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
318 340 policy, format_size(size_limit))
319 341
320 342 if select_policy is None:
321 343 return 0
322 344
323 345 db = DB()
324 346
325 347 data = []
326 348 cnt = 1
327 349 for shard in self._shards:
328 for key_file in os.listdir(shard._directory):
350 for key_file in os.listdir(shard.directory):
329 351 if key_file.endswith('.key'):
330 key_file_path = os.path.join(shard._directory, key_file)
352 key_file_path = os.path.join(shard.directory, key_file)
331 353 with open(key_file_path, 'rb') as f:
332 354 metadata = json.loads(f.read())
333 355
334 356 size = metadata.get('size')
335 357 filename = metadata.get('filename')
336 358 full_path = metadata.get('full_path')
337 359
338 360 if not size:
339 361 # in case we don't have size re-calc it...
340 362 size = os.stat(full_path).st_size
341 363
342 364 data.append([
343 365 cnt,
344 366 key_file,
345 367 key_file_path,
346 368 filename,
347 369 full_path,
348 370 metadata.get('store_time', 0),
349 371 metadata.get('access_time', 0),
350 372 metadata.get('access_count', 0),
351 373 size,
352 374 ])
353 375 cnt += 1
354 376
355 377 # Insert bulk data using executemany
356 378 db.bulk_insert(data)
357 379
358 380 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
359 381 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
360 382 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
361 383 sorted_keys = db.sql(select_policy_qry).fetchall()
362 384
363 385 removed_items = 0
364 386 removed_size = 0
365 387 for key, cached_file, size in sorted_keys:
366 388 # simulate removal impact BEFORE removal
367 389 total_size -= size
368 390
369 391 if total_size <= size_limit:
370 392 # we obtained what we wanted...
371 393 break
372 394
373 395 os.remove(cached_file)
374 396 os.remove(key)
375 397 removed_items += 1
376 398 removed_size += size
377 399
378 400 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
379 401 return removed_items
380 402
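The docstring of evict() above describes the flow in prose; the condensed sketch below illustrates the same idea with made-up rows. It assumes the module-level DB, EVICTION_POLICY, and format_size names from this file are in scope:

# load two fake key-file records into the in-memory index
db = DB()
db.bulk_insert([
    # rowid, key_file, key_file_path, filename, full_path,
    # store_time, access_time, access_count, size
    (1, 'a.key', '/cache/shard_000/a.key', 'aa/bb/x.archive_cache',
     '/cache/shard_000/aa/bb/x.archive_cache', 100.0, 400.0, 3, 5 * 1024 ** 2),
    (2, 'b.key', '/cache/shard_001/b.key', 'cc/dd/y.archive_cache',
     '/cache/shard_001/cc/dd/y.archive_cache', 200.0, 300.0, 9, 7 * 1024 ** 2),
])

# 'least-recently-stored' orders candidates by store_time, so the oldest
# archive (rowid 1) is the first candidate once the size limit is exceeded
qry = EVICTION_POLICY['least-recently-stored']['evict'].format(
    fields='key_file_path, full_path, size')
for key_file_path, full_path, size in db.sql(qry).fetchall():
    print(key_file_path, full_path, format_size(size))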
381 403
382 404 def get_archival_config(config):
383 405
384 406 final_config = {
385 407
386 408 }
387 409
388 410 for k, v in config.items():
389 411 if k.startswith('archive_cache'):
390 412 final_config[k] = v
391 413
392 414 return final_config
393 415
394 416
395 417 def get_archival_cache_store(config):
396 418
397 419 global cache_meta
398 420 if cache_meta is not None:
399 421 return cache_meta
400 422
401 423 config = get_archival_config(config)
402 424 backend = config['archive_cache.backend.type']
403 425 if backend != 'filesystem':
404 426 raise ValueError('archive_cache.backend.type only supports "filesystem"')
405 427
406 428 archive_cache_locking_url = config['archive_cache.locking.url']
407 429 archive_cache_dir = config['archive_cache.filesystem.store_dir']
408 430 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
409 431 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
410 432 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
411 433
412 434 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
413 435
414 436 # check if it's ok to write, and re-create the archive cache
415 437 if not os.path.isdir(archive_cache_dir):
416 438 os.makedirs(archive_cache_dir, exist_ok=True)
417 439
418 440 d_cache = FanoutCache(
419 441 archive_cache_dir,
420 442 locking_url=archive_cache_locking_url,
421 443 cache_shards=archive_cache_shards,
422 444 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
423 445 cache_eviction_policy=archive_cache_eviction_policy
424 446 )
425 447 cache_meta = d_cache
426 448 return cache_meta
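To make the shard/store/fetch flow concrete, here is a hedged usage sketch of the FanoutCache API introduced above. The cache key, metadata, and file paths are invented for illustration, and the package import is elided because its full module path is not visible in this diff:

# assumes get_archival_cache_store and the settings dict from the earlier
# sketch are available in this scope
d_cache = get_archival_cache_store(settings)

archive_key = 'some-repo-4f2a9c-archive.tar.gz'   # illustrative key

if archive_key not in d_cache:
    with open('/tmp/generated-archive.tar.gz', 'rb') as archive_file:
        # store() picks a shard by hash(key), streams the payload into a
        # random *.archive_cache file and writes a companion .key file
        d_cache.store(archive_key, archive_file, metadata={'repo': 'some-repo'})

# fetch() returns an open binary handle plus the stored metadata;
# retry=True polls once per second for another worker to finish writing
reader, metadata = d_cache.fetch(archive_key, retry=True, retry_attempts=30)

# trim the cache back under the configured size limit; returns the
# number of archives removed
removed = d_cache.evict()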
@@ -1,58 +1,58 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import redis
19 19 from ..._vendor import redis_lock
20 from .utils import ArchiveCacheLock
20 from .utils import ArchiveCacheGenerationLock
21 21
22 22
23 23 class GenerationLock:
24 24 """
25 25 Locking mechanism that detects if a lock is acquired
26 26
27 27 with GenerationLock(lock_key):
28 28 compute_archive()
29 29 """
30 30 lock_timeout = 7200
31 31
32 32 def __init__(self, lock_key, url):
33 33 self.lock_key = lock_key
34 34 self._create_client(url)
35 35 self.lock = self.get_lock()
36 36
37 37 def _create_client(self, url):
38 38 connection_pool = redis.ConnectionPool.from_url(url)
39 39 self.writer_client = redis.StrictRedis(
40 40 connection_pool=connection_pool
41 41 )
42 42 self.reader_client = self.writer_client
43 43
44 44 def get_lock(self):
45 45 return redis_lock.Lock(
46 46 redis_client=self.writer_client,
47 47 name=self.lock_key,
48 48 expire=self.lock_timeout,
49 49 strict=True
50 50 )
51 51
52 52 def __enter__(self):
53 53 acquired = self.lock.acquire(blocking=False)
54 54 if not acquired:
55 raise ArchiveCacheLock('Failed to create a lock')
55 raise ArchiveCacheGenerationLock('Failed to create a lock')
56 56
57 57 def __exit__(self, exc_type, exc_val, exc_tb):
58 58 self.lock.release()
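A hedged sketch of how a caller might combine GenerationLock with the renamed ArchiveCacheGenerationLock exception from the utils module; d_cache, archive_key, and the lock-key format are carried over from the earlier sketches and are illustrative:

lock_key = f'archive-generation-{archive_key}'   # illustrative lock-key format

try:
    with d_cache.get_lock(lock_key):
        # we hold the Redis lock: generate the archive and store it
        with open('/tmp/generated-archive.tar.gz', 'rb') as archive_file:
            d_cache.store(archive_key, archive_file)
except ArchiveCacheGenerationLock:
    # another worker is generating the same archive; wait for it via a
    # retrying fetch instead of producing a duplicate
    reader, metadata = d_cache.fetch(archive_key, retry=True)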
@@ -1,71 +1,71 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import os
19 19
20 20
21 class ArchiveCacheLock(Exception):
21 class ArchiveCacheGenerationLock(Exception):
22 22 pass
23 23
24 24
25 25 def archive_iterator(_reader, block_size: int = 4096 * 512):
26 26 # 4096 * 512 = 2MB
27 27 while 1:
28 28 data = _reader.read(block_size)
29 29 if not data:
30 30 break
31 31 yield data
32 32
33 33
34 34 def get_directory_statistics(start_path):
35 35 """
36 36 total_files, total_size, directory_stats = get_directory_statistics(start_path)
37 37
38 38 print(f"Directory statistics for: {start_path}\n")
39 39 print(f"Total files: {total_files}")
40 40 print(f"Total size: {format_size(total_size)}\n")
41 41
42 42 :param start_path:
43 43 :return:
44 44 """
45 45
46 46 total_files = 0
47 47 total_size = 0
48 48 directory_stats = {}
49 49
50 50 for dir_path, dir_names, file_names in os.walk(start_path):
51 51 dir_size = 0
52 52 file_count = len(file_names)
53 53
54 54 for file in file_names:
55 55 filepath = os.path.join(dir_path, file)
56 56 file_size = os.path.getsize(filepath)
57 57 dir_size += file_size
58 58
59 59 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
60 60 total_files += file_count
61 61 total_size += dir_size
62 62
63 63 return total_files, total_size, directory_stats
64 64
65 65
66 66 def format_size(size):
67 67 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
68 68 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
69 69 if size < 1024:
70 70 return f"{size:.2f} {unit}"
71 71 size /= 1024
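Finally, a small illustrative sketch of the helpers in this file: archive_iterator streaming a fetched archive in 2 MB blocks, and get_directory_statistics / format_size reporting on the cache directory. The reader, sink, and cache objects are assumed from the earlier sketches:

reader, metadata = d_cache.fetch(archive_key)

sent = 0
for chunk in archive_iterator(reader):      # 2 MB blocks by default
    sent += len(chunk)
    output_stream.write(chunk)              # illustrative sink, e.g. a WSGI body

total_files, total_size, _per_dir = get_directory_statistics(d_cache.directory)
print(f'served {format_size(sent)}; cache holds '
      f'{total_files} files / {format_size(total_size)}')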