feat(archive-cache): implemented eviction policy logic
super-admin -
r5423:7155ef47 default
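The change below adds size-limit based eviction to the filesystem archive cache. A minimal usage sketch, assuming this module's get_archival_cache_store() is in scope and using the archive_cache.* configuration keys it reads (all concrete values and paths are illustrative, not defaults):

config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/0',          # illustrative
    'archive_cache.filesystem.store_dir': '/data/archive_cache',  # illustrative
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = get_archival_cache_store(config)

# new in this commit: trim the cache back under the configured size limit,
# optionally overriding the configured policy and limit per call
d_cache.evict()
d_cache.evict(policy='least-recently-used', size_limit=5 * 1024 ** 3)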
@@ -1,278 +1,412 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import contextlib
21 21 import functools
22 22 import os
23 23 import logging
24 24 import time
25 25 import typing
26 26 import zlib
27 import sqlite3
27 28
28 29 from rhodecode.lib.ext_json import json
29 30 from .lock import GenerationLock
30 31
31 32 log = logging.getLogger(__name__)
32 33
33 34 cache_meta = None
34 35
35 36 UNKNOWN = -241
36 37 NO_VAL = -917
37 38
38 39 MODE_BINARY = 'BINARY'
39 40
40 41
42 EVICTION_POLICY = {
43 'none': {
44 'evict': None,
45 },
46 'least-recently-stored': {
47 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
48 },
49 'least-recently-used': {
50 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
51 },
52 'least-frequently-used': {
53 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
54 },
55 }
56
57
58 class DB:
59
60 def __init__(self):
61 self.connection = sqlite3.connect(':memory:')
62 self._init_db()
63
64 def _init_db(self):
65 qry = '''
66 CREATE TABLE IF NOT EXISTS archive_cache (
67 rowid INTEGER PRIMARY KEY,
68 key_file TEXT,
69 key_file_path TEXT,
70 filename TEXT,
71 full_path TEXT,
72 store_time REAL,
73 access_time REAL,
74 access_count INTEGER DEFAULT 0,
75 size INTEGER DEFAULT 0
76 )
77 '''
78
79 self.sql(qry)
80 self.connection.commit()
81
82 @property
83 def sql(self):
84 return self.connection.execute
85
86 def bulk_insert(self, rows):
87 qry = '''
88 INSERT INTO archive_cache (
89 rowid,
90 key_file,
91 key_file_path,
92 filename,
93 full_path,
94 store_time,
95 access_time,
96 access_count,
97 size
98 )
99 VALUES (
100 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 )
102 '''
103 cursor = self.connection.cursor()
104 cursor.executemany(qry, rows)
105 self.connection.commit()
106
107
41 108 class FileSystemCache:
42 109
43 110 def __init__(self, index, directory, **settings):
44 111 self._index = index
45 112 self._directory = directory
46 113
47 114 def _write_file(self, full_path, iterator, mode, encoding=None):
48 115 full_dir, _ = os.path.split(full_path)
49 116
50 117 for count in range(1, 11):
51 118 with contextlib.suppress(OSError):
52 119 os.makedirs(full_dir)
53 120
54 121 try:
55 122 # Another cache may have deleted the directory before
56 123 # the file could be opened.
57 124 writer = open(full_path, mode, encoding=encoding)
58 125 except OSError:
59 126 if count == 10:
60 127 # Give up after 10 tries to open the file.
61 128 raise
62 129 continue
63 130
64 131 with writer:
65 132 size = 0
66 133 for chunk in iterator:
67 134 size += len(chunk)
68 135 writer.write(chunk)
69 136 return size
70 137
71 138 def _get_keyfile(self, key):
72 139 return os.path.join(self._directory, f'{key}.key')
73 140
74 141 def store(self, key, value_reader, metadata):
75 142 filename, full_path = self.random_filename()
76 143 key_file = self._get_keyfile(key)
77 144
78 145 # STORE METADATA
79 146 _metadata = {
80 147 "version": "v1",
81 148 "filename": filename,
82 149 "full_path": full_path,
83 150 "key_file": key_file,
84 151 "store_time": time.time(),
85 152 "access_count": 1,
86 153 "access_time": 0,
87 154 "size": 0
88 155 }
89 156 if metadata:
90 157 _metadata.update(metadata)
91 158
92 159 reader = functools.partial(value_reader.read, 2**22)
93 160
94 161 iterator = iter(reader, b'')
95 162 size = self._write_file(full_path, iterator, 'xb')
96 163 _metadata['size'] = size
97 164
98 165 # after the archive is written, we create a key file to mark the presence of the binary archive
99 166 with open(key_file, 'wb') as f:
100 167 f.write(json.dumps(_metadata))
101 168
102 169 return key, size, MODE_BINARY, filename, _metadata
103 170
104 171 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
105 172 if key not in self:
106 173 raise KeyError(key)
107 174
108 175 key_file = self._get_keyfile(key)
109 176 with open(key_file, 'rb') as f:
110 177 metadata = json.loads(f.read())
111 178
112 179 filename = metadata['filename']
113 180
114 181 try:
115 182 return open(os.path.join(self._directory, filename), 'rb'), metadata
116 183 finally:
117 184 # update usage stats, count and accessed
118 185 metadata["access_count"] = metadata.get("access_count", 0) + 1
119 186 metadata["access_time"] = time.time()
120 187
121 188 with open(key_file, 'wb') as f:
122 189 f.write(json.dumps(metadata))
123 190
124 191 def random_filename(self):
125 192 """Return filename and full-path tuple for file storage.
126 193
127 194 Filename will be a randomly generated 28 character hexadecimal string
128 195 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
129 196 reduce the size of directories. On older filesystems, lookups in
130 197 directories with many files may be slow.
131 198 """
132 199
133 200 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
134 201 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
135 202 name = hex_name[4:] + '.archive_cache'
136 203 filename = os.path.join(sub_dir, name)
137 204 full_path = os.path.join(self._directory, filename)
138 205 return filename, full_path
139 206
140 207 def hash(self, key):
141 208 """Compute portable hash for `key`.
142 209
143 210 :param key: key to hash
144 211 :return: hash value
145 212
146 213 """
147 214 mask = 0xFFFFFFFF
148 215 return zlib.adler32(key.encode('utf-8')) & mask # noqa
149 216
150 217 def __contains__(self, key):
151 218 """Return `True` if `key` matching item is found in cache.
152 219
153 220 :param key: key matching item
154 221 :return: True if key matching item
155 222
156 223 """
157 224 key_file = self._get_keyfile(key)
158 225 return os.path.exists(key_file)
159 226
160 227
161 228 class FanoutCache:
162 229 """Cache that shards keys and values."""
163 230
164 231 def __init__(
165 232 self, directory=None, **settings
166 233 ):
167 234 """Initialize cache instance.
168 235
169 236 :param str directory: cache directory
170 237 :param settings: settings dict
171 238
172 239 """
173 240 if directory is None:
174 241 raise ValueError('directory cannot be None')
175 242
176 243 directory = str(directory)
177 244 directory = os.path.expanduser(directory)
178 245 directory = os.path.expandvars(directory)
179 246 self._directory = directory
180 247
181 248 self._count = settings.pop('cache_shards')
182 249 self._locking_url = settings.pop('locking_url')
183 250
184 251 self._eviction_policy = settings['cache_eviction_policy']
185 252 self._cache_size_limit = settings['cache_size_limit']
186 253
187 254 self._shards = tuple(
188 255 FileSystemCache(
189 256 index=num,
190 257 directory=os.path.join(directory, 'shard_%03d' % num),
191 258 **settings,
192 259 )
193 260 for num in range(self._count)
194 261 )
195 262 self._hash = self._shards[0].hash
196 263
197 264 def get_lock(self, lock_key):
198 265 return GenerationLock(lock_key, self._locking_url)
199 266
200 267 def _get_shard(self, key) -> FileSystemCache:
201 268 index = self._hash(key) % self._count
202 269 shard = self._shards[index]
203 270 return shard
204 271
205 272 def store(self, key, value_reader, metadata=None):
206 273 shard = self._get_shard(key)
207 274 return shard.store(key, value_reader, metadata)
208 275
209 276 def fetch(self, key):
210 277 """Return file handle corresponding to `key` from cache.
211 278 """
212 279 shard = self._get_shard(key)
213 280 return shard.fetch(key)
214 281
215 282 def has_key(self, key):
216 283 """Return `True` if `key` matching item is found in cache.
217 284
218 285 :param key: key for item
219 286 :return: True if key is found
220 287
221 288 """
222 289 shard = self._get_shard(key)
223 290 return key in shard
224 291
225 292 def __contains__(self, item):
226 293 return self.has_key(item)
227 294
228 def evict(self):
229 """Remove old items based on the conditions"""
230 # TODO: Implement this...
295 def evict(self, policy=None, size_limit=None):
296 """
297 Remove old items based on the configured eviction policy and size limit.
298
299
300 Explanation of this algorithm:
301 iterate over each shard, and for each shard iterate over its .key files and
302 read the metadata stored in them. This gives us a full list of keys, cached archives, their sizes and
303 access data: creation time, access time, and access count.
304
305 Store all of that in an in-memory DB so we can easily run different sorting strategies.
306 Summing the sizes is then a single SUM SQL query.
307
308 Next we run the sorting strategy selected by the eviction policy.
309 We iterate over the sorted keys and remove entries until the total size fits under the overall limit.
310 """
311
312 policy = policy or self._eviction_policy
313 size_limit = size_limit or self._cache_size_limit
314
315 select_policy = EVICTION_POLICY[policy]['evict']
316
317 if select_policy is None:
318 return 0
319
320 db = DB()
321
322 data = []
323 cnt = 1
324 for shard in self._shards:
325 for key_file in os.listdir(shard._directory):
326 if key_file.endswith('.key'):
327 key_file_path = os.path.join(shard._directory, key_file)
328 with open(key_file_path, 'rb') as f:
329 metadata = json.loads(f.read())
330 # in case we don't have a size stored, re-calculate it from the archive file on disk...
331 size = metadata.get('size')
332 if not size:
333 size = os.stat(metadata.get('full_path')).st_size
334
335 data.append([
336 cnt,
337 key_file,
338 key_file_path,
339 metadata.get('filename'),
340 metadata.get('full_path'),
341 metadata.get('store_time', 0),
342 metadata.get('access_time', 0),
343 metadata.get('access_count', 0),
344 size,
345 ])
346 cnt += 1
347
348 # Insert bulk data using executemany
349 db.bulk_insert(data)
350
351 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
352
353 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
354 sorted_keys = db.sql(select_policy_qry).fetchall()
355
356 for key, cached_file, size in sorted_keys:
357 # simulate removal impact BEFORE removal
358 total_size -= size
359 if total_size <= size_limit:
360 # we obtained what we wanted...
361 break
362
363 os.remove(cached_file)
364 os.remove(key)
231 365 return
232 366
233 367
234 368 def get_archival_config(config):
235 369
236 370 final_config = {
237 371
238 372 }
239 373
240 374 for k, v in config.items():
241 375 if k.startswith('archive_cache'):
242 376 final_config[k] = v
243 377
244 378 return final_config
245 379
246 380
247 381 def get_archival_cache_store(config):
248 382
249 383 global cache_meta
250 384 if cache_meta is not None:
251 385 return cache_meta
252 386
253 387 config = get_archival_config(config)
254 388 backend = config['archive_cache.backend.type']
255 389 if backend != 'filesystem':
256 390 raise ValueError('archive_cache.backend.type only supports "filesystem"')
257 391
258 392 archive_cache_locking_url = config['archive_cache.locking.url']
259 393 archive_cache_dir = config['archive_cache.filesystem.store_dir']
260 394 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
261 395 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
262 396 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
263 397
264 398 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
265 399
266 400 # check if it's OK to write, and create the archive cache directory if it's missing
267 401 if not os.path.isdir(archive_cache_dir):
268 402 os.makedirs(archive_cache_dir, exist_ok=True)
269 403
270 404 d_cache = FanoutCache(
271 405 archive_cache_dir,
272 406 locking_url=archive_cache_locking_url,
273 407 cache_shards=archive_cache_shards,
274 408 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
275 409 cache_eviction_policy=archive_cache_eviction_policy
276 410 )
277 411 cache_meta = d_cache
278 412 return cache_meta
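For reference, the selection half of the new evict() reduces to formatting one of the EVICTION_POLICY queries and walking the resulting rows until the running size fits under the limit. A condensed, self-contained sketch of that loop (the table columns mirror the DB helper above; the rows, sizes, and limit are made-up values for illustration only):

import sqlite3

# 'least-recently-used' orders candidates by access_time, oldest first
select_policy = 'SELECT {fields} FROM archive_cache ORDER BY access_time'

db = sqlite3.connect(':memory:')
db.execute(
    'CREATE TABLE archive_cache '
    '(key_file_path TEXT, full_path TEXT, size INTEGER, access_time REAL)'
)
# made-up rows standing in for the metadata read from the per-shard .key files
rows = [
    ('shard_000/a.key', 'shard_000/aa/aa/a.archive_cache', 400, 300.0),
    ('shard_001/b.key', 'shard_001/bb/bb/b.archive_cache', 500, 100.0),
    ('shard_001/c.key', 'shard_001/cc/cc/c.archive_cache', 300, 200.0),
]
db.executemany('INSERT INTO archive_cache VALUES (?, ?, ?, ?)', rows)

((total_size,),) = db.execute('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
size_limit = 600  # bytes, illustrative

qry = select_policy.format(fields='key_file_path, full_path, size')
for key_file_path, full_path, size in db.execute(qry).fetchall():
    # simulate the removal impact before deleting, as evict() does
    total_size -= size
    if total_size <= size_limit:
        break
    print('would remove', full_path, 'and', key_file_path)  # evict() calls os.remove() here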