feat(archive-cache): added logging and relative imports
super-admin
r5424:035c4f28 default
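This commit replaces the absolute imports (rhodecode.lib.ext_json, rhodecode.lib._vendor.redis_lock) with relative ones and adds debug logging around cache eviction. The logging relies on a format_size helper imported from .utils, whose implementation is not part of this diff; the sketch below is only a hypothetical illustration of what such a byte-count formatter typically does.

def format_size(size):
    """Hypothetical sketch (not from this diff): render a byte count, e.g. 1536 -> '1.5 KB'."""
    size = float(size)
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024:
            return f'{size:.1f} {unit}'
        size /= 1024
    return f'{size:.1f} TB'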
@@ -1,412 +1,423 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import contextlib
21 21 import functools
22 22 import os
23 23 import logging
24 24 import time
25 25 import typing
26 26 import zlib
27 27 import sqlite3
28 28
29 from rhodecode.lib.ext_json import json
29 from ...ext_json import json
30 30 from .lock import GenerationLock
31 from .utils import format_size
31 32
32 33 log = logging.getLogger(__name__)
33 34
34 35 cache_meta = None
35 36
36 37 UNKNOWN = -241
37 38 NO_VAL = -917
38 39
39 40 MODE_BINARY = 'BINARY'
40 41
41 42
42 43 EVICTION_POLICY = {
43 44 'none': {
44 45 'evict': None,
45 46 },
46 47 'least-recently-stored': {
47 48 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
48 49 },
49 50 'least-recently-used': {
50 51 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
51 52 },
52 53 'least-frequently-used': {
53 54 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
54 55 },
55 56 }
56 57
57 58
58 59 class DB:
59 60
60 61 def __init__(self):
61 62 self.connection = sqlite3.connect(':memory:')
62 63 self._init_db()
63 64
64 65 def _init_db(self):
65 66 qry = '''
66 67 CREATE TABLE IF NOT EXISTS archive_cache (
67 68 rowid INTEGER PRIMARY KEY,
68 69 key_file TEXT,
69 70 key_file_path TEXT,
70 71 filename TEXT,
71 72 full_path TEXT,
72 73 store_time REAL,
73 74 access_time REAL,
74 75 access_count INTEGER DEFAULT 0,
75 76 size INTEGER DEFAULT 0
76 77 )
77 78 '''
78 79
79 80 self.sql(qry)
80 81 self.connection.commit()
81 82
82 83 @property
83 84 def sql(self):
84 85 return self.connection.execute
85 86
86 87 def bulk_insert(self, rows):
87 88 qry = '''
88 89 INSERT INTO archive_cache (
89 90 rowid,
90 91 key_file,
91 92 key_file_path,
92 93 filename,
93 94 full_path,
94 95 store_time,
95 96 access_time,
96 97 access_count,
97 98 size
98 99 )
99 100 VALUES (
100 101 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 102 )
102 103 '''
103 104 cursor = self.connection.cursor()
104 105 cursor.executemany(qry, rows)
105 106 self.connection.commit()
106 107
107 108
108 109 class FileSystemCache:
109 110
110 111 def __init__(self, index, directory, **settings):
111 112 self._index = index
112 113 self._directory = directory
113 114
114 115 def _write_file(self, full_path, iterator, mode, encoding=None):
115 116 full_dir, _ = os.path.split(full_path)
116 117
117 118 for count in range(1, 11):
118 119 with contextlib.suppress(OSError):
119 120 os.makedirs(full_dir)
120 121
121 122 try:
122 123 # Another cache may have deleted the directory before
123 124 # the file could be opened.
124 125 writer = open(full_path, mode, encoding=encoding)
125 126 except OSError:
126 127 if count == 10:
127 128 # Give up after 10 tries to open the file.
128 129 raise
129 130 continue
130 131
131 132 with writer:
132 133 size = 0
133 134 for chunk in iterator:
134 135 size += len(chunk)
135 136 writer.write(chunk)
136 137 return size
137 138
138 139 def _get_keyfile(self, key):
139 140 return os.path.join(self._directory, f'{key}.key')
140 141
141 142 def store(self, key, value_reader, metadata):
142 143 filename, full_path = self.random_filename()
143 144 key_file = self._get_keyfile(key)
144 145
145 146 # STORE METADATA
146 147 _metadata = {
147 148 "version": "v1",
148 149 "filename": filename,
149 150 "full_path": full_path,
150 151 "key_file": key_file,
151 152 "store_time": time.time(),
152 153 "access_count": 1,
153 154 "access_time": 0,
154 155 "size": 0
155 156 }
156 157 if metadata:
157 158 _metadata.update(metadata)
158 159
159 160 reader = functools.partial(value_reader.read, 2**22)
160 161
161 162 iterator = iter(reader, b'')
162 163 size = self._write_file(full_path, iterator, 'xb')
163 164         _metadata['size'] = size
164 165
165 166         # after the archive is written, create a key file to record the presence of the binary file
166 167 with open(key_file, 'wb') as f:
167 168 f.write(json.dumps(_metadata))
168 169
169 170 return key, size, MODE_BINARY, filename, _metadata
170 171
171 172 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
172 173 if key not in self:
173 174 raise KeyError(key)
174 175
175 176 key_file = self._get_keyfile(key)
176 177 with open(key_file, 'rb') as f:
177 178 metadata = json.loads(f.read())
178 179
179 180 filename = metadata['filename']
180 181
181 182 try:
182 183 return open(os.path.join(self._directory, filename), 'rb'), metadata
183 184 finally:
184 185             # update usage stats: access count and last access time
185 186 metadata["access_count"] = metadata.get("access_count", 0) + 1
186 187 metadata["access_time"] = time.time()
187 188
188 189 with open(key_file, 'wb') as f:
189 190 f.write(json.dumps(metadata))
190 191
191 192 def random_filename(self):
192 193 """Return filename and full-path tuple for file storage.
193 194
194 195 Filename will be a randomly generated 28 character hexadecimal string
195 196 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
196 197 reduce the size of directories. On older filesystems, lookups in
197 198 directories with many files may be slow.
198 199 """
199 200
200 201 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
201 202 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
202 203 name = hex_name[4:] + '.archive_cache'
203 204 filename = os.path.join(sub_dir, name)
204 205 full_path = os.path.join(self._directory, filename)
205 206 return filename, full_path
206 207
207 208 def hash(self, key):
208 209 """Compute portable hash for `key`.
209 210
210 211 :param key: key to hash
211 212 :return: hash value
212 213
213 214 """
214 215 mask = 0xFFFFFFFF
215 216 return zlib.adler32(key.encode('utf-8')) & mask # noqa
216 217
217 218 def __contains__(self, key):
218 219 """Return `True` if `key` matching item is found in cache.
219 220
220 221         :param key: key to look up
221 222         :return: True if a matching item exists
222 223
223 224 """
224 225 key_file = self._get_keyfile(key)
225 226 return os.path.exists(key_file)
226 227
227 228
228 229 class FanoutCache:
229 230 """Cache that shards keys and values."""
230 231
231 232 def __init__(
232 233 self, directory=None, **settings
233 234 ):
234 235 """Initialize cache instance.
235 236
236 237 :param str directory: cache directory
237 238 :param settings: settings dict
238 239
239 240 """
240 241 if directory is None:
241 242 raise ValueError('directory cannot be None')
242 243
243 244 directory = str(directory)
244 245 directory = os.path.expanduser(directory)
245 246 directory = os.path.expandvars(directory)
246 247 self._directory = directory
247 248
248 249 self._count = settings.pop('cache_shards')
249 250 self._locking_url = settings.pop('locking_url')
250 251
251 252 self._eviction_policy = settings['cache_eviction_policy']
252 253 self._cache_size_limit = settings['cache_size_limit']
253 254
254 255 self._shards = tuple(
255 256 FileSystemCache(
256 257 index=num,
257 258 directory=os.path.join(directory, 'shard_%03d' % num),
258 259 **settings,
259 260 )
260 261 for num in range(self._count)
261 262 )
262 263 self._hash = self._shards[0].hash
263 264
264 265 def get_lock(self, lock_key):
265 266 return GenerationLock(lock_key, self._locking_url)
266 267
267 268 def _get_shard(self, key) -> FileSystemCache:
268 269 index = self._hash(key) % self._count
269 270 shard = self._shards[index]
270 271 return shard
271 272
272 273 def store(self, key, value_reader, metadata=None):
273 274 shard = self._get_shard(key)
274 275 return shard.store(key, value_reader, metadata)
275 276
276 277 def fetch(self, key):
277 278 """Return file handle corresponding to `key` from cache.
278 279 """
279 280 shard = self._get_shard(key)
280 281 return shard.fetch(key)
281 282
282 283 def has_key(self, key):
283 284 """Return `True` if `key` matching item is found in cache.
284 285
285 286 :param key: key for item
286 287 :return: True if key is found
287 288
288 289 """
289 290 shard = self._get_shard(key)
290 291 return key in shard
291 292
292 293 def __contains__(self, item):
293 294 return self.has_key(item)
294 295
295 296 def evict(self, policy=None, size_limit=None):
296 297 """
297 298         Remove old items based on the configured eviction policy and size limit.
298 299
299 300
300 301         Explanation of this algorithm:
301 302         iterate over each shard, and for each shard iterate over its .key files,
302 303         reading the stored metadata. This gives us a full list of cached archives with their
303 304         size, creation time, access time and access counts.
304 305
305 306         Store that in an in-memory DB so we can easily run different sorting strategies.
306 307         Summing the sizes is a single SQL SUM query.
307 308
308 309         Then we run a sorting strategy based on the eviction policy and iterate over the
309 310         sorted keys, removing entries until the total size drops below the limit.
310 311 """
311 312
312 313 policy = policy or self._eviction_policy
313 314 size_limit = size_limit or self._cache_size_limit
314 315
315 316 select_policy = EVICTION_POLICY[policy]['evict']
316 317
318 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
319 policy, format_size(size_limit))
320
317 321 if select_policy is None:
318 322 return 0
319 323
320 324 db = DB()
321 325
322 326 data = []
323 327 cnt = 1
324 328 for shard in self._shards:
325 329 for key_file in os.listdir(shard._directory):
326 330 if key_file.endswith('.key'):
327 331 key_file_path = os.path.join(shard._directory, key_file)
328 332 with open(key_file_path, 'rb') as f:
329 333 metadata = json.loads(f.read())
330 334                         # in case the size is not stored, re-calculate it from the file on disk
331 335                         size = metadata.get('size')
332 336                         if not size:
333 337                             size = os.stat(metadata.get('full_path')).st_size
334 338
335 339 data.append([
336 340 cnt,
337 341 key_file,
338 342 key_file_path,
339 343 metadata.get('filename'),
340 344 metadata.get('full_path'),
341 345 metadata.get('store_time', 0),
342 346 metadata.get('access_time', 0),
343 347 metadata.get('access_count', 0),
344 348                             size,
345 349 ])
346 350 cnt += 1
347 351
348 352 # Insert bulk data using executemany
349 353 db.bulk_insert(data)
350 354
351 355 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
352
356 log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
353 357 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
354 358 sorted_keys = db.sql(select_policy_qry).fetchall()
355 359
360 removed_items = 0
361 removed_size = 0
356 362 for key, cached_file, size in sorted_keys:
357 363 # simulate removal impact BEFORE removal
358 364 total_size -= size
365
359 366 if total_size <= size_limit:
360 367 # we obtained what we wanted...
361 368 break
362 369
363 370 os.remove(cached_file)
364 371 os.remove(key)
365 return
372 removed_items += 1
373 removed_size += size
374
375 log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
376 return removed_items
366 377
367 378
368 379 def get_archival_config(config):
369 380
370 381 final_config = {
371 382
372 383 }
373 384
374 385 for k, v in config.items():
375 386 if k.startswith('archive_cache'):
376 387 final_config[k] = v
377 388
378 389 return final_config
379 390
380 391
381 392 def get_archival_cache_store(config):
382 393
383 394 global cache_meta
384 395 if cache_meta is not None:
385 396 return cache_meta
386 397
387 398 config = get_archival_config(config)
388 399 backend = config['archive_cache.backend.type']
389 400 if backend != 'filesystem':
390 401 raise ValueError('archive_cache.backend.type only supports "filesystem"')
391 402
392 403 archive_cache_locking_url = config['archive_cache.locking.url']
393 404 archive_cache_dir = config['archive_cache.filesystem.store_dir']
394 405 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
395 406 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
396 407 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
397 408
398 409 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
399 410
400 411     # make sure the archive cache directory exists; create it if it does not
401 412 if not os.path.isdir(archive_cache_dir):
402 413 os.makedirs(archive_cache_dir, exist_ok=True)
403 414
404 415 d_cache = FanoutCache(
405 416 archive_cache_dir,
406 417 locking_url=archive_cache_locking_url,
407 418 cache_shards=archive_cache_shards,
408 419 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
409 420 cache_eviction_policy=archive_cache_eviction_policy
410 421 )
411 422 cache_meta = d_cache
412 423 return cache_meta
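The FanoutCache above shards keys across FileSystemCache instances and, with this commit, logs how many archives eviction removed and how much space it reclaimed. A minimal usage sketch follows, assuming the module can be imported as rhodecode.lib.archive_cache (the real import path is not shown in this diff) and that a Redis server is reachable at the locking URL; all keys, paths and values are illustrative.

import io

from rhodecode.lib import archive_cache  # import path assumed, not shown in this diff

config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://localhost:6379/0',
    'archive_cache.filesystem.store_dir': '/tmp/test-archive-cache',
    'archive_cache.filesystem.cache_size_gb': 1,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = archive_cache.get_archival_cache_store(config)

archive_key = 'repo-1-commit-abcdef.zip'  # illustrative key
if archive_key not in d_cache:
    with d_cache.get_lock(f'archive-gen-{archive_key}'):
        # value_reader only needs a .read(chunk_size) method
        d_cache.store(archive_key, io.BytesIO(b'...zip bytes...'), {'sha': 'abcdef'})

# fetch returns an open binary file handle plus the stored metadata
reader, metadata = d_cache.fetch(archive_key)
with reader:
    data = reader.read()

# trim the cache back under the configured size limit
removed = d_cache.evict()

The configuration keys mirror the ones read in get_archival_cache_store(), and evict() now returns the number of removed archives, which is what the new debug logging reports.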
@@ -1,60 +1,59 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import redis
20 from rhodecode.lib._vendor import redis_lock
21
20 from ..._vendor import redis_lock
22 21 from .utils import ArchiveCacheLock
23 22
24 23
25 24 class GenerationLock:
26 25 """
27 26 Locking mechanism that detects if a lock is acquired
28 27     Locking mechanism that raises ArchiveCacheLock if the lock is already held elsewhere
29 28
30 29         with GenerationLock(lock_key, url):
31 30 """
32 31 lock_timeout = 7200
33 32
34 33 def __init__(self, lock_key, url):
35 34 self.lock_key = lock_key
36 35 self._create_client(url)
37 36 self.lock = self.get_lock()
38 37
39 38 def _create_client(self, url):
40 39 connection_pool = redis.ConnectionPool.from_url(url)
41 40 self.writer_client = redis.StrictRedis(
42 41 connection_pool=connection_pool
43 42 )
44 43 self.reader_client = self.writer_client
45 44
46 45 def get_lock(self):
47 46 return redis_lock.Lock(
48 47 redis_client=self.writer_client,
49 48 name=self.lock_key,
50 49 expire=self.lock_timeout,
51 50 strict=True
52 51 )
53 52
54 53 def __enter__(self):
55 54 acquired = self.lock.acquire(blocking=False)
56 55 if not acquired:
57 56 raise ArchiveCacheLock('Failed to create a lock')
58 57
59 58 def __exit__(self, exc_type, exc_val, exc_tb):
60 59 self.lock.release()
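GenerationLock acquires the Redis lock non-blocking in __enter__ and raises ArchiveCacheLock when another worker already holds it, so callers handle the conflict instead of waiting. A minimal sketch of that pattern, with module paths assumed for illustration only:

from rhodecode.lib.archive_cache.lock import GenerationLock       # path assumed
from rhodecode.lib.archive_cache.utils import ArchiveCacheLock    # path assumed


def generate_archive():
    """Placeholder for the real archive generation work."""


lock_key = 'archive-generation-repo-1-abcdef'
try:
    with GenerationLock(lock_key, 'redis://localhost:6379/0'):
        generate_archive()
except ArchiveCacheLock:
    # another worker already holds the lock; the archive is being generated
    # elsewhere, so callers can retry fetch() shortly instead of blocking here
    pass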