archive-cache: synced with CE lib
r1242:8380b87c default
@@ -1,258 +1,411 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib
import sqlite3

from vcsserver.lib.rc_json import json
from .lock import GenerationLock

log = logging.getLogger(__name__)

cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


EVICTION_POLICY = {
    'none': {
        'evict': None,
    },
    'least-recently-stored': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
    },
    'least-recently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
    },
    'least-frequently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
    },
}

class DB:

    def __init__(self):
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        qry = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            filename TEXT,
            full_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
        '''

        self.sql(qry)
        self.connection.commit()

    @property
    def sql(self):
        return self.connection.execute

    def bulk_insert(self, rows):
        qry = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            filename,
            full_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
        '''
        cursor = self.connection.cursor()
        cursor.executemany(qry, rows)
        self.connection.commit()

class FileSystemCache:

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                return size

    def _get_keyfile(self, key):
        return os.path.join(self._directory, f'{key}.key')

    def store(self, key, value_reader, metadata):
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        reader = functools.partial(value_reader.read, 2**22)

        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')
        metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
        if key not in self:
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        try:
            return open(os.path.join(self._directory, filename), 'rb'), metadata
        finally:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self._directory, filename)
        return filename, full_path

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item

        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)

class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key):
        """Return file handle corresponding to `key` from cache.
        """
        shard = self._get_shard(key)
        return shard.fetch(key)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the conditions.

        Explanation of this algorithm:
        iterate over each shard, then for each shard iterate over the .key files and
        read the stored metadata. This gives us a full list of keys and cached archives,
        their size, access data, creation time, and access counts.

        Store that into an in-memory DB so we can run different sorting strategies easily.
        Summing the size is a SUM sql query.

        Then we run a sorting strategy based on the eviction policy.
        We iterate over the sorted keys and remove each one, checking if we hit the overall limit.
        """

        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        if select_policy is None:
            return 0

        db = DB()

        data = []
        cnt = 1
        for shard in self._shards:
            for key_file in os.listdir(shard._directory):
                if key_file.endswith('.key'):
                    key_file_path = os.path.join(shard._directory, key_file)
                    with open(key_file_path, 'rb') as f:
                        metadata = json.loads(f.read())
                    # in case we don't have size re-calc it...
                    if not metadata.get('size'):
                        fn = metadata.get('full_path')
                        size = os.stat(fn).st_size

                    data.append([
                        cnt,
                        key_file,
                        key_file_path,
                        metadata.get('filename'),
                        metadata.get('full_path'),
                        metadata.get('store_time', 0),
                        metadata.get('access_time', 0),
                        metadata.get('access_count', 0),
                        metadata.get('size', size),
                    ])
                    cnt += 1

        # Insert bulk data using executemany
        db.bulk_insert(data)

        ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()

        select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
        sorted_keys = db.sql(select_policy_qry).fetchall()

        for key, cached_file, size in sorted_keys:
            # simulate removal impact BEFORE removal
            total_size -= size
            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            os.remove(cached_file)
            os.remove(key)
        return

def get_archival_config(config):

    final_config = {

    }

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)
    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta
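Below is a short usage sketch, not part of the commit itself: the config values, paths, and the archive key are illustrative assumptions, and it presumes a context where get_archival_cache_store is importable. It shows the flow implemented above: build the FanoutCache from the 'archive_cache.*' config keys, store an archive through a binary reader, fetch it back together with its key-file metadata, and call evict() to trim the store to the configured size limit.

# Hedged usage sketch -- config values, paths and the cache key below are illustrative assumptions.
illustrative_config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://localhost:6379/0',
    'archive_cache.filesystem.store_dir': '/tmp/archive_cache',
    'archive_cache.filesystem.cache_size_gb': 1,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = get_archival_cache_store(illustrative_config)

archive_key = 'example-repo-abcdef.tar.gz'  # hypothetical cache key
if archive_key not in d_cache:
    with open('/tmp/prepared-archive.tar.gz', 'rb') as archive_reader:
        # store() drains the reader in 4MB chunks and writes a <key>.key metadata file
        d_cache.store(archive_key, archive_reader, metadata={'archive_name': archive_key})

reader, metadata = d_cache.fetch(archive_key)  # also bumps access_time / access_count
try:
    first_block = reader.read(4096)
finally:
    reader.close()

# trim the store back under cache_size_limit using the configured eviction policy
d_cache.evict()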
@@ -1,29 +1,71 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import os


class ArchiveCacheLock(Exception):
    pass


def archive_iterator(_reader, block_size: int = 4096 * 512):
    # 4096 * 512 = 2MB
    while 1:
        data = _reader.read(block_size)
        if not data:
            break
        yield data


def get_directory_statistics(start_path):
    """
    total_files, total_size, directory_stats = get_directory_statistics(start_path)

    print(f"Directory statistics for: {start_path}\n")
    print(f"Total files: {total_files}")
    print(f"Total size: {format_size(total_size)}\n")

    :param start_path:
    :return:
    """

    total_files = 0
    total_size = 0
    directory_stats = {}

    for dir_path, dir_names, file_names in os.walk(start_path):
        dir_size = 0
        file_count = len(file_names)

        for file in file_names:
            filepath = os.path.join(dir_path, file)
            file_size = os.path.getsize(filepath)
            dir_size += file_size

        directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
        total_files += file_count
        total_size += dir_size

    return total_files, total_size, directory_stats


def format_size(size):
    # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
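A minimal, hedged sketch of how these helpers can be combined (the store directory and the archive path are illustrative assumptions, and it presumes a context where these functions are importable): archive_iterator streams a binary file in 2MB blocks, while get_directory_statistics and format_size report how much the on-disk cache currently holds.

# Hedged sketch -- the paths below are illustrative assumptions, not shipped defaults.
store_dir = '/tmp/archive_cache'

total_files, total_size, directory_stats = get_directory_statistics(store_dir)
print(f"{store_dir}: {total_files} files, {format_size(total_size)} total")
for dir_path, stats in directory_stats.items():
    print(f"  {dir_path}: {stats['file_count']} files, {format_size(stats['size'])}")

# stream a cached archive in blocks, e.g. towards an HTTP response
with open('/tmp/prepared-archive.tar.gz', 'rb') as reader:
    streamed = 0
    for chunk in archive_iterator(reader):
        streamed += len(chunk)
print(f"streamed {format_size(streamed)}")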