feat(archive-cache): implement more usage stats to make later evictions easier
r5422:386f3a63 default
@@ -1,264 +1,278 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import contextlib
21 21 import functools
22 22 import os
23 23 import logging
24 24 import time
25 25 import typing
26 26 import zlib
27 27
28 28 from rhodecode.lib.ext_json import json
29 29 from .lock import GenerationLock
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33 cache_meta = None
34 34
35 35 UNKNOWN = -241
36 36 NO_VAL = -917
37 37
38 38 MODE_BINARY = 'BINARY'
39 39
40 40
41 41 class FileSystemCache:
42 42
43 43 def __init__(self, index, directory, **settings):
44 44 self._index = index
45 45 self._directory = directory
46 46
47 47 def _write_file(self, full_path, iterator, mode, encoding=None):
48 48 full_dir, _ = os.path.split(full_path)
49 49
50 50 for count in range(1, 11):
51 51 with contextlib.suppress(OSError):
52 52 os.makedirs(full_dir)
53 53
54 54 try:
55 55 # Another cache may have deleted the directory before
56 56 # the file could be opened.
57 57 writer = open(full_path, mode, encoding=encoding)
58 58 except OSError:
59 59 if count == 10:
60 60 # Give up after 10 tries to open the file.
61 61 raise
62 62 continue
63 63
64 64 with writer:
65 65 size = 0
66 66 for chunk in iterator:
67 67 size += len(chunk)
68 68 writer.write(chunk)
69 69 return size
70 70
71 71 def _get_keyfile(self, key):
72 72 return os.path.join(self._directory, f'{key}.key')
73 73
74 74 def store(self, key, value_reader, metadata):
75 75 filename, full_path = self.random_filename()
76 76 key_file = self._get_keyfile(key)
77 77
78 78 # STORE METADATA
79 79 _metadata = {
80 80 "version": "v1",
81 "timestamp": time.time(),
82 81 "filename": filename,
83 82 "full_path": full_path,
84 83 "key_file": key_file,
84 "store_time": time.time(),
85 "access_count": 1,
86 "access_time": 0,
87 "size": 0
85 88 }
86 89 if metadata:
87 90 _metadata.update(metadata)
88 91
89 92 reader = functools.partial(value_reader.read, 2**22)
90 93
91 94 iterator = iter(reader, b'')
92 95 size = self._write_file(full_path, iterator, 'xb')
96 _metadata['size'] = size
93 97
94 98 # once the archive is fully written, create a key file to record the presence of the binary file
95 99 with open(key_file, 'wb') as f:
96 100 f.write(json.dumps(_metadata))
97 101
98 102 return key, size, MODE_BINARY, filename, _metadata
99 103
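For orientation, the key file written above is plain JSON. This commit replaces the old single `timestamp` field with `store_time` and adds the `access_count`, `access_time` and `size` counters that a later eviction pass can rank by. A sketch of what a key file might hold after a successful `store()` (all names and values are illustrative, not taken from a real cache):

```python
# Hypothetical contents of <store_dir>/shard_003/<key>.key after store();
# every value below is an illustrative example.
key_file_contents = {
    "version": "v1",
    "filename": "3f/a2/b1c4deadbeef.archive_cache",  # shortened example name
    "full_path": "/var/opt/rhodecode/archive_cache/shard_003/3f/a2/b1c4deadbeef.archive_cache",
    "key_file": "/var/opt/rhodecode/archive_cache/shard_003/mykey.key",
    "store_time": 1700000000.0,  # set once, when the archive is written
    "access_count": 1,           # bumped by every fetch()
    "access_time": 0,            # stays 0 until the first fetch()
    "size": 1048576,             # bytes written by _write_file()
}
```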
100 104 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
101 105 if key not in self:
102 106 raise KeyError(key)
103 107
104 108 key_file = self._get_keyfile(key)
105 109 with open(key_file, 'rb') as f:
106 110 metadata = json.loads(f.read())
107 111
108 112 filename = metadata['filename']
109 113
110 return open(os.path.join(self._directory, filename), 'rb'), metadata
114 try:
115 return open(os.path.join(self._directory, filename), 'rb'), metadata
116 finally:
117 # update usage stats: access count and last access time
118 metadata["access_count"] = metadata.get("access_count", 0) + 1
119 metadata["access_time"] = time.time()
120
121 with open(key_file, 'wb') as f:
122 f.write(json.dumps(metadata))
111 123
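The `try`/`finally` above is deliberate: the `finally` block runs after the archive file is opened but before the handle reaches the caller, and it mutates the same `metadata` dict in place, so the returned dict already reflects the bumped counters. A minimal usage sketch, assuming `d_cache` is the `FanoutCache` built by `get_archival_cache_store()` and a hypothetical key:

```python
import functools

# d_cache is assumed to be the FanoutCache returned by get_archival_cache_store()
reader, metadata = d_cache.fetch('myrepo-abcdef.zip')  # hypothetical key
try:
    # stream in 4MB chunks, the same read size store() uses
    for chunk in iter(functools.partial(reader.read, 2 ** 22), b''):
        pass  # hand each chunk to the response here
finally:
    reader.close()

# fetch() rewrote the key file before returning, so these counters are current
print(metadata['access_count'], metadata['access_time'])
```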
112 124 def random_filename(self):
113 125 """Return filename and full-path tuple for file storage.
114 126
115 127 Filename will be a randomly generated 28 character hexadecimal string
116 128 suffixed with ".archive_cache". Two levels of sub-directories will be used to
117 129 reduce the size of directories. On older filesystems, lookups in
118 130 directories with many files may be slow.
119 131 """
120 132
121 133 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
122 134 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
123 135 name = hex_name[4:] + '.archive_cache'
124 136 filename = os.path.join(sub_dir, name)
125 137 full_path = os.path.join(self._directory, filename)
126 138 return filename, full_path
127 139
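A worked example of the layout `random_filename()` produces: 16 random bytes become 32 hex characters, the first four select two nesting levels (up to 256 × 256 sub-directories), and the remaining 28 form the file name. A standalone sketch:

```python
import codecs
import os

# standalone illustration of random_filename()'s two-level fan-out
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')  # 32 hex chars
sub_dir = os.path.join(hex_name[:2], hex_name[2:4])  # e.g. '3f/a2'
name = hex_name[4:] + '.archive_cache'               # remaining 28 chars
print(os.path.join(sub_dir, name))
# e.g. '3f/a2/b1c4...e9.archive_cache'
```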
128 140 def hash(self, key):
129 141 """Compute portable hash for `key`.
130 142
131 143 :param key: key to hash
132 144 :return: hash value
133 145
134 146 """
135 147 mask = 0xFFFFFFFF
136 148 return zlib.adler32(key.encode('utf-8')) & mask # noqa
137 149
138 150 def __contains__(self, key):
139 151 """Return `True` if `key` matching item is found in cache.
140 152
141 153 :param key: key matching item
142 154 :return: True if key matching item
143 155
144 156 """
145 157 key_file = self._get_keyfile(key)
146 158 return os.path.exists(key_file)
147 159
148 160
149 161 class FanoutCache:
150 162 """Cache that shards keys and values."""
151 163
152 164 def __init__(
153 165 self, directory=None, **settings
154 166 ):
155 167 """Initialize cache instance.
156 168
157 169 :param str directory: cache directory
158 170 :param settings: settings dict
159 171
160 172 """
161 173 if directory is None:
162 174 raise ValueError('directory cannot be None')
163 175
164 176 directory = str(directory)
165 177 directory = os.path.expanduser(directory)
166 178 directory = os.path.expandvars(directory)
167 179 self._directory = directory
168 180
169 181 self._count = settings.pop('cache_shards')
170 182 self._locking_url = settings.pop('locking_url')
171 183
184 self._eviction_policy = settings['cache_eviction_policy']
185 self._cache_size_limit = settings['cache_size_limit']
186
172 187 self._shards = tuple(
173 188 FileSystemCache(
174 189 index=num,
175 190 directory=os.path.join(directory, 'shard_%03d' % num),
176 191 **settings,
177 192 )
178 193 for num in range(self._count)
179 194 )
180 195 self._hash = self._shards[0].hash
181 196
182 197 def get_lock(self, lock_key):
183 198 return GenerationLock(lock_key, self._locking_url)
184 199
185 200 def _get_shard(self, key) -> FileSystemCache:
186 201 index = self._hash(key) % self._count
187 202 shard = self._shards[index]
188 203 return shard
189 204
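Shard routing is deterministic: the adler32 checksum of the key, masked to 32 bits, is taken modulo the shard count, so a given key always resolves to the same shard directory. A standalone sketch of that mapping (the shard count of 8 is an arbitrary example):

```python
import zlib

def shard_index(key: str, shard_count: int = 8) -> int:
    # mirrors FileSystemCache.hash() followed by FanoutCache._get_shard()
    return (zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF) % shard_count

# the same key always lands on the same shard
assert shard_index('myrepo-abcdef.zip') == shard_index('myrepo-abcdef.zip')
```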
190 205 def store(self, key, value_reader, metadata=None):
191 206 shard = self._get_shard(key)
192 207 return shard.store(key, value_reader, metadata)
193 208
194 209 def fetch(self, key):
195 210 """Return file handle corresponding to `key` from cache.
196 211 """
197 212 shard = self._get_shard(key)
198 213 return shard.fetch(key)
199 214
200 215 def has_key(self, key):
201 216 """Return `True` if `key` matching item is found in cache.
202 217
203 218 :param key: key for item
204 219 :return: True if key is found
205 220
206 221 """
207 222 shard = self._get_shard(key)
208 223 return key in shard
209 224
210 225 def __contains__(self, item):
211 226 return self.has_key(item)
212 227
213 228 def evict(self):
214 229 """Remove old items based on the configured eviction policy and size limit"""
215 230 # TODO: Implement this...
216 231 return
217 232
218 233
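`evict()` is still a stub, but the `store_time`/`access_count`/`access_time`/`size` fields introduced in this commit are exactly what an eviction pass needs to rank entries. A minimal LRU-style sketch of how it could work (an illustration under those assumptions, not the project's eventual implementation):

```python
import contextlib
import json  # stdlib json can read the key files for this sketch
import os


def evict_lru(directory: str, size_limit: int) -> int:
    """Illustrative LRU eviction over the key files written by store().

    Scans every shard for '.key' files, orders entries by last access
    (falling back to store_time for never-fetched archives) and removes
    the oldest archives until the recorded sizes fit under size_limit.
    Returns the number of bytes freed.
    """
    entries, total_size = [], 0
    for root, _dirs, files in os.walk(directory):
        for fname in files:
            if not fname.endswith('.key'):
                continue
            key_file = os.path.join(root, fname)
            with open(key_file, 'rb') as f:
                meta = json.loads(f.read())
            last_used = meta.get('access_time') or meta.get('store_time', 0)
            entries.append((last_used, key_file, meta))
            total_size += meta.get('size', 0)

    freed = 0
    for _last_used, key_file, meta in sorted(entries):  # oldest first
        if total_size - freed <= size_limit:
            break
        with contextlib.suppress(OSError):
            os.remove(meta['full_path'])
            os.remove(key_file)
            freed += meta.get('size', 0)
    return freed
```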
219 234 def get_archival_config(config):
220 235
221 236 final_config = {
222 237
223 238 }
224 239
225 240 for k, v in config.items():
226 241 if k.startswith('archive_cache'):
227 242 final_config[k] = v
228 243
229 244 return final_config
230 245
231 246
232 247 def get_archival_cache_store(config):
233 248
234 249 global cache_meta
235 250 if cache_meta is not None:
236 251 return cache_meta
237 252
238 253 config = get_archival_config(config)
239 254 backend = config['archive_cache.backend.type']
240 255 if backend != 'filesystem':
241 256 raise ValueError('archive_cache.backend.type only supports "filesystem"')
242 257
243 258 archive_cache_locking_url = config['archive_cache.locking.url']
244 259 archive_cache_dir = config['archive_cache.filesystem.store_dir']
245 260 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
246 261 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
247 262 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
248 263
249 264 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
250 265
251 266 # check if it's ok to write, and re-create the archive cache
252 267 if not os.path.isdir(archive_cache_dir):
253 268 os.makedirs(archive_cache_dir, exist_ok=True)
254 269
255 270 d_cache = FanoutCache(
256 271 archive_cache_dir,
257 272 locking_url=archive_cache_locking_url,
258 273 cache_shards=archive_cache_shards,
259 274 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
260 275 cache_eviction_policy=archive_cache_eviction_policy
261 276 )
262 277 cache_meta = d_cache
263 278 return cache_meta
264
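For reference, a sketch of the flat config dict `get_archival_cache_store()` consumes. The keys below are exactly the ones read above; the values are illustrative, not shipped defaults:

```python
config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/1',
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode/archive_cache',
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'lru',  # hypothetical policy name
}
d_cache = get_archival_cache_store(config)  # result is memoized in cache_meta
```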
@@ -1,71 +1,72 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18 19 import os
19 20
20 21
21 22 class ArchiveCacheLock(Exception):
22 23 pass
23 24
24 25
25 26 def archive_iterator(_reader, block_size: int = 4096 * 512):
26 27 # default block size: 4096 * 512 = 2MB
27 28 while 1:
28 29 data = _reader.read(block_size)
29 30 if not data:
30 31 break
31 32 yield data
32 33
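`archive_iterator()` pairs naturally with `fetch()` from the caching module: the archive is streamed block by block instead of being loaded whole. A usage sketch, assuming `d_cache` and a hypothetical key as in the earlier examples:

```python
# d_cache is assumed to be the FanoutCache from the caching module above
reader, metadata = d_cache.fetch('myrepo-abcdef.zip')  # hypothetical key
total = 0
for block in archive_iterator(reader):  # 2MB blocks by default
    total += len(block)
reader.close()
# total should match the size recorded in the key file at store time
print(total, metadata.get('size'))
```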
33 34
34 35 def get_directory_statistics(start_path):
35 36 """
36 37 total_files, total_size, directory_stats = get_directory_statistics(start_path)
37 38
38 39 print(f"Directory statistics for: {start_path}\n")
39 40 print(f"Total files: {total_files}")
40 41 print(f"Total size: {format_size(total_size)}\n")
41 42
42 43 :param start_path:
43 44 :return:
44 45 """
45 46
46 47 total_files = 0
47 48 total_size = 0
48 49 directory_stats = {}
49 50
50 51 for dir_path, dir_names, file_names in os.walk(start_path):
51 52 dir_size = 0
52 53 file_count = len(file_names)
53 54
54 55 for file in file_names:
55 56 filepath = os.path.join(dir_path, file)
56 57 file_size = os.path.getsize(filepath)
57 58 dir_size += file_size
58 59
59 60 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
60 61 total_files += file_count
61 62 total_size += dir_size
62 63
63 64 return total_files, total_size, directory_stats
64 65
65 66
66 67 def format_size(size):
67 68 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
68 69 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
69 70 if size < 1024:
70 71 return f"{size:.2f} {unit}"
71 72 size /= 1024
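Following the usage already shown in the `get_directory_statistics()` docstring, the two helpers combine like this (the path is a placeholder):

```python
total_files, total_size, directory_stats = get_directory_statistics(
    '/var/opt/rhodecode/archive_cache')  # placeholder path
print(f"Total files: {total_files}")
print(f"Total size: {format_size(total_size)}")  # e.g. '2.38 MB' for 2_500_000 bytes
```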