##// END OF EJS Templates
feat(archive-cache): implement more usage stats for later easier evictions
super-admin -
r5422:386f3a63 default
parent child Browse files
Show More
@@ -1,264 +1,278 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import codecs
19 import codecs
20 import contextlib
20 import contextlib
21 import functools
21 import functools
22 import os
22 import os
23 import logging
23 import logging
24 import time
24 import time
25 import typing
25 import typing
26 import zlib
26 import zlib
27
27
28 from rhodecode.lib.ext_json import json
28 from rhodecode.lib.ext_json import json
29 from .lock import GenerationLock
29 from .lock import GenerationLock
30
30
31 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
32
32
33 cache_meta = None
33 cache_meta = None
34
34
35 UNKNOWN = -241
35 UNKNOWN = -241
36 NO_VAL = -917
36 NO_VAL = -917
37
37
38 MODE_BINARY = 'BINARY'
38 MODE_BINARY = 'BINARY'
39
39
40
40
41 class FileSystemCache:
41 class FileSystemCache:
42
42
43 def __init__(self, index, directory, **settings):
43 def __init__(self, index, directory, **settings):
44 self._index = index
44 self._index = index
45 self._directory = directory
45 self._directory = directory
46
46
47 def _write_file(self, full_path, iterator, mode, encoding=None):
47 def _write_file(self, full_path, iterator, mode, encoding=None):
48 full_dir, _ = os.path.split(full_path)
48 full_dir, _ = os.path.split(full_path)
49
49
50 for count in range(1, 11):
50 for count in range(1, 11):
51 with contextlib.suppress(OSError):
51 with contextlib.suppress(OSError):
52 os.makedirs(full_dir)
52 os.makedirs(full_dir)
53
53
54 try:
54 try:
55 # Another cache may have deleted the directory before
55 # Another cache may have deleted the directory before
56 # the file could be opened.
56 # the file could be opened.
57 writer = open(full_path, mode, encoding=encoding)
57 writer = open(full_path, mode, encoding=encoding)
58 except OSError:
58 except OSError:
59 if count == 10:
59 if count == 10:
60 # Give up after 10 tries to open the file.
60 # Give up after 10 tries to open the file.
61 raise
61 raise
62 continue
62 continue
63
63
64 with writer:
64 with writer:
65 size = 0
65 size = 0
66 for chunk in iterator:
66 for chunk in iterator:
67 size += len(chunk)
67 size += len(chunk)
68 writer.write(chunk)
68 writer.write(chunk)
69 return size
69 return size
70
70
71 def _get_keyfile(self, key):
71 def _get_keyfile(self, key):
72 return os.path.join(self._directory, f'{key}.key')
72 return os.path.join(self._directory, f'{key}.key')
73
73
74 def store(self, key, value_reader, metadata):
74 def store(self, key, value_reader, metadata):
75 filename, full_path = self.random_filename()
75 filename, full_path = self.random_filename()
76 key_file = self._get_keyfile(key)
76 key_file = self._get_keyfile(key)
77
77
78 # STORE METADATA
78 # STORE METADATA
79 _metadata = {
79 _metadata = {
80 "version": "v1",
80 "version": "v1",
81 "timestamp": time.time(),
82 "filename": filename,
81 "filename": filename,
83 "full_path": full_path,
82 "full_path": full_path,
84 "key_file": key_file,
83 "key_file": key_file,
84 "store_time": time.time(),
85 "access_count": 1,
86 "access_time": 0,
87 "size": 0
85 }
88 }
86 if metadata:
89 if metadata:
87 _metadata.update(metadata)
90 _metadata.update(metadata)
88
91
89 reader = functools.partial(value_reader.read, 2**22)
92 reader = functools.partial(value_reader.read, 2**22)
90
93
91 iterator = iter(reader, b'')
94 iterator = iter(reader, b'')
92 size = self._write_file(full_path, iterator, 'xb')
95 size = self._write_file(full_path, iterator, 'xb')
96 metadata['size'] = size
93
97
94 # after archive is finished, we create a key to save the presence of the binary file
98 # after archive is finished, we create a key to save the presence of the binary file
95 with open(key_file, 'wb') as f:
99 with open(key_file, 'wb') as f:
96 f.write(json.dumps(_metadata))
100 f.write(json.dumps(_metadata))
97
101
98 return key, size, MODE_BINARY, filename, _metadata
102 return key, size, MODE_BINARY, filename, _metadata
99
103
100 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
104 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
101 if key not in self:
105 if key not in self:
102 raise KeyError(key)
106 raise KeyError(key)
103
107
104 key_file = self._get_keyfile(key)
108 key_file = self._get_keyfile(key)
105 with open(key_file, 'rb') as f:
109 with open(key_file, 'rb') as f:
106 metadata = json.loads(f.read())
110 metadata = json.loads(f.read())
107
111
108 filename = metadata['filename']
112 filename = metadata['filename']
109
113
114 try:
110 return open(os.path.join(self._directory, filename), 'rb'), metadata
115 return open(os.path.join(self._directory, filename), 'rb'), metadata
116 finally:
117 # update usage stats, count and accessed
118 metadata["access_count"] = metadata.get("access_count", 0) + 1
119 metadata["access_time"] = time.time()
120
121 with open(key_file, 'wb') as f:
122 f.write(json.dumps(metadata))
111
123
112 def random_filename(self):
124 def random_filename(self):
113 """Return filename and full-path tuple for file storage.
125 """Return filename and full-path tuple for file storage.
114
126
115 Filename will be a randomly generated 28 character hexadecimal string
127 Filename will be a randomly generated 28 character hexadecimal string
116 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
128 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
117 reduce the size of directories. On older filesystems, lookups in
129 reduce the size of directories. On older filesystems, lookups in
118 directories with many files may be slow.
130 directories with many files may be slow.
119 """
131 """
120
132
121 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
133 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
122 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
134 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
123 name = hex_name[4:] + '.archive_cache'
135 name = hex_name[4:] + '.archive_cache'
124 filename = os.path.join(sub_dir, name)
136 filename = os.path.join(sub_dir, name)
125 full_path = os.path.join(self._directory, filename)
137 full_path = os.path.join(self._directory, filename)
126 return filename, full_path
138 return filename, full_path
127
139
128 def hash(self, key):
140 def hash(self, key):
129 """Compute portable hash for `key`.
141 """Compute portable hash for `key`.
130
142
131 :param key: key to hash
143 :param key: key to hash
132 :return: hash value
144 :return: hash value
133
145
134 """
146 """
135 mask = 0xFFFFFFFF
147 mask = 0xFFFFFFFF
136 return zlib.adler32(key.encode('utf-8')) & mask # noqa
148 return zlib.adler32(key.encode('utf-8')) & mask # noqa
137
149
138 def __contains__(self, key):
150 def __contains__(self, key):
139 """Return `True` if `key` matching item is found in cache.
151 """Return `True` if `key` matching item is found in cache.
140
152
141 :param key: key matching item
153 :param key: key matching item
142 :return: True if key matching item
154 :return: True if key matching item
143
155
144 """
156 """
145 key_file = self._get_keyfile(key)
157 key_file = self._get_keyfile(key)
146 return os.path.exists(key_file)
158 return os.path.exists(key_file)
147
159
148
160
149 class FanoutCache:
161 class FanoutCache:
150 """Cache that shards keys and values."""
162 """Cache that shards keys and values."""
151
163
152 def __init__(
164 def __init__(
153 self, directory=None, **settings
165 self, directory=None, **settings
154 ):
166 ):
155 """Initialize cache instance.
167 """Initialize cache instance.
156
168
157 :param str directory: cache directory
169 :param str directory: cache directory
158 :param settings: settings dict
170 :param settings: settings dict
159
171
160 """
172 """
161 if directory is None:
173 if directory is None:
162 raise ValueError('directory cannot be None')
174 raise ValueError('directory cannot be None')
163
175
164 directory = str(directory)
176 directory = str(directory)
165 directory = os.path.expanduser(directory)
177 directory = os.path.expanduser(directory)
166 directory = os.path.expandvars(directory)
178 directory = os.path.expandvars(directory)
167 self._directory = directory
179 self._directory = directory
168
180
169 self._count = settings.pop('cache_shards')
181 self._count = settings.pop('cache_shards')
170 self._locking_url = settings.pop('locking_url')
182 self._locking_url = settings.pop('locking_url')
171
183
184 self._eviction_policy = settings['cache_eviction_policy']
185 self._cache_size_limit = settings['cache_size_limit']
186
172 self._shards = tuple(
187 self._shards = tuple(
173 FileSystemCache(
188 FileSystemCache(
174 index=num,
189 index=num,
175 directory=os.path.join(directory, 'shard_%03d' % num),
190 directory=os.path.join(directory, 'shard_%03d' % num),
176 **settings,
191 **settings,
177 )
192 )
178 for num in range(self._count)
193 for num in range(self._count)
179 )
194 )
180 self._hash = self._shards[0].hash
195 self._hash = self._shards[0].hash
181
196
182 def get_lock(self, lock_key):
197 def get_lock(self, lock_key):
183 return GenerationLock(lock_key, self._locking_url)
198 return GenerationLock(lock_key, self._locking_url)
184
199
185 def _get_shard(self, key) -> FileSystemCache:
200 def _get_shard(self, key) -> FileSystemCache:
186 index = self._hash(key) % self._count
201 index = self._hash(key) % self._count
187 shard = self._shards[index]
202 shard = self._shards[index]
188 return shard
203 return shard
189
204
190 def store(self, key, value_reader, metadata=None):
205 def store(self, key, value_reader, metadata=None):
191 shard = self._get_shard(key)
206 shard = self._get_shard(key)
192 return shard.store(key, value_reader, metadata)
207 return shard.store(key, value_reader, metadata)
193
208
194 def fetch(self, key):
209 def fetch(self, key):
195 """Return file handle corresponding to `key` from cache.
210 """Return file handle corresponding to `key` from cache.
196 """
211 """
197 shard = self._get_shard(key)
212 shard = self._get_shard(key)
198 return shard.fetch(key)
213 return shard.fetch(key)
199
214
200 def has_key(self, key):
215 def has_key(self, key):
201 """Return `True` if `key` matching item is found in cache.
216 """Return `True` if `key` matching item is found in cache.
202
217
203 :param key: key for item
218 :param key: key for item
204 :return: True if key is found
219 :return: True if key is found
205
220
206 """
221 """
207 shard = self._get_shard(key)
222 shard = self._get_shard(key)
208 return key in shard
223 return key in shard
209
224
210 def __contains__(self, item):
225 def __contains__(self, item):
211 return self.has_key(item)
226 return self.has_key(item)
212
227
213 def evict(self):
228 def evict(self):
214 """Remove old items based on the conditions"""
229 """Remove old items based on the conditions"""
215 # TODO: Implement this...
230 # TODO: Implement this...
216 return
231 return
217
232
218
233
219 def get_archival_config(config):
234 def get_archival_config(config):
220
235
221 final_config = {
236 final_config = {
222
237
223 }
238 }
224
239
225 for k, v in config.items():
240 for k, v in config.items():
226 if k.startswith('archive_cache'):
241 if k.startswith('archive_cache'):
227 final_config[k] = v
242 final_config[k] = v
228
243
229 return final_config
244 return final_config
230
245
231
246
232 def get_archival_cache_store(config):
247 def get_archival_cache_store(config):
233
248
234 global cache_meta
249 global cache_meta
235 if cache_meta is not None:
250 if cache_meta is not None:
236 return cache_meta
251 return cache_meta
237
252
238 config = get_archival_config(config)
253 config = get_archival_config(config)
239 backend = config['archive_cache.backend.type']
254 backend = config['archive_cache.backend.type']
240 if backend != 'filesystem':
255 if backend != 'filesystem':
241 raise ValueError('archive_cache.backend.type only supports "filesystem"')
256 raise ValueError('archive_cache.backend.type only supports "filesystem"')
242
257
243 archive_cache_locking_url = config['archive_cache.locking.url']
258 archive_cache_locking_url = config['archive_cache.locking.url']
244 archive_cache_dir = config['archive_cache.filesystem.store_dir']
259 archive_cache_dir = config['archive_cache.filesystem.store_dir']
245 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
260 archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
246 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
261 archive_cache_shards = config['archive_cache.filesystem.cache_shards']
247 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
262 archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']
248
263
249 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
264 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
250
265
251 # check if it's ok to write, and re-create the archive cache
266 # check if it's ok to write, and re-create the archive cache
252 if not os.path.isdir(archive_cache_dir):
267 if not os.path.isdir(archive_cache_dir):
253 os.makedirs(archive_cache_dir, exist_ok=True)
268 os.makedirs(archive_cache_dir, exist_ok=True)
254
269
255 d_cache = FanoutCache(
270 d_cache = FanoutCache(
256 archive_cache_dir,
271 archive_cache_dir,
257 locking_url=archive_cache_locking_url,
272 locking_url=archive_cache_locking_url,
258 cache_shards=archive_cache_shards,
273 cache_shards=archive_cache_shards,
259 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
274 cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
260 cache_eviction_policy=archive_cache_eviction_policy
275 cache_eviction_policy=archive_cache_eviction_policy
261 )
276 )
262 cache_meta = d_cache
277 cache_meta = d_cache
263 return cache_meta
278 return cache_meta
264
@@ -1,71 +1,72 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18 import os
19 import os
19
20
20
21
21 class ArchiveCacheLock(Exception):
22 class ArchiveCacheLock(Exception):
22 pass
23 pass
23
24
24
25
25 def archive_iterator(_reader, block_size: int = 4096 * 512):
26 def archive_iterator(_reader, block_size: int = 4096 * 512):
26 # 4096 * 64 = 64KB
27 # 4096 * 64 = 64KB
27 while 1:
28 while 1:
28 data = _reader.read(block_size)
29 data = _reader.read(block_size)
29 if not data:
30 if not data:
30 break
31 break
31 yield data
32 yield data
32
33
33
34
34 def get_directory_statistics(start_path):
35 def get_directory_statistics(start_path):
35 """
36 """
36 total_files, total_size, directory_stats = get_directory_statistics(start_path)
37 total_files, total_size, directory_stats = get_directory_statistics(start_path)
37
38
38 print(f"Directory statistics for: {start_path}\n")
39 print(f"Directory statistics for: {start_path}\n")
39 print(f"Total files: {total_files}")
40 print(f"Total files: {total_files}")
40 print(f"Total size: {format_size(total_size)}\n")
41 print(f"Total size: {format_size(total_size)}\n")
41
42
42 :param start_path:
43 :param start_path:
43 :return:
44 :return:
44 """
45 """
45
46
46 total_files = 0
47 total_files = 0
47 total_size = 0
48 total_size = 0
48 directory_stats = {}
49 directory_stats = {}
49
50
50 for dir_path, dir_names, file_names in os.walk(start_path):
51 for dir_path, dir_names, file_names in os.walk(start_path):
51 dir_size = 0
52 dir_size = 0
52 file_count = len(file_names)
53 file_count = len(file_names)
53
54
54 for file in file_names:
55 for file in file_names:
55 filepath = os.path.join(dir_path, file)
56 filepath = os.path.join(dir_path, file)
56 file_size = os.path.getsize(filepath)
57 file_size = os.path.getsize(filepath)
57 dir_size += file_size
58 dir_size += file_size
58
59
59 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
60 directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
60 total_files += file_count
61 total_files += file_count
61 total_size += dir_size
62 total_size += dir_size
62
63
63 return total_files, total_size, directory_stats
64 return total_files, total_size, directory_stats
64
65
65
66
66 def format_size(size):
67 def format_size(size):
67 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
68 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
68 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
69 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
69 if size < 1024:
70 if size < 1024:
70 return f"{size:.2f} {unit}"
71 return f"{size:.2f} {unit}"
71 size /= 1024
72 size /= 1024
General Comments 0
You need to be logged in to leave comments. Login now