archive-cache: synced with CE codebase
r1257:60bf3236 default
@@ -1,352 +1,355 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import os
 import functools
 import logging
 import typing
 import time
 import zlib

 from ...ext_json import json
 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
 from ..lock import GenerationLock

 log = logging.getLogger(__name__)


 class BaseShard:
     storage_type: str = ''
     fs = None

     @classmethod
     def hash(cls, key):
         """Compute portable hash for `key`.

         :param key: key to hash
         :return: hash value

         """
         mask = 0xFFFFFFFF
         return zlib.adler32(key.encode('utf-8')) & mask  # noqa
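A note on why the shard hash is zlib.adler32 rather than Python's built-in hash(): hash() is salted per process, so it cannot route the same key to the same shard across worker processes, while adler32 is stable everywhere. A minimal sketch of the key-to-shard mapping this enables (the shard count here is illustrative):

    import zlib

    def shard_index(key: str, shard_count: int) -> int:
        # Same computation as BaseShard.hash: portable and process-stable.
        portable_hash = zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF
        return portable_hash % shard_count

    # 'my-archive.zip' lands on the same shard in every worker process.
    print(shard_index('my-archive.zip', 8))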

     def _write_file(self, full_path, read_iterator, mode):
         raise NotImplementedError

     def _get_keyfile(self, key):
         raise NotImplementedError

     def random_filename(self):
         raise NotImplementedError

     def _store(self, key, value_reader, metadata, mode):
         (filename,  # hash-name
          full_path  # full-path/hash-name
          ) = self.random_filename()

         key_file, key_file_path = self._get_keyfile(key)

         # STORE METADATA
         _metadata = {
             "version": "v1",

             "key_file": key_file,  # this is the .key.json file storing meta
             "key_file_path": key_file_path,  # full path to key_file
             "archive_key": key,  # original name we stored archive under, e.g. my-archive.zip
             "archive_filename": filename,  # the actual filename we stored that file under
             "archive_full_path": full_path,

             "store_time": time.time(),
             "access_count": 0,
             "access_time": 0,

             "size": 0
         }
         if metadata:
             _metadata.update(metadata)

         read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
         size, sha256 = self._write_file(full_path, read_iterator, mode)
         _metadata['size'] = size
         _metadata['sha256'] = sha256

         # after the archive is written, create a key file recording the presence of the binary file
         with self.fs.open(key_file_path, 'wb') as f:
             f.write(json.dumps(_metadata))

         return key, filename, size, _metadata
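The iter(callable, sentinel) idiom above turns any file-like reader into a stream of fixed-size chunks, so large archives pass through _write_file without being held in memory. A standalone sketch of the same pattern, with the 4 MiB chunk size mirroring the 2**22 above:

    import functools
    import io

    def chunked(reader, chunk_size=2**22):
        # Calls reader.read(chunk_size) until it returns b'' (the EOF sentinel).
        return iter(functools.partial(reader.read, chunk_size), b'')

    source = io.BytesIO(b'x' * 10_000_000)
    print(sum(len(chunk) for chunk in chunked(source)))  # 10000000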

     def _fetch(self, key, retry, retry_attempts, retry_backoff):
         if retry is NOT_GIVEN:
             retry = False
         if retry_attempts is NOT_GIVEN:
             retry_attempts = 0

         if retry and retry_attempts > 0:
             for attempt in range(1, retry_attempts + 1):
                 if key in self:
                     break
                 # we didn't find the key; wait retry_backoff seconds and re-check
                 time.sleep(retry_backoff)

         if key not in self:
             log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
             raise KeyError(key)

         key_file, key_file_path = self._get_keyfile(key)
         with self.fs.open(key_file_path, 'rb') as f:
             metadata = json.loads(f.read())

         archive_path = metadata['archive_full_path']

         try:
             return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
         finally:
             # update usage stats, count and accessed
             metadata["access_count"] = metadata.get("access_count", 0) + 1
             metadata["access_time"] = time.time()
             log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
                       key_file, metadata['access_count'], metadata['access_time'])
             with self.fs.open(key_file_path, 'wb') as f:
                 f.write(json.dumps(metadata))
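Retries exist because an archive may still be generating in another worker when it is requested: each attempt re-checks __contains__ and sleeps retry_backoff seconds in between. A hedged usage sketch against a cache built by one of the backends below (the key name is illustrative):

    try:
        reader, metadata = cache.fetch('my-archive.zip', retry=True, retry_attempts=10)
        data = reader.read()  # fetch returns a binary file handle plus the key-file metadata
        print(metadata['sha256'], metadata['size'])
    except KeyError:
        print('archive not cached yet')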

     def _remove(self, key):
         if key not in self:
             log.exception(f'requested key={key} not found in {self}')
             raise KeyError(key)

         key_file, key_file_path = self._get_keyfile(key)
         with self.fs.open(key_file_path, 'rb') as f:
             metadata = json.loads(f.read())

         archive_path = metadata['archive_full_path']
         self.fs.rm(archive_path)
         self.fs.rm(key_file_path)
         return 1

     @property
     def storage_medium(self):
         return getattr(self, self.storage_type)

     @property
     def key_suffix(self):
         return 'key.json'

     def __contains__(self, key):
         """Return `True` if an item matching `key` is found in the cache.

         :param key: key matching item
         :return: True if key matching item

         """
         key_file, key_file_path = self._get_keyfile(key)
         return self.fs.exists(key_file_path)


 class BaseCache:
     _locking_url: str = ''
     _storage_path: str = ''
     _config = {}
     retry = False
     retry_attempts = 0
     retry_backoff = 1
     _shards = tuple()
+    shard_cls = BaseShard

     def __contains__(self, key):
         """Return `True` if an item matching `key` is found in the cache.

         :param key: key matching item
         :return: True if key matching item

         """
         return self.has_key(key)

     def __repr__(self):
         return f'<{self.__class__.__name__}(storage={self._storage_path})>'

     @classmethod
     def gb_to_bytes(cls, gb):
         return gb * (1024 ** 3)

     @property
     def storage_path(self):
         return self._storage_path

     @classmethod
     def get_stats_db(cls):
         return StatsDB()

     def get_conf(self, key, pop=False):
         if key not in self._config:
             raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
         val = self._config[key]
         if pop:
             del self._config[key]
         return val

-    def _get_shard(self, key):
-        raise NotImplementedError
+    def _get_shard(self, key) -> shard_cls:
+        index = self._hash(key) % self._shard_count
+        shard = self._shards[index]
+        return shard

     def _get_size(self, shard, archive_path):
         raise NotImplementedError

     def store(self, key, value_reader, metadata=None):
         shard = self._get_shard(key)
         return shard.store(key, value_reader, metadata)

     def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
         """
         Return file handle corresponding to `key` from specific shard cache.
         """
         if retry is NOT_GIVEN:
             retry = self.retry
         if retry_attempts is NOT_GIVEN:
             retry_attempts = self.retry_attempts
         retry_backoff = self.retry_backoff

         shard = self._get_shard(key)
         return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)

     def remove(self, key):
         shard = self._get_shard(key)
         return shard.remove(key)

     def has_key(self, archive_key):
         """Return `True` if an item matching `key` is found in the cache.

         :param archive_key: key for item; this is a unique archive name we want to store data under, e.g. my-archive-svn.zip
         :return: True if key is found

         """
         shard = self._get_shard(archive_key)
         return archive_key in shard

     def iter_keys(self):
         for shard in self._shards:
             if shard.fs.exists(shard.storage_medium):
                 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
                     for key_file_path in _files:
                         if key_file_path.endswith(shard.key_suffix):
                             yield shard, key_file_path

     def get_lock(self, lock_key):
         return GenerationLock(lock_key, self._locking_url)
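GenerationLock itself is defined in ..lock and is not part of this diff; assuming it provides the usual context-manager semantics of a Redis-backed lock, the intended pattern around archive generation would look roughly like this (the lock key and source path are illustrative):

    # Hypothetical sketch; GenerationLock's real API lives in ..lock.
    with cache.get_lock('archive-generation-my-archive.zip'):
        if 'my-archive.zip' not in cache:
            with open('/tmp/generated.zip', 'rb') as src:
                cache.store('my-archive.zip', src, metadata={'repo': 'example'})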

     def evict(self, policy=None, size_limit=None) -> dict:
         """
         Remove old items based on the given conditions.

         Explanation of the algorithm:
         Iterate over each shard, and for each shard iterate over its .key files,
         reading the metadata they store. This gives us a full list of keys and
         cached archives with their sizes, creation times, and access counts.

         Store that into an in-memory DB so we can run different sorting strategies
         easily. Summing the sizes is a single SUM SQL query.

         Then run a sorting strategy based on the eviction policy, iterate over the
         sorted keys, and remove entries until we get back under the overall limit.
         """
         removal_info = {
             "removed_items": 0,
             "removed_size": 0
         }
         policy = policy or self._eviction_policy
         size_limit = size_limit or self._cache_size_limit

         select_policy = EVICTION_POLICY[policy]['evict']

         log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                   policy, format_size(size_limit))

         if select_policy is None:
             return removal_info

         db = self.get_stats_db()

         data = []
         cnt = 1

         for shard, key_file in self.iter_keys():
             with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
                 metadata = json.loads(f.read())

             key_file_path = os.path.join(shard.storage_medium, key_file)

             archive_key = metadata['archive_key']
             archive_path = metadata['archive_full_path']

             size = metadata.get('size')
             if not size:
                 # in case we don't have size, re-calculate it...
                 size = self._get_size(shard, archive_path)

             data.append([
                 cnt,
                 key_file,
                 key_file_path,
                 archive_key,
                 archive_path,
                 metadata.get('store_time', 0),
                 metadata.get('access_time', 0),
                 metadata.get('access_count', 0),
                 size,
             ])
             cnt += 1

         # Insert bulk data using executemany
         db.bulk_insert(data)

         total_size = db.get_total_size()
         log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
                   len(data), format_size(total_size), format_size(size_limit))

         removed_items = 0
         removed_size = 0
         for key_file, archive_key, size in db.get_sorted_keys(select_policy):
             # simulate removal impact BEFORE removal
             total_size -= size

             if total_size <= size_limit:
                 # we obtained what we wanted...
                 break

             self.remove(archive_key)
             removed_items += 1
             removed_size += size
         removal_info['removed_items'] = removed_items
         removal_info['removed_size'] = removed_size
         log.debug('Removed %s cache archives, and reduced size by: %s',
                   removed_items, format_size(removed_size))
         return removal_info
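StatsDB and EVICTION_POLICY live in ..utils and are not shown in this diff; as an illustration of the idea only, the "memory DB plus policy-driven ORDER BY" approach reduces to a few lines of sqlite3 (the table layout and policy names here are assumptions, not the real StatsDB schema):

    import sqlite3

    # Illustrative only: the real StatsDB/EVICTION_POLICY definitions are in ..utils.
    POLICIES = {
        'least-recently-stored': 'ORDER BY store_time',
        'least-recently-used': 'ORDER BY access_time',
        'least-frequently-used': 'ORDER BY access_count',
    }

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE archives (key TEXT, store_time REAL, access_time REAL, access_count INT, size INT)')
    db.executemany('INSERT INTO archives VALUES (?, ?, ?, ?, ?)', [
        ('a.zip', 1.0, 9.0, 5, 100),
        ('b.zip', 2.0, 3.0, 1, 200),
    ])
    total, = db.execute('SELECT SUM(size) FROM archives').fetchone()
    rows = db.execute(f"SELECT key, size FROM archives {POLICIES['least-recently-used']}").fetchall()
    print(total, rows)  # b.zip sorts first: least recently accessed, so evicted first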

     def get_statistics(self):
         total_files = 0
         total_size = 0
         meta = {}

         for shard, key_file in self.iter_keys():
             json_key = f"{shard.storage_medium}/{key_file}"
             with shard.fs.open(json_key, 'rb') as f:
                 total_files += 1
                 metadata = json.loads(f.read())
                 total_size += metadata['size']

         return total_files, total_size, meta
@@ -1,167 +1,174 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import codecs
 import hashlib
 import logging
 import os

 import fsspec

 from .base import BaseCache, BaseShard
 from ..utils import ShardFileReader, NOT_GIVEN
 from ...type_utils import str2bool

 log = logging.getLogger(__name__)


 class FileSystemShard(BaseShard):

-    def __init__(self, index, directory, **settings):
+    def __init__(self, index, directory, directory_folder, fs, **settings):
         self._index = index
         self._directory = directory
+        self._directory_folder = directory_folder
         self.storage_type = 'directory'
-        self.fs = fsspec.filesystem('file')
+
+        self.fs = fs

     @property
     def directory(self):
-        """Cache directory."""
-        return self._directory
+        """Cache directory final path."""
+        return os.path.join(self._directory, self._directory_folder)

     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file = f'{archive_key}.{self.key_suffix}'
         return key_file, os.path.join(self.directory, key_file)

     def _get_writer(self, path, mode):
         for count in range(1, 11):
             try:
                 # Another cache may have deleted the directory before
                 # the file could be opened.
                 return self.fs.open(path, mode)
             except OSError:
                 if count == 10:
                     # Give up after 10 tries to open the file.
                     raise
                 continue
     def _write_file(self, full_path, iterator, mode):
         # ensure dir exists
         destination, _ = os.path.split(full_path)
         if not self.fs.exists(destination):
             self.fs.makedirs(destination)

         writer = self._get_writer(full_path, mode)

         digest = hashlib.sha256()
         with writer:
             size = 0
             for chunk in iterator:
                 size += len(chunk)
                 digest.update(chunk)
                 writer.write(chunk)
             writer.flush()
             # Get the file descriptor
             fd = writer.fileno()

             # Sync the file descriptor to disk, helps with NFS cases...
             os.fsync(fd)
         sha256 = digest.hexdigest()
         log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
         return size, sha256

     def store(self, key, value_reader, metadata: dict | None = None):
         return self._store(key, value_reader, metadata, mode='xb')

     def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
         return self._fetch(key, retry, retry_attempts, retry_backoff)

     def remove(self, key):
         return self._remove(key)

     def random_filename(self):
         """Return filename and full-path tuple for file storage.

         Filename will be a randomly generated 28 character hexadecimal string
         with ".archive_cache" suffixed. Two levels of sub-directories will be used to
         reduce the size of directories. On older filesystems, lookups in
         directories with many files may be slow.
         """

         hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

         archive_name = hex_name[4:] + '.archive_cache'
         filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"

         full_path = os.path.join(self.directory, filename)
         return archive_name, full_path

     def __repr__(self):
         return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'

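The fanout layout spreads files across up to 256 x 256 sub-directories keyed on the first four hex characters, so no single directory grows large. A quick illustration of the slicing:

    import codecs
    import os

    hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')  # 32 hex chars
    archive_name = hex_name[4:] + '.archive_cache'                   # 28-char basename
    # e.g. hex_name '3fa2bc...' becomes '3f/a2/bc....archive_cache'
    fanout_path = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
    print(fanout_path)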

 class FileSystemFanoutCache(BaseCache):
     shard_name = 'shard_%03d'
+    shard_cls = FileSystemShard

     def __init__(self, locking_url, **settings):
         """
         Initialize file system cache instance.

         :param str locking_url: redis url for a lock
         :param settings: settings dict

         """
         self._locking_url = locking_url
         self._config = settings
         cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
         directory = str(cache_dir)
         directory = os.path.expanduser(directory)
         directory = os.path.expandvars(directory)
         self._directory = directory
-        self._storage_path = directory
+        self._storage_path = directory  # common path for all from BaseCache

-        # check if it's ok to write, and re-create the archive cache
-        if not os.path.isdir(self._directory):
-            os.makedirs(self._directory, exist_ok=True)
-
-        self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+        self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+        if self._shard_count < 1:
+            raise ValueError('cache_shards must be 1 or more')

         self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))

         self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
         self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))

-        log.debug('Initializing archival cache instance under %s', self._directory)
+        log.debug('Initializing %s archival cache instance under %s', self.__class__.__name__, self._directory)
+        fs = fsspec.filesystem('file')
+        # check if it's ok to write, and re-create the archive cache main dir
+        if not fs.exists(self._directory):
+            fs.makedirs(self._directory, exist_ok=True)
+
         self._shards = tuple(
-            FileSystemShard(
+            self.shard_cls(
                 index=num,
-                directory=os.path.join(directory, self.shard_name % num),
+                directory=directory,
+                directory_folder=self.shard_name % num,
+                fs=fs,
                 **settings,
             )
-            for num in range(self._count)
+            for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash

-    def _get_shard(self, key) -> FileSystemShard:
-        index = self._hash(key) % self._count
-        shard = self._shards[index]
-        return shard
-
     def _get_size(self, shard, archive_path):
         return os.stat(archive_path).st_size
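Putting the filesystem backend together: construction is driven entirely by the settings dict, whose keys match the get_conf calls above. A hedged usage sketch (the redis URL, store directory, and eviction-policy name are illustrative placeholders, since the valid policy names live in the EVICTION_POLICY mapping not shown here):

    settings = {
        'archive_cache.filesystem.store_dir': '/var/cache/archive_cache',
        'archive_cache.filesystem.cache_shards': 8,
        'archive_cache.filesystem.eviction_policy': 'least-recently-stored',  # assumed policy name
        'archive_cache.filesystem.cache_size_gb': 10,
        'archive_cache.filesystem.retry': 'true',
        'archive_cache.filesystem.retry_attempts': 10,
        'archive_cache.filesystem.retry_backoff': 1,
    }
    d_cache = FileSystemFanoutCache(locking_url='redis://localhost:6379/0', **settings)
    with open('my-archive.zip', 'rb') as f:
        d_cache.store('my-archive.zip', f, metadata={'repo_name': 'example'})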
@@ -1,158 +1,164 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import codecs
 import hashlib
 import logging
 import os

 import fsspec

 from .base import BaseCache, BaseShard
 from ..utils import ShardFileReader, NOT_GIVEN
 from ...type_utils import str2bool

 log = logging.getLogger(__name__)


 class S3Shard(BaseShard):

-    def __init__(self, index, bucket, **settings):
+    def __init__(self, index, bucket, bucket_folder, fs, **settings):
         self._index = index
-        self._bucket = bucket
+        self._bucket_folder = bucket_folder
         self.storage_type = 'bucket'
+        self._bucket_main = bucket

-        endpoint_url = settings.pop('archive_cache.objectstore.url')
-        key = settings.pop('archive_cache.objectstore.key')
-        secret = settings.pop('archive_cache.objectstore.secret')
-
-        # TODO: Add it all over the place...
-        self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
-
-        self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+        self.fs = fs

     @property
     def bucket(self):
-        """Cache bucket."""
-        return os.path.join(self._bucket_root, self._bucket)
+        """Cache bucket final path."""
+        return os.path.join(self._bucket_main, self._bucket_folder)
53
47
54 def _get_keyfile(self, archive_key) -> tuple[str, str]:
48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
55 key_file = f'{archive_key}-{self.key_suffix}'
49 key_file = f'{archive_key}-{self.key_suffix}'
56 return key_file, os.path.join(self.bucket, key_file)
50 return key_file, os.path.join(self.bucket, key_file)
57
51
58 def _get_writer(self, path, mode):
52 def _get_writer(self, path, mode):
59 return self.fs.open(path, 'wb')
53 return self.fs.open(path, 'wb')
60
54
61 def _write_file(self, full_path, iterator, mode):
55 def _write_file(self, full_path, iterator, mode):
62 if self._bucket_root:
63 if not self.fs.exists(self._bucket_root):
64 self.fs.mkdir(self._bucket_root)
65
56
66 # ensure bucket exists
57 # ensure folder in bucket exists
67 destination = self.bucket
58 destination = self.bucket
68 if not self.fs.exists(destination):
59 if not self.fs.exists(destination):
69 self.fs.mkdir(destination, s3_additional_kwargs={})
60 self.fs.mkdir(destination, s3_additional_kwargs={})
70
61
71 writer = self._get_writer(full_path, mode)
62 writer = self._get_writer(full_path, mode)
72
63
73 digest = hashlib.sha256()
64 digest = hashlib.sha256()
74 with writer:
65 with writer:
75 size = 0
66 size = 0
76 for chunk in iterator:
67 for chunk in iterator:
77 size += len(chunk)
68 size += len(chunk)
78 digest.update(chunk)
69 digest.update(chunk)
79 writer.write(chunk)
70 writer.write(chunk)
80
71
81 sha256 = digest.hexdigest()
72 sha256 = digest.hexdigest()
82 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
73 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
83 return size, sha256
74 return size, sha256
84
75
85 def store(self, key, value_reader, metadata: dict | None = None):
76 def store(self, key, value_reader, metadata: dict | None = None):
86 return self._store(key, value_reader, metadata, mode='wb')
77 return self._store(key, value_reader, metadata, mode='wb')
87
78
88 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
89 return self._fetch(key, retry, retry_attempts, retry_backoff)
80 return self._fetch(key, retry, retry_attempts, retry_backoff)
90
81
91 def remove(self, key):
82 def remove(self, key):
92 return self._remove(key)
83 return self._remove(key)
93
84
94 def random_filename(self):
85 def random_filename(self):
95 """Return filename and full-path tuple for file storage.
86 """Return filename and full-path tuple for file storage.
96
87
97 Filename will be a randomly generated 28 character hexadecimal string
88 Filename will be a randomly generated 28 character hexadecimal string
98 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
89 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
99 reduce the size of directories. On older filesystems, lookups in
90 reduce the size of directories. On older filesystems, lookups in
100 directories with many files may be slow.
91 directories with many files may be slow.
101 """
92 """
102
93
103 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
94 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
104
95
105 archive_name = hex_name[4:] + '.archive_cache'
96 archive_name = hex_name[4:] + '.archive_cache'
106 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
97 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
107
98
108 full_path = os.path.join(self.bucket, filename)
99 full_path = os.path.join(self.bucket, filename)
109 return archive_name, full_path
100 return archive_name, full_path
110
101
111 def __repr__(self):
102 def __repr__(self):
112 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
103 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
113
104

 class ObjectStoreCache(BaseCache):
-    shard_name = 'shard-bucket-%03d'
+    shard_name = 'shard-%03d'
+    shard_cls = S3Shard

     def __init__(self, locking_url, **settings):
         """
         Initialize objectstore cache instance.

         :param str locking_url: redis url for a lock
         :param settings: settings dict

         """
         self._locking_url = locking_url
         self._config = settings

         objectstore_url = self.get_conf('archive_cache.objectstore.url')
-        self._storage_path = objectstore_url
+        self._storage_path = objectstore_url  # common path for all from BaseCache

-        self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+        self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+        if self._shard_count < 1:
+            raise ValueError('bucket_shards must be 1 or more')
+
+        self._bucket = settings.pop('archive_cache.objectstore.bucket')
+        if not self._bucket:
+            raise ValueError('archive_cache.objectstore.bucket needs to have a value')

         self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

         self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
         self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

-        log.debug('Initializing archival cache instance under %s', objectstore_url)
+        endpoint_url = settings.pop('archive_cache.objectstore.url')
+        key = settings.pop('archive_cache.objectstore.key')
+        secret = settings.pop('archive_cache.objectstore.secret')
+
+        log.debug('Initializing %s archival cache instance under %s', self.__class__.__name__, objectstore_url)
+
+        fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+
+        # init main bucket
+        if not fs.exists(self._bucket):
+            fs.mkdir(self._bucket)
+
         self._shards = tuple(
-            S3Shard(
+            self.shard_cls(
                 index=num,
-                bucket=self.shard_name % num,
+                bucket=self._bucket,
+                bucket_folder=self.shard_name % num,
+                fs=fs,
                 **settings,
             )
-            for num in range(self._count)
+            for num in range(self._shard_count)
        )
         self._hash = self._shards[0].hash

-    def _get_shard(self, key) -> S3Shard:
-        index = self._hash(key) % self._count
-        shard = self._shards[index]
-        return shard
-
     def _get_size(self, shard, archive_path):
         return shard.fs.info(archive_path)['size']
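The object-store backend wires the same settings-driven construction to an S3-compatible endpoint via fsspec; after this change, shards become folders inside one main bucket instead of separate per-shard buckets. A hedged usage sketch (the endpoint, credentials, bucket name, and eviction-policy name are illustrative placeholders):

    settings = {
        'archive_cache.objectstore.url': 'http://s3.example.com:9000',  # placeholder endpoint
        'archive_cache.objectstore.key': 's3admin',                     # placeholder credential
        'archive_cache.objectstore.secret': 's3secret',                 # placeholder credential
        'archive_cache.objectstore.bucket': 'rhodecode-archive-cache',
        'archive_cache.objectstore.bucket_shards': 8,
        'archive_cache.objectstore.eviction_policy': 'least-recently-stored',  # assumed policy name
        'archive_cache.objectstore.cache_size_gb': 10,
        'archive_cache.objectstore.retry': 'false',
        'archive_cache.objectstore.retry_attempts': 0,
        'archive_cache.objectstore.retry_backoff': 1,
    }
    s3_cache = ObjectStoreCache(locking_url='redis://localhost:6379/0', **settings)
    with open('my-archive.zip', 'rb') as f:
        s3_cache.store('my-archive.zip', f, metadata={'repo_name': 'example'})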