chore(code-sync): synced code from ce for archive_cache
super-admin
r1244:ecae6663 default
@@ -1,411 +1,426 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import codecs
19 19 import contextlib
20 20 import functools
21 21 import os
22 22 import logging
23 23 import time
24 24 import typing
25 25 import zlib
26 26 import sqlite3
27 27
28 from vcsserver.lib.rc_json import json
28 from ...ext_json import json
29 29 from .lock import GenerationLock
30 from .utils import format_size
30 31
31 32 log = logging.getLogger(__name__)
32 33
33 34 cache_meta = None
34 35
35 36 UNKNOWN = -241
36 37 NO_VAL = -917
37 38
38 39 MODE_BINARY = 'BINARY'
39 40
40 41
41 42 EVICTION_POLICY = {
42 43 'none': {
43 44 'evict': None,
44 45 },
45 46 'least-recently-stored': {
46 47 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
47 48 },
48 49 'least-recently-used': {
49 50 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
50 51 },
51 52 'least-frequently-used': {
52 53 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
53 54 },
54 55 }
55 56
56 57
57 58 class DB:
58 59
59 60 def __init__(self):
60 61 self.connection = sqlite3.connect(':memory:')
61 62 self._init_db()
62 63
63 64 def _init_db(self):
64 65 qry = '''
65 66 CREATE TABLE IF NOT EXISTS archive_cache (
66 67 rowid INTEGER PRIMARY KEY,
67 68 key_file TEXT,
68 69 key_file_path TEXT,
69 70 filename TEXT,
70 71 full_path TEXT,
71 72 store_time REAL,
72 73 access_time REAL,
73 74 access_count INTEGER DEFAULT 0,
74 75 size INTEGER DEFAULT 0
75 76 )
76 77 '''
77 78
78 79 self.sql(qry)
79 80 self.connection.commit()
80 81
81 82 @property
82 83 def sql(self):
83 84 return self.connection.execute
84 85
85 86 def bulk_insert(self, rows):
86 87 qry = '''
87 88 INSERT INTO archive_cache (
88 89 rowid,
89 90 key_file,
90 91 key_file_path,
91 92 filename,
92 93 full_path,
93 94 store_time,
94 95 access_time,
95 96 access_count,
96 97 size
97 98 )
98 99 VALUES (
99 100 ?, ?, ?, ?, ?, ?, ?, ?, ?
100 101 )
101 102 '''
102 103 cursor = self.connection.cursor()
103 104 cursor.executemany(qry, rows)
104 105 self.connection.commit()
105 106
106 107
107 108 class FileSystemCache:
108 109
109 110 def __init__(self, index, directory, **settings):
110 111 self._index = index
111 112 self._directory = directory
112 113
113 114 def _write_file(self, full_path, iterator, mode, encoding=None):
114 115 full_dir, _ = os.path.split(full_path)
115 116
116 117 for count in range(1, 11):
117 118 with contextlib.suppress(OSError):
118 119 os.makedirs(full_dir)
119 120
120 121 try:
121 122 # Another cache may have deleted the directory before
122 123 # the file could be opened.
123 124 writer = open(full_path, mode, encoding=encoding)
124 125 except OSError:
125 126 if count == 10:
126 127 # Give up after 10 tries to open the file.
127 128 raise
128 129 continue
129 130
130 131 with writer:
131 132 size = 0
132 133 for chunk in iterator:
133 134 size += len(chunk)
134 135 writer.write(chunk)
135 136 return size
136 137
137 138 def _get_keyfile(self, key):
138 139 return os.path.join(self._directory, f'{key}.key')
139 140
140 141 def store(self, key, value_reader, metadata):
141 142 filename, full_path = self.random_filename()
142 143 key_file = self._get_keyfile(key)
143 144
144 145 # STORE METADATA
145 146 _metadata = {
146 147 "version": "v1",
147 148 "filename": filename,
148 149 "full_path": full_path,
149 150 "key_file": key_file,
150 151 "store_time": time.time(),
151 152 "access_count": 1,
152 153 "access_time": 0,
153 154 "size": 0
154 155 }
155 156 if metadata:
156 157 _metadata.update(metadata)
157 158
158 159 reader = functools.partial(value_reader.read, 2**22)
159 160
160 161 iterator = iter(reader, b'')
161 162 size = self._write_file(full_path, iterator, 'xb')
162 163 metadata['size'] = size
163 164
164 165 # after the archive is written, we create a key file to record the presence of the binary file
165 166 with open(key_file, 'wb') as f:
166 167 f.write(json.dumps(_metadata))
167 168
168 169 return key, size, MODE_BINARY, filename, _metadata
169 170
170 171 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
171 172 if key not in self:
172 173 raise KeyError(key)
173 174
174 175 key_file = self._get_keyfile(key)
175 176 with open(key_file, 'rb') as f:
176 177 metadata = json.loads(f.read())
177 178
178 179 filename = metadata['filename']
179 180
180 181 try:
181 182 return open(os.path.join(self._directory, filename), 'rb'), metadata
182 183 finally:
183 184 # update usage stats: access count and access time
184 185 metadata["access_count"] = metadata.get("access_count", 0) + 1
185 186 metadata["access_time"] = time.time()
186 187
187 188 with open(key_file, 'wb') as f:
188 189 f.write(json.dumps(metadata))
189 190
190 191 def random_filename(self):
191 192 """Return filename and full-path tuple for file storage.
192 193
193 194 Filename will be a randomly generated 28 character hexadecimal string
194 195 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
195 196 reduce the size of directories. On older filesystems, lookups in
196 197 directories with many files may be slow.
197 198 """
198 199
199 200 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
200 201 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
201 202 name = hex_name[4:] + '.archive_cache'
202 203 filename = os.path.join(sub_dir, name)
203 204 full_path = os.path.join(self._directory, filename)
204 205 return filename, full_path
205 206
206 207 def hash(self, key):
207 208 """Compute portable hash for `key`.
208 209
209 210 :param key: key to hash
210 211 :return: hash value
211 212
212 213 """
213 214 mask = 0xFFFFFFFF
214 215 return zlib.adler32(key.encode('utf-8')) & mask # noqa
215 216
216 217 def __contains__(self, key):
217 218 """Return `True` if `key` matching item is found in cache.
218 219
219 220 :param key: key of the item to look up
220 221 :return: True if a matching item is found
221 222
222 223 """
223 224 key_file = self._get_keyfile(key)
224 225 return os.path.exists(key_file)
225 226
226 227
227 228 class FanoutCache:
228 229 """Cache that shards keys and values."""
229 230
230 231 def __init__(
231 232 self, directory=None, **settings
232 233 ):
233 234 """Initialize cache instance.
234 235
235 236 :param str directory: cache directory
236 237 :param settings: settings dict
237 238
238 239 """
239 240 if directory is None:
240 241 raise ValueError('directory cannot be None')
241 242
242 243 directory = str(directory)
243 244 directory = os.path.expanduser(directory)
244 245 directory = os.path.expandvars(directory)
245 246 self._directory = directory
246 247
247 248 self._count = settings.pop('cache_shards')
248 249 self._locking_url = settings.pop('locking_url')
249 250
250 251 self._eviction_policy = settings['cache_eviction_policy']
251 252 self._cache_size_limit = settings['cache_size_limit']
252 253
253 254 self._shards = tuple(
254 255 FileSystemCache(
255 256 index=num,
256 257 directory=os.path.join(directory, 'shard_%03d' % num),
257 258 **settings,
258 259 )
259 260 for num in range(self._count)
260 261 )
261 262 self._hash = self._shards[0].hash
262 263
263 264 def get_lock(self, lock_key):
264 265 return GenerationLock(lock_key, self._locking_url)
265 266
266 267 def _get_shard(self, key) -> FileSystemCache:
267 268 index = self._hash(key) % self._count
268 269 shard = self._shards[index]
269 270 return shard
270 271
271 272 def store(self, key, value_reader, metadata=None):
272 273 shard = self._get_shard(key)
273 274 return shard.store(key, value_reader, metadata)
274 275
275 276 def fetch(self, key):
276 277 """Return file handle corresponding to `key` from cache.
277 278 """
278 279 shard = self._get_shard(key)
279 280 return shard.fetch(key)
280 281
281 282 def has_key(self, key):
282 283 """Return `True` if `key` matching item is found in cache.
283 284
284 285 :param key: key for item
285 286 :return: True if key is found
286 287
287 288 """
288 289 shard = self._get_shard(key)
289 290 return key in shard
290 291
291 292 def __contains__(self, item):
292 293 return self.has_key(item)
293 294
294 295 def evict(self, policy=None, size_limit=None):
295 296 """
296 297 Remove old items based on the configured eviction policy and size limit.
297 298
298 299
299 300 Explanation of this algorithm:
300 301 iterate over each shard, and for each shard iterate over its .key files,
301 302 reading the metadata stored in them. This gives us the full list of keys, cached archives, their sizes,
302 303 access times, creation times, and access counts.
303 304
304 305 Store that in an in-memory DB so we can easily run different sorting strategies.
305 306 Summing the sizes is a single SQL SUM query.
306 307
307 308 Then we run a sorting strategy based on the eviction policy.
308 309 We iterate over the sorted keys and remove entries, checking after each whether we are back under the overall size limit.
309 310 """
310 311
311 312 policy = policy or self._eviction_policy
312 313 size_limit = size_limit or self._cache_size_limit
313 314
314 315 select_policy = EVICTION_POLICY[policy]['evict']
315 316
317 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
318 policy, format_size(size_limit))
319
316 320 if select_policy is None:
317 321 return 0
318 322
319 323 db = DB()
320 324
321 325 data = []
322 326 cnt = 1
323 327 for shard in self._shards:
324 328 for key_file in os.listdir(shard._directory):
325 329 if key_file.endswith('.key'):
326 330 key_file_path = os.path.join(shard._directory, key_file)
327 331 with open(key_file_path, 'rb') as f:
328 332 metadata = json.loads(f.read())
329 # in case we don't have size re-calc it...
330 if not metadata.get('size'):
331 fn = metadata.get('full_path')
332 size = os.stat(fn).st_size
333
334 size = metadata.get('size')
335 filename = metadata.get('filename')
336 full_path = metadata.get('full_path')
337
338 if not size:
339 # in case we don't have size re-calc it...
340 size = os.stat(full_path).st_size
333 341
334 342 data.append([
335 343 cnt,
336 344 key_file,
337 345 key_file_path,
338 metadata.get('filename'),
339 metadata.get('full_path'),
346 filename,
347 full_path,
340 348 metadata.get('store_time', 0),
341 349 metadata.get('access_time', 0),
342 350 metadata.get('access_count', 0),
343 metadata.get('size', size),
351 size,
344 352 ])
345 353 cnt += 1
346 354
347 355 # Insert bulk data using executemany
348 356 db.bulk_insert(data)
349 357
350 358 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
351
359 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
352 360 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
353 361 sorted_keys = db.sql(select_policy_qry).fetchall()
354 362
363 removed_items = 0
364 removed_size = 0
355 365 for key, cached_file, size in sorted_keys:
356 366 # simulate removal impact BEFORE removal
357 367 total_size -= size
368
358 369 if total_size <= size_limit:
359 370 # we obtained what we wanted...
360 371 break
361 372
362 373 os.remove(cached_file)
363 374 os.remove(key)
364 return
375 removed_items += 1
376 removed_size += size
377
378 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
379 return removed_items
365 380
366 381
367 382 def get_archival_config(config):
368 383
369 384 final_config = {
370 385
371 386 }
372 387
373 388 for k, v in config.items():
374 389 if k.startswith('archive_cache'):
375 390 final_config[k] = v
376 391
377 392 return final_config
378 393
379 394
380 395 def get_archival_cache_store(config):
381 396
382 397 global cache_meta
383 398 if cache_meta is not None:
384 399 return cache_meta
385 400
386 401 config = get_archival_config(config)
387 402 backend = config['archive_cache.backend.type']
388 403 if backend != 'filesystem':
389 404 raise ValueError('archive_cache.backend.type only supports "filesystem"')
390 405
391 406 archive_cache_locking_url = config['archive_cache.locking.url']
392 407 archive_cache_dir = config['archive_cache.filesystem.store_dir']
393 408 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
394 409 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
395 410 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
396 411
397 412 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
398 413
399 414 # check if it's ok to write, and re-create the archive cache
400 415 if not os.path.isdir(archive_cache_dir):
401 416 os.makedirs(archive_cache_dir, exist_ok=True)
402 417
403 418 d_cache = FanoutCache(
404 419 archive_cache_dir,
405 420 locking_url=archive_cache_locking_url,
406 421 cache_shards=archive_cache_shards,
407 422 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
408 423 cache_eviction_policy=archive_cache_eviction_policy
409 424 )
410 425 cache_meta = d_cache
411 426 return cache_meta
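
The module above wires together config parsing (get_archival_cache_store), sharded filesystem storage (FanoutCache / FileSystemCache) and SQLite-backed eviction. Below is a minimal usage sketch of that flow; the import path, config values and archive key are illustrative assumptions and not part of this commit.

# Usage sketch (assumptions: the module is importable as vcsserver.lib.archive_cache,
# a Redis instance sits behind the locking URL, and store_dir is writable).
import io
from vcsserver.lib import archive_cache  # assumed import path

config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://localhost:6379/0',          # assumed
    'archive_cache.filesystem.store_dir': '/var/cache/rc_archives',   # assumed
    'archive_cache.filesystem.cache_size_gb': 1,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = archive_cache.get_archival_cache_store(config)

archive_key = 'example-repo-deadbeef.tar.gz'  # illustrative key
if archive_key not in d_cache:
    # store() streams the reader in 4MB chunks into one shard and writes a
    # <key>.key metadata file recording store_time, size and access stats.
    d_cache.store(archive_key, io.BytesIO(b'archive bytes'), {'archive_name': archive_key})

# fetch() returns an open binary file handle plus the stored metadata, and
# bumps access_count / access_time in the .key file as a side effect.
reader, metadata = d_cache.fetch(archive_key)
with reader:
    data = reader.read()

# evict() loads every .key file into an in-memory SQLite table, orders the rows
# by the configured policy and deletes archives until the summed size drops
# under cache_size_limit; it returns the number of removed archives.
removed = d_cache.evict()
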
@@ -1,59 +1,58 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import redis
19 from vcsserver.lib._vendor import redis_lock
20
19 from ..._vendor import redis_lock
21 20 from .utils import ArchiveCacheLock
22 21
23 22
24 23 class GenerationLock:
25 24 """
26 25 Locking mechanism that fails fast if the lock is already held by another worker
27 26
28 27 with GenerationLock(lock_key):
29 28 compute_archive()
30 29 """
31 30 lock_timeout = 7200
32 31
33 32 def __init__(self, lock_key, url):
34 33 self.lock_key = lock_key
35 34 self._create_client(url)
36 35 self.lock = self.get_lock()
37 36
38 37 def _create_client(self, url):
39 38 connection_pool = redis.ConnectionPool.from_url(url)
40 39 self.writer_client = redis.StrictRedis(
41 40 connection_pool=connection_pool
42 41 )
43 42 self.reader_client = self.writer_client
44 43
45 44 def get_lock(self):
46 45 return redis_lock.Lock(
47 46 redis_client=self.writer_client,
48 47 name=self.lock_key,
49 48 expire=self.lock_timeout,
50 49 strict=True
51 50 )
52 51
53 52 def __enter__(self):
54 53 acquired = self.lock.acquire(blocking=False)
55 54 if not acquired:
56 55 raise ArchiveCacheLock('Failed to create a lock')
57 56
58 57 def __exit__(self, exc_type, exc_val, exc_tb):
59 58 self.lock.release()
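
The second file in this sync is the Redis-backed GenerationLock returned by FanoutCache.get_lock(): a non-blocking lock that ensures only one worker generates a given archive at a time and raises ArchiveCacheLock when the lock is already held. A small sketch of the pattern from the class docstring follows; the import paths, lock key and Redis URL are assumptions, and compute_archive() is a placeholder.

# Sketch only; import paths, lock key and Redis URL are illustrative assumptions.
from vcsserver.lib.archive_cache.lock import GenerationLock      # assumed path
from vcsserver.lib.archive_cache.utils import ArchiveCacheLock   # assumed path


def compute_archive():
    """Placeholder for the real archive generation work."""


lock_key = 'archive-generation:example-repo:deadbeef'  # illustrative
try:
    with GenerationLock(lock_key, 'redis://localhost:6379/0'):
        # only the worker holding the Redis lock runs the generation;
        # the lock auto-expires after lock_timeout (7200s) if never released
        compute_archive()
except ArchiveCacheLock:
    # another worker is already generating this archive; back off or retry later
    pass
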