feat(archive-cache): implemented eviction policy logic
super-admin
r5423:7155ef47 default
@@ -1,278 +1,412 @@
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib
import sqlite3

from rhodecode.lib.ext_json import json
from .lock import GenerationLock

log = logging.getLogger(__name__)

cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


EVICTION_POLICY = {
    'none': {
        'evict': None,
    },
    'least-recently-stored': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
    },
    'least-recently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
    },
    'least-frequently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
    },
}
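
# Note: the 'evict' values above are SQL templates; FanoutCache.evict() below fills
# in the selected columns via str.format. Illustrative expansion:
#   EVICTION_POLICY['least-recently-used']['evict'].format(fields='key_file_path, full_path, size')
#   -> 'SELECT key_file_path, full_path, size FROM archive_cache ORDER BY access_time'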


class DB:

    def __init__(self):
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        qry = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            filename TEXT,
            full_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
        '''

        self.sql(qry)
        self.connection.commit()

    @property
    def sql(self):
        return self.connection.execute

    def bulk_insert(self, rows):
        qry = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            filename,
            full_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
        '''
        cursor = self.connection.cursor()
        cursor.executemany(qry, rows)
        self.connection.commit()

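# Illustrative sketch of how evict() below drives this helper; rows are positional
# and must follow the column order of the INSERT statement (values here are made up):
#
#   db = DB()
#   db.bulk_insert([
#       [1, 'somekey.key', '/store/shard_000/somekey.key', 'ab/cd/ef0123.archive_cache',
#        '/store/shard_000/ab/cd/ef0123.archive_cache', 1700000000.0, 0, 1, 2048],
#   ])
#   ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
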

class FileSystemCache:

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                return size

    def _get_keyfile(self, key):
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        reader = functools.partial(value_reader.read, 2**22)

        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')
        _metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

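    # Illustrative content of a `<key>.key` file written above (values are examples;
    # the real dict is whatever _metadata holds at write time):
    #   {"version": "v1", "filename": "ab/cd/ef0123.archive_cache",
    #    "full_path": "/store/shard_000/ab/cd/ef0123.archive_cache",
    #    "key_file": "/store/shard_000/somekey.key", "store_time": 1700000000.0,
    #    "access_count": 1, "access_time": 0, "size": 2048}
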
    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
        if key not in self:
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        try:
            return open(os.path.join(self._directory, filename), 'rb'), metadata
        finally:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self._directory, filename)
        return filename, full_path

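    # Illustrative example of the layout produced by random_filename(): for a hex
    # name starting with '3fa2', the relative filename becomes
    #   '3f/a2/<remaining 28 hex characters>.archive_cache'
    # and full_path is that filename joined under this shard's directory.
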
    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item

        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)


class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(self, directory=None, **settings):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key):
        """Return file handle corresponding to `key` from cache."""
        shard = self._get_shard(key)
        return shard.fetch(key)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the given conditions.

        Explanation of this algorithm:
        Iterate over each shard, and for each shard iterate over its .key files,
        reading the metadata stored in them. This gives us a full list of keys and
        cached archives, with their sizes, creation times, access times and
        access counts.

        Store all of that in an in-memory DB so we can run different sorting
        strategies easily. Summing the sizes is a SUM sql query.

        Then run the sorting strategy selected by the eviction policy, iterate over
        the sorted keys, and remove entries until we get below the overall size limit.
        """

        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        if select_policy is None:
            return 0

        db = DB()

        data = []
        cnt = 1
        for shard in self._shards:
            for key_file in os.listdir(shard._directory):
                if key_file.endswith('.key'):
                    key_file_path = os.path.join(shard._directory, key_file)
                    with open(key_file_path, 'rb') as f:
                        metadata = json.loads(f.read())

                    # in case we don't have the size stored, re-calc it from the archive file
                    size = metadata.get('size', 0)
                    if not size:
                        fn = metadata.get('full_path')
                        size = os.stat(fn).st_size

                    data.append([
                        cnt,
                        key_file,
                        key_file_path,
                        metadata.get('filename'),
                        metadata.get('full_path'),
                        metadata.get('store_time', 0),
                        metadata.get('access_time', 0),
                        metadata.get('access_count', 0),
                        size,
                    ])
                    cnt += 1

        # Insert bulk data using executemany
        db.bulk_insert(data)

        ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()

        select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
        sorted_keys = db.sql(select_policy_qry).fetchall()

        for key_file_path, cached_file_path, size in sorted_keys:
            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            os.remove(cached_file_path)
            os.remove(key_file_path)
            # account for the removal impact
            total_size -= size
        return


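# Illustrative usage of the eviction API (assuming `d_cache` is an already
# configured FanoutCache): both arguments are optional and fall back to the
# configured policy and size limit.
#
#   d_cache.evict()
#   d_cache.evict(policy='least-recently-stored', size_limit=10 * 1024 ** 3)
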

def get_archival_config(config):

    final_config = {}

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)
    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta
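

# Illustrative config for get_archival_cache_store(); the keys mirror the lookups
# above, the values are examples only:
#
#   config = {
#       'archive_cache.backend.type': 'filesystem',
#       'archive_cache.locking.url': 'redis://redis:6379/0',
#       'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
#       'archive_cache.filesystem.cache_size_gb': 10,
#       'archive_cache.filesystem.cache_shards': 8,
#       'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
#   }
#   d_cache = get_archival_cache_store(config)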