##// END OF EJS Templates
archive-cache: synced with CE lib
super-admin -
r1242:8380b87c default
parent child Browse files
Show More
@@ -1,258 +1,411 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import codecs
19 19 import contextlib
20 20 import functools
21 21 import os
22 22 import logging
23 23 import time
24 24 import typing
25 25 import zlib
26 import sqlite3
26 27
27 28 from vcsserver.lib.rc_json import json
28 29 from .lock import GenerationLock
29 30
30 31 log = logging.getLogger(__name__)
31 32
32 33 cache_meta = None
33 34
34 35 UNKNOWN = -241
35 36 NO_VAL = -917
36 37
37 38 MODE_BINARY = 'BINARY'
38 39
39 40
41 EVICTION_POLICY = {
42 'none': {
43 'evict': None,
44 },
45 'least-recently-stored': {
46 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
47 },
48 'least-recently-used': {
49 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
50 },
51 'least-frequently-used': {
52 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
53 },
54 }
55
56
57 class DB:
58
59 def __init__(self):
60 self.connection = sqlite3.connect(':memory:')
61 self._init_db()
62
63 def _init_db(self):
64 qry = '''
65 CREATE TABLE IF NOT EXISTS archive_cache (
66 rowid INTEGER PRIMARY KEY,
67 key_file TEXT,
68 key_file_path TEXT,
69 filename TEXT,
70 full_path TEXT,
71 store_time REAL,
72 access_time REAL,
73 access_count INTEGER DEFAULT 0,
74 size INTEGER DEFAULT 0
75 )
76 '''
77
78 self.sql(qry)
79 self.connection.commit()
80
81 @property
82 def sql(self):
83 return self.connection.execute
84
85 def bulk_insert(self, rows):
86 qry = '''
87 INSERT INTO archive_cache (
88 rowid,
89 key_file,
90 key_file_path,
91 filename,
92 full_path,
93 store_time,
94 access_time,
95 access_count,
96 size
97 )
98 VALUES (
99 ?, ?, ?, ?, ?, ?, ?, ?, ?
100 )
101 '''
102 cursor = self.connection.cursor()
103 cursor.executemany(qry, rows)
104 self.connection.commit()
105
106
40 107 class FileSystemCache:
41 108
42 109 def __init__(self, index, directory, **settings):
43 110 self._index = index
44 111 self._directory = directory
45 112
46 113 def _write_file(self, full_path, iterator, mode, encoding=None):
47 114 full_dir, _ = os.path.split(full_path)
48 115
49 116 for count in range(1, 11):
50 117 with contextlib.suppress(OSError):
51 118 os.makedirs(full_dir)
52 119
53 120 try:
54 121 # Another cache may have deleted the directory before
55 122 # the file could be opened.
56 123 writer = open(full_path, mode, encoding=encoding)
57 124 except OSError:
58 125 if count == 10:
59 126 # Give up after 10 tries to open the file.
60 127 raise
61 128 continue
62 129
63 130 with writer:
64 131 size = 0
65 132 for chunk in iterator:
66 133 size += len(chunk)
67 134 writer.write(chunk)
68 135 return size
69 136
70 137 def _get_keyfile(self, key):
71 138 return os.path.join(self._directory, f'{key}.key')
72 139
73 140 def store(self, key, value_reader, metadata):
74 141 filename, full_path = self.random_filename()
75 142 key_file = self._get_keyfile(key)
76 143
77 144 # STORE METADATA
78 145 _metadata = {
79 146 "version": "v1",
80 "timestamp": time.time(),
81 147 "filename": filename,
82 148 "full_path": full_path,
83 149 "key_file": key_file,
150 "store_time": time.time(),
151 "access_count": 1,
152 "access_time": 0,
153 "size": 0
84 154 }
85 155 if metadata:
86 156 _metadata.update(metadata)
87 157
88 158 reader = functools.partial(value_reader.read, 2**22)
89 159
90 160 iterator = iter(reader, b'')
91 161 size = self._write_file(full_path, iterator, 'xb')
162 metadata['size'] = size
92 163
93 164 # after archive is finished, we create a key to save the presence of the binary file
94 165 with open(key_file, 'wb') as f:
95 166 f.write(json.dumps(_metadata))
96 167
97 168 return key, size, MODE_BINARY, filename, _metadata
98 169
99 170 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
100 171 if key not in self:
101 172 raise KeyError(key)
102 173
103 174 key_file = self._get_keyfile(key)
104 175 with open(key_file, 'rb') as f:
105 176 metadata = json.loads(f.read())
106 177
107 178 filename = metadata['filename']
108 179
180 try:
109 181 return open(os.path.join(self._directory, filename), 'rb'), metadata
182 finally:
183 # update usage stats, count and accessed
184 metadata["access_count"] = metadata.get("access_count", 0) + 1
185 metadata["access_time"] = time.time()
186
187 with open(key_file, 'wb') as f:
188 f.write(json.dumps(metadata))
110 189
111 190 def random_filename(self):
112 191 """Return filename and full-path tuple for file storage.
113 192
114 193 Filename will be a randomly generated 28 character hexadecimal string
115 194 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
116 195 reduce the size of directories. On older filesystems, lookups in
117 196 directories with many files may be slow.
118 197 """
119 198
120 199 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
121 200 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
122 201 name = hex_name[4:] + '.archive_cache'
123 202 filename = os.path.join(sub_dir, name)
124 203 full_path = os.path.join(self._directory, filename)
125 204 return filename, full_path
126 205
127 206 def hash(self, key):
128 207 """Compute portable hash for `key`.
129 208
130 209 :param key: key to hash
131 210 :return: hash value
132 211
133 212 """
134 213 mask = 0xFFFFFFFF
135 214 return zlib.adler32(key.encode('utf-8')) & mask # noqa
136 215
137 216 def __contains__(self, key):
138 217 """Return `True` if `key` matching item is found in cache.
139 218
140 219 :param key: key matching item
141 220 :return: True if key matching item
142 221
143 222 """
144 223 key_file = self._get_keyfile(key)
145 224 return os.path.exists(key_file)
146 225
147 226
148 227 class FanoutCache:
149 228 """Cache that shards keys and values."""
150 229
151 230 def __init__(
152 231 self, directory=None, **settings
153 232 ):
154 233 """Initialize cache instance.
155 234
156 235 :param str directory: cache directory
157 236 :param settings: settings dict
158 237
159 238 """
160 239 if directory is None:
161 240 raise ValueError('directory cannot be None')
162 241
163 242 directory = str(directory)
164 243 directory = os.path.expanduser(directory)
165 244 directory = os.path.expandvars(directory)
166 245 self._directory = directory
167 246
168 247 self._count = settings.pop('cache_shards')
169 248 self._locking_url = settings.pop('locking_url')
170 249
250 self._eviction_policy = settings['cache_eviction_policy']
251 self._cache_size_limit = settings['cache_size_limit']
252
171 253 self._shards = tuple(
172 254 FileSystemCache(
173 255 index=num,
174 256 directory=os.path.join(directory, 'shard_%03d' % num),
175 257 **settings,
176 258 )
177 259 for num in range(self._count)
178 260 )
179 261 self._hash = self._shards[0].hash
180 262
181 263 def get_lock(self, lock_key):
182 264 return GenerationLock(lock_key, self._locking_url)
183 265
184 266 def _get_shard(self, key) -> FileSystemCache:
185 267 index = self._hash(key) % self._count
186 268 shard = self._shards[index]
187 269 return shard
188 270
189 271 def store(self, key, value_reader, metadata=None):
190 272 shard = self._get_shard(key)
191 273 return shard.store(key, value_reader, metadata)
192 274
193 275 def fetch(self, key):
194 276 """Return file handle corresponding to `key` from cache.
195 277 """
196 278 shard = self._get_shard(key)
197 279 return shard.fetch(key)
198 280
199 281 def has_key(self, key):
200 282 """Return `True` if `key` matching item is found in cache.
201 283
202 284 :param key: key for item
203 285 :return: True if key is found
204 286
205 287 """
206 288 shard = self._get_shard(key)
207 289 return key in shard
208 290
209 291 def __contains__(self, item):
210 292 return self.has_key(item)
211 293
294 def evict(self, policy=None, size_limit=None):
295 """
296 Remove old items based on the conditions
297
298
299 explanation of this algo:
300 iterate over each shard, then for each shard iterate over the .key files
301 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
302 access data, time creation, and access counts.
303
304 Store that into a memory DB so we can run different sorting strategies easily.
305 Summing the size is a sum sql query.
306
307 Then we run a sorting strategy based on eviction policy.
308 We iterate over sorted keys, and remove each checking if we hit the overall limit.
309 """
310
311 policy = policy or self._eviction_policy
312 size_limit = size_limit or self._cache_size_limit
313
314 select_policy = EVICTION_POLICY[policy]['evict']
315
316 if select_policy is None:
317 return 0
318
319 db = DB()
320
321 data = []
322 cnt = 1
323 for shard in self._shards:
324 for key_file in os.listdir(shard._directory):
325 if key_file.endswith('.key'):
326 key_file_path = os.path.join(shard._directory, key_file)
327 with open(key_file_path, 'rb') as f:
328 metadata = json.loads(f.read())
329 # in case we don't have size re-calc it...
330 if not metadata.get('size'):
331 fn = metadata.get('full_path')
332 size = os.stat(fn).st_size
333
334 data.append([
335 cnt,
336 key_file,
337 key_file_path,
338 metadata.get('filename'),
339 metadata.get('full_path'),
340 metadata.get('store_time', 0),
341 metadata.get('access_time', 0),
342 metadata.get('access_count', 0),
343 metadata.get('size', size),
344 ])
345 cnt += 1
346
347 # Insert bulk data using executemany
348 db.bulk_insert(data)
349
350 ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
351
352 select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
353 sorted_keys = db.sql(select_policy_qry).fetchall()
354
355 for key, cached_file, size in sorted_keys:
356 # simulate removal impact BEFORE removal
357 total_size -= size
358 if total_size <= size_limit:
359 # we obtained what we wanted...
360 break
361
362 os.remove(cached_file)
363 os.remove(key)
364 return
365
212 366
213 367 def get_archival_config(config):
214 368
215 369 final_config = {
216 370
217 371 }
218 372
219 373 for k, v in config.items():
220 374 if k.startswith('archive_cache'):
221 375 final_config[k] = v
222 376
223 377 return final_config
224 378
225 379
226 380 def get_archival_cache_store(config):
227 381
228 382 global cache_meta
229 383 if cache_meta is not None:
230 384 return cache_meta
231 385
232 386 config = get_archival_config(config)
233 387 backend = config['archive_cache.backend.type']
234 388 if backend != 'filesystem':
235 389 raise ValueError('archive_cache.backend.type only supports "filesystem"')
236 390
237 391 archive_cache_locking_url = config['archive_cache.locking.url']
238 392 archive_cache_dir = config['archive_cache.filesystem.store_dir']
239 393 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
240 394 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
241 395 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
242 396
243 397 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
244 398
245 399 # check if it's ok to write, and re-create the archive cache
246 400 if not os.path.isdir(archive_cache_dir):
247 401 os.makedirs(archive_cache_dir, exist_ok=True)
248 402
249 403 d_cache = FanoutCache(
250 404 archive_cache_dir,
251 405 locking_url=archive_cache_locking_url,
252 406 cache_shards=archive_cache_shards,
253 407 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
254 408 cache_eviction_policy=archive_cache_eviction_policy
255 409 )
256 410 cache_meta = d_cache
257 411 return cache_meta
258
@@ -1,29 +1,71 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2024 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 import os
19
18 20
19 21 class ArchiveCacheLock(Exception):
20 22 pass
21 23
22 24
23 25 def archive_iterator(_reader, block_size: int = 4096 * 512):
24 26 # 4096 * 64 = 64KB
25 27 while 1:
26 28 data = _reader.read(block_size)
27 29 if not data:
28 30 break
29 31 yield data
32
33
34 def get_directory_statistics(start_path):
35 """
36 total_files, total_size, directory_stats = get_directory_statistics(start_path)
37
38 print(f"Directory statistics for: {start_path}\n")
39 print(f"Total files: {total_files}")
40 print(f"Total size: {format_size(total_size)}\n")
41
42 :param start_path:
43 :return:
44 """
45
46 total_files = 0
47 total_size = 0
48 directory_stats = {}
49
50 for dir_path, dir_names, file_names in os.walk(start_path):
51 dir_size = 0
52 file_count = len(file_names)
53
54 for file in file_names:
55 filepath = os.path.join(dir_path, file)
56 file_size = os.path.getsize(filepath)
57 dir_size += file_size
58
59 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
60 total_files += file_count
61 total_size += dir_size
62
63 return total_files, total_size, directory_stats
64
65
66 def format_size(size):
67 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
68 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
69 if size < 1024:
70 return f"{size:.2f} {unit}"
71 size /= 1024
General Comments 0
You need to be logged in to leave comments. Login now