archive-cache: synced with CE codebase
super-admin
r1259:507df4ab default
@@ -1,355 +1,372 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import os
19 import os
20 import functools
20 import functools
21 import logging
21 import logging
22 import typing
22 import typing
23 import time
23 import time
24 import zlib
24 import zlib
25
25
26 from ...ext_json import json
26 from ...ext_json import json
27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 from ..lock import GenerationLock
28 from ..lock import GenerationLock
29
29
30 log = logging.getLogger(__name__)
30 log = logging.getLogger(__name__)
31
31
32
32
33 class BaseShard:
33 class BaseShard:
34 storage_type: str = ''
34 storage_type: str = ''
35 fs = None
35 fs = None
36
36
37 @classmethod
37 @classmethod
38 def hash(cls, key):
38 def hash(cls, key):
39 """Compute portable hash for `key`.
39 """Compute portable hash for `key`.
40
40
41 :param key: key to hash
41 :param key: key to hash
42 :return: hash value
42 :return: hash value
43
43
44 """
44 """
45 mask = 0xFFFFFFFF
45 mask = 0xFFFFFFFF
46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47
47
48 def _write_file(self, full_path, read_iterator, mode):
48 def _write_file(self, full_path, read_iterator, mode):
49 raise NotImplementedError
49 raise NotImplementedError
50
50
51 def _get_keyfile(self, key):
51 def _get_keyfile(self, key):
52 raise NotImplementedError
52 raise NotImplementedError
53
53
54 def random_filename(self):
54 def random_filename(self):
55 raise NotImplementedError
55 raise NotImplementedError
56
56
57 def store(self, *args, **kwargs):
58 raise NotImplementedError
59
57 def _store(self, key, value_reader, metadata, mode):
60 def _store(self, key, value_reader, metadata, mode):
58 (filename, # hash-name
61 (filename, # hash-name
59 full_path # full-path/hash-name
62 full_path # full-path/hash-name
60 ) = self.random_filename()
63 ) = self.random_filename()
61
64
62 key_file, key_file_path = self._get_keyfile(key)
65 key_file, key_file_path = self._get_keyfile(key)
63
66
64 # STORE METADATA
67 # STORE METADATA
65 _metadata = {
68 _metadata = {
66 "version": "v1",
69 "version": "v1",
67
70
68 "key_file": key_file, # this is the .key.json file storing meta
71 "key_file": key_file, # this is the .key.json file storing meta
69 "key_file_path": key_file_path, # full path to key_file
72 "key_file_path": key_file_path, # full path to key_file
70 "archive_key": key, # original name we stored archive under, e.g my-archive.zip
73 "archive_key": key, # original name we stored archive under, e.g my-archive.zip
71 "archive_filename": filename, # the actual filename we stored that file under
74 "archive_filename": filename, # the actual filename we stored that file under
72 "archive_full_path": full_path,
75 "archive_full_path": full_path,
73
76
74 "store_time": time.time(),
77 "store_time": time.time(),
75 "access_count": 0,
78 "access_count": 0,
76 "access_time": 0,
79 "access_time": 0,
77
80
78 "size": 0
81 "size": 0
79 }
82 }
80 if metadata:
83 if metadata:
81 _metadata.update(metadata)
84 _metadata.update(metadata)
82
85
83 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
86 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
84 size, sha256 = self._write_file(full_path, read_iterator, mode)
87 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 _metadata['size'] = size
88 _metadata['size'] = size
86 _metadata['sha256'] = sha256
89 _metadata['sha256'] = sha256
87
90
88 # after archive is finished, we create a key to save the presence of the binary file
91 # after archive is finished, we create a key to save the presence of the binary file
89 with self.fs.open(key_file_path, 'wb') as f:
92 with self.fs.open(key_file_path, 'wb') as f:
90 f.write(json.dumps(_metadata))
93 f.write(json.dumps(_metadata))
91
94
92 return key, filename, size, _metadata
95 return key, filename, size, _metadata
93
96
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
97 def fetch(self, *args, **kwargs):
98 raise NotImplementedError
99
100 def _fetch(self, key, retry, retry_attempts, retry_backoff,
101 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
95 if retry is NOT_GIVEN:
102 if retry is NOT_GIVEN:
96 retry = False
103 retry = False
97 if retry_attempts is NOT_GIVEN:
104 if retry_attempts is NOT_GIVEN:
98 retry_attempts = 0
105 retry_attempts = 0
99
106
100 if retry and retry_attempts > 0:
107 if retry and retry_attempts > 0:
101 for attempt in range(1, retry_attempts + 1):
108 for attempt in range(1, retry_attempts + 1):
102 if key in self:
109 if key in self:
103 break
110 break
104 # we didn't find the key, wait retry_backoff N seconds, and re-check
111 # we didn't find the key, wait retry_backoff N seconds, and re-check
105 time.sleep(retry_backoff)
112 time.sleep(retry_backoff)
106
113
107 if key not in self:
114 if key not in self:
108 log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
115 log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 raise KeyError(key)
116 raise KeyError(key)
110
117
111 key_file, key_file_path = self._get_keyfile(key)
118 key_file, key_file_path = self._get_keyfile(key)
112 with self.fs.open(key_file_path, 'rb') as f:
119 with self.fs.open(key_file_path, 'rb') as f:
113 metadata = json.loads(f.read())
120 metadata = json.loads(f.read())
114
121
115 archive_path = metadata['archive_full_path']
122 archive_path = metadata['archive_full_path']
123 if presigned_url_expires and presigned_url_expires > 0:
124 metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)
116
125
117 try:
126 try:
118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
127 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 finally:
128 finally:
120 # update usage stats, count and accessed
129 # update usage stats, count and accessed
121 metadata["access_count"] = metadata.get("access_count", 0) + 1
130 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 metadata["access_time"] = time.time()
131 metadata["access_time"] = time.time()
123 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
132 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 key_file, metadata['access_count'], metadata['access_time'])
133 key_file, metadata['access_count'], metadata['access_time'])
125 with self.fs.open(key_file_path, 'wb') as f:
134 with self.fs.open(key_file_path, 'wb') as f:
126 f.write(json.dumps(metadata))
135 f.write(json.dumps(metadata))
127
136
137 def remove(self, *args, **kwargs):
138 raise NotImplementedError
139
128 def _remove(self, key):
140 def _remove(self, key):
129 if key not in self:
141 if key not in self:
130 log.exception(f'requested key={key} not found in {self}')
142 log.exception(f'requested key={key} not found in {self}')
131 raise KeyError(key)
143 raise KeyError(key)
132
144
133 key_file, key_file_path = self._get_keyfile(key)
145 key_file, key_file_path = self._get_keyfile(key)
134 with self.fs.open(key_file_path, 'rb') as f:
146 with self.fs.open(key_file_path, 'rb') as f:
135 metadata = json.loads(f.read())
147 metadata = json.loads(f.read())
136
148
137 archive_path = metadata['archive_full_path']
149 archive_path = metadata['archive_full_path']
138 self.fs.rm(archive_path)
150 self.fs.rm(archive_path)
139 self.fs.rm(key_file_path)
151 self.fs.rm(key_file_path)
140 return 1
152 return 1
141
153
142 @property
154 @property
143 def storage_medium(self):
155 def storage_medium(self):
144 return getattr(self, self.storage_type)
156 return getattr(self, self.storage_type)
145
157
146 @property
158 @property
147 def key_suffix(self):
159 def key_suffix(self):
148 return 'key.json'
160 return 'key.json'
149
161
150 def __contains__(self, key):
162 def __contains__(self, key):
151 """Return `True` if `key` matching item is found in cache.
163 """Return `True` if `key` matching item is found in cache.
152
164
153 :param key: key matching item
165 :param key: key matching item
154 :return: True if key matching item
166 :return: True if key matching item
155
167
156 """
168 """
157 key_file, key_file_path = self._get_keyfile(key)
169 key_file, key_file_path = self._get_keyfile(key)
158 return self.fs.exists(key_file_path)
170 return self.fs.exists(key_file_path)
159
171
160
172
161 class BaseCache:
173 class BaseCache:
162 _locking_url: str = ''
174 _locking_url: str = ''
163 _storage_path: str = ''
175 _storage_path: str = ''
164 _config = {}
176 _config: dict = {}
165 retry = False
177 retry = False
166 retry_attempts = 0
178 retry_attempts: int = 0
167 retry_backoff = 1
179 retry_backoff: int | float = 1
168 _shards = tuple()
180 _shards = tuple()
169 shard_cls = BaseShard
181 shard_cls = BaseShard
182 # define the presigned url expiration, 0 == disabled
183 presigned_url_expires: int = 0
170
184
171 def __contains__(self, key):
185 def __contains__(self, key):
172 """Return `True` if `key` matching item is found in cache.
186 """Return `True` if `key` matching item is found in cache.
173
187
174 :param key: key matching item
188 :param key: key matching item
175 :return: True if key matching item
189 :return: True if key matching item
176
190
177 """
191 """
178 return self.has_key(key)
192 return self.has_key(key)
179
193
180 def __repr__(self):
194 def __repr__(self):
181 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
195 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
182
196
183 @classmethod
197 @classmethod
184 def gb_to_bytes(cls, gb):
198 def gb_to_bytes(cls, gb):
185 return gb * (1024 ** 3)
199 return gb * (1024 ** 3)
186
200
187 @property
201 @property
188 def storage_path(self):
202 def storage_path(self):
189 return self._storage_path
203 return self._storage_path
190
204
191 @classmethod
205 @classmethod
192 def get_stats_db(cls):
206 def get_stats_db(cls):
193 return StatsDB()
207 return StatsDB()
194
208
195 def get_conf(self, key, pop=False):
209 def get_conf(self, key, pop=False):
196 if key not in self._config:
210 if key not in self._config:
197 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
211 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
198 val = self._config[key]
212 val = self._config[key]
199 if pop:
213 if pop:
200 del self._config[key]
214 del self._config[key]
201 return val
215 return val
202
216
203 def _get_shard(self, key) -> shard_cls:
217 def _get_shard(self, key) -> shard_cls:
204 index = self._hash(key) % self._shard_count
218 index = self._hash(key) % self._shard_count
205 shard = self._shards[index]
219 shard = self._shards[index]
206 return shard
220 return shard
207
221
208 def _get_size(self, shard, archive_path):
222 def _get_size(self, shard, archive_path):
209 raise NotImplementedError
223 raise NotImplementedError
210
224
211 def store(self, key, value_reader, metadata=None):
225 def store(self, key, value_reader, metadata=None):
212 shard = self._get_shard(key)
226 shard = self._get_shard(key)
213 return shard.store(key, value_reader, metadata)
227 return shard.store(key, value_reader, metadata)
214
228
215 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
229 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
216 """
230 """
217 Return file handle corresponding to `key` from specific shard cache.
231 Return file handle corresponding to `key` from specific shard cache.
218 """
232 """
219 if retry is NOT_GIVEN:
233 if retry is NOT_GIVEN:
220 retry = self.retry
234 retry = self.retry
221 if retry_attempts is NOT_GIVEN:
235 if retry_attempts is NOT_GIVEN:
222 retry_attempts = self.retry_attempts
236 retry_attempts = self.retry_attempts
223 retry_backoff = self.retry_backoff
237 retry_backoff = self.retry_backoff
238 presigned_url_expires = self.presigned_url_expires
224
239
225 shard = self._get_shard(key)
240 shard = self._get_shard(key)
226 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
241 return shard.fetch(key, retry=retry,
242 retry_attempts=retry_attempts,
243 retry_backoff=retry_backoff,
244 presigned_url_expires=presigned_url_expires)
227
245
228 def remove(self, key):
246 def remove(self, key):
229 shard = self._get_shard(key)
247 shard = self._get_shard(key)
230 return shard.remove(key)
248 return shard.remove(key)
231
249
232 def has_key(self, archive_key):
250 def has_key(self, archive_key):
233 """Return `True` if `key` matching item is found in cache.
251 """Return `True` if `key` matching item is found in cache.
234
252
235 :param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip
253 :param archive_key: key for item, this is a unique archive name we want to store data under. e.g my-archive-svn.zip
236 :return: True if key is found
254 :return: True if key is found
237
255
238 """
256 """
239 shard = self._get_shard(archive_key)
257 shard = self._get_shard(archive_key)
240 return archive_key in shard
258 return archive_key in shard
241
259
242 def iter_keys(self):
260 def iter_keys(self):
243 for shard in self._shards:
261 for shard in self._shards:
244 if shard.fs.exists(shard.storage_medium):
262 if shard.fs.exists(shard.storage_medium):
245 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
263 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
246 for key_file_path in _files:
264 for key_file_path in _files:
247 if key_file_path.endswith(shard.key_suffix):
265 if key_file_path.endswith(shard.key_suffix):
248 yield shard, key_file_path
266 yield shard, key_file_path
249
267
250 def get_lock(self, lock_key):
268 def get_lock(self, lock_key):
251 return GenerationLock(lock_key, self._locking_url)
269 return GenerationLock(lock_key, self._locking_url)
252
270
253 def evict(self, policy=None, size_limit=None) -> dict:
271 def evict(self, policy=None, size_limit=None) -> dict:
254 """
272 """
255 Remove old items based on the conditions
273 Remove old items based on the conditions
256
274
257
275
258 explanation of this algo:
276 explanation of this algo:
259 iterate over each shard, then for each shard iterate over the .key files
277 iterate over each shard, then for each shard iterate over the .key files
260 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
278 read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
261 access data, time creation, and access counts.
279 access data, time creation, and access counts.
262
280
263 Store that into a memory DB in order we can run different sorting strategies easily.
281 Store that into a memory DB in order we can run different sorting strategies easily.
264 Summing the size is a sum sql query.
282 Summing the size is a sum sql query.
265
283
266 Then we run a sorting strategy based on eviction policy.
284 Then we run a sorting strategy based on eviction policy.
267 We iterate over sorted keys, and remove each checking if we hit the overall limit.
285 We iterate over sorted keys, and remove each checking if we hit the overall limit.
268 """
286 """
269 removal_info = {
287 removal_info = {
270 "removed_items": 0,
288 "removed_items": 0,
271 "removed_size": 0
289 "removed_size": 0
272 }
290 }
273 policy = policy or self._eviction_policy
291 policy = policy or self._eviction_policy
274 size_limit = size_limit or self._cache_size_limit
292 size_limit = size_limit or self._cache_size_limit
275
293
276 select_policy = EVICTION_POLICY[policy]['evict']
294 select_policy = EVICTION_POLICY[policy]['evict']
277
295
278 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
296 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
279 policy, format_size(size_limit))
297 policy, format_size(size_limit))
280
298
281 if select_policy is None:
299 if select_policy is None:
282 return removal_info
300 return removal_info
283
301
284 db = self.get_stats_db()
302 db = self.get_stats_db()
285
303
286 data = []
304 data = []
287 cnt = 1
305 cnt = 1
288
306
289 for shard, key_file in self.iter_keys():
307 for shard, key_file in self.iter_keys():
290 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
308 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
291 metadata = json.loads(f.read())
309 metadata = json.loads(f.read())
292
310
293 key_file_path = os.path.join(shard.storage_medium, key_file)
311 key_file_path = os.path.join(shard.storage_medium, key_file)
294
312
295 archive_key = metadata['archive_key']
313 archive_key = metadata['archive_key']
296 archive_path = metadata['archive_full_path']
314 archive_path = metadata['archive_full_path']
297
315
298 size = metadata.get('size')
316 size = metadata.get('size')
299 if not size:
317 if not size:
300 # in case we don't have size re-calc it...
318 # in case we don't have size re-calc it...
301 size = self._get_size(shard, archive_path)
319 size = self._get_size(shard, archive_path)
302
320
303 data.append([
321 data.append([
304 cnt,
322 cnt,
305 key_file,
323 key_file,
306 key_file_path,
324 key_file_path,
307 archive_key,
325 archive_key,
308 archive_path,
326 archive_path,
309 metadata.get('store_time', 0),
327 metadata.get('store_time', 0),
310 metadata.get('access_time', 0),
328 metadata.get('access_time', 0),
311 metadata.get('access_count', 0),
329 metadata.get('access_count', 0),
312 size,
330 size,
313 ])
331 ])
314 cnt += 1
332 cnt += 1
315
333
316 # Insert bulk data using executemany
334 # Insert bulk data using executemany
317 db.bulk_insert(data)
335 db.bulk_insert(data)
318
336
319 total_size = db.get_total_size()
337 total_size = db.get_total_size()
320 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
338 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
321 len(data), format_size(total_size), format_size(size_limit))
339 len(data), format_size(total_size), format_size(size_limit))
322
340
323 removed_items = 0
341 removed_items = 0
324 removed_size = 0
342 removed_size = 0
325 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
343 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
326 # simulate removal impact BEFORE removal
344 # simulate removal impact BEFORE removal
327 total_size -= size
345 total_size -= size
328
346
329 if total_size <= size_limit:
347 if total_size <= size_limit:
330 # we obtained what we wanted...
348 # we obtained what we wanted...
331 break
349 break
332
350
333 self.remove(archive_key)
351 self.remove(archive_key)
334 removed_items += 1
352 removed_items += 1
335 removed_size += size
353 removed_size += size
336 removal_info['removed_items'] = removed_items
354 removal_info['removed_items'] = removed_items
337 removal_info['removed_size'] = removed_size
355 removal_info['removed_size'] = removed_size
338 log.debug('Removed %s cache archives, and reduced size by: %s',
356 log.debug('Removed %s cache archives, and reduced size by: %s',
339 removed_items, format_size(removed_size))
357 removed_items, format_size(removed_size))
340 return removal_info
358 return removal_info
341
359
342 def get_statistics(self):
360 def get_statistics(self):
343 total_files = 0
361 total_files = 0
344 total_size = 0
362 total_size = 0
345 meta = {}
363 meta = {}
346
364
347 for shard, key_file in self.iter_keys():
365 for shard, key_file in self.iter_keys():
348 json_key = f"{shard.storage_medium}/{key_file}"
366 json_key = f"{shard.storage_medium}/{key_file}"
349 with shard.fs.open(json_key, 'rb') as f:
367 with shard.fs.open(json_key, 'rb') as f:
350 total_files += 1
368 total_files += 1
351 metadata = json.loads(f.read())
369 metadata = json.loads(f.read())
352 total_size += metadata['size']
370 total_size += metadata['size']
353
371
354 return total_files, total_size, meta
372 return total_files, total_size, meta
355
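
Routing of an archive key to a shard, as implemented above, is deterministic: BaseShard.hash() takes the adler32 checksum of the UTF-8 encoded key, and BaseCache._get_shard() reduces it modulo the shard count. A minimal, self-contained sketch of that routing follows; the shard count of 8 is only an illustrative assumption, the real value comes from the cache configuration:

import zlib

SHARD_COUNT = 8  # illustrative only; the real value comes from the cache_shards / bucket_shards setting

def shard_index(archive_key: str, shard_count: int = SHARD_COUNT) -> int:
    # Same portable hash as BaseShard.hash(): adler32 of the utf-8 key,
    # masked to 32 bits, then reduced modulo the shard count as in
    # BaseCache._get_shard().
    mask = 0xFFFFFFFF
    key_hash = zlib.adler32(archive_key.encode('utf-8')) & mask
    return key_hash % shard_count

print(shard_index('my-archive.zip'))       # the same key always maps to the same shard
print(shard_index('my-archive-svn.zip'))
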
@@ -1,174 +1,177 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import codecs
19 import codecs
20 import hashlib
20 import hashlib
21 import logging
21 import logging
22 import os
22 import os
23 import typing
23
24
24 import fsspec
25 import fsspec
25
26
26 from .base import BaseCache, BaseShard
27 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29 from ...type_utils import str2bool
29
30
30 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
31
32
32
33
33 class FileSystemShard(BaseShard):
34 class FileSystemShard(BaseShard):
34
35
35 def __init__(self, index, directory, directory_folder, fs, **settings):
36 def __init__(self, index, directory, directory_folder, fs, **settings):
36 self._index = index
37 self._index: int = index
37 self._directory = directory
38 self._directory: str = directory
38 self._directory_folder = directory_folder
39 self._directory_folder: str = directory_folder
39 self.storage_type = 'directory'
40 self.storage_type: str = 'directory'
40
41
41 self.fs = fs
42 self.fs = fs
42
43
43 @property
44 @property
44 def directory(self):
45 def directory(self) -> str:
45 """Cache directory final path."""
46 """Cache directory final path."""
46 return os.path.join(self._directory, self._directory_folder)
47 return os.path.join(self._directory, self._directory_folder)
47
48
48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}.{self.key_suffix}'
50 key_file: str = f'{archive_key}.{self.key_suffix}'
50 return key_file, os.path.join(self.directory, key_file)
51 return key_file, os.path.join(self.directory, key_file)
51
52
52 def _get_writer(self, path, mode):
53 def _get_writer(self, path, mode):
53 for count in range(1, 11):
54 for count in range(1, 11):
54 try:
55 try:
55 # Another cache may have deleted the directory before
56 # Another cache may have deleted the directory before
56 # the file could be opened.
57 # the file could be opened.
57 return self.fs.open(path, mode)
58 return self.fs.open(path, mode)
58 except OSError:
59 except OSError:
59 if count == 10:
60 if count == 10:
60 # Give up after 10 tries to open the file.
61 # Give up after 10 tries to open the file.
61 raise
62 raise
62 continue
63 continue
63
64
64 def _write_file(self, full_path, iterator, mode):
65 def _write_file(self, full_path, iterator, mode):
66
65 # ensure dir exists
67 # ensure dir exists
66 destination, _ = os.path.split(full_path)
68 destination, _ = os.path.split(full_path)
67 if not self.fs.exists(destination):
69 if not self.fs.exists(destination):
68 self.fs.makedirs(destination)
70 self.fs.makedirs(destination)
69
71
70 writer = self._get_writer(full_path, mode)
72 writer = self._get_writer(full_path, mode)
71
73
72 digest = hashlib.sha256()
74 digest = hashlib.sha256()
73 with writer:
75 with writer:
74 size = 0
76 size = 0
75 for chunk in iterator:
77 for chunk in iterator:
76 size += len(chunk)
78 size += len(chunk)
77 digest.update(chunk)
79 digest.update(chunk)
78 writer.write(chunk)
80 writer.write(chunk)
79 writer.flush()
81 writer.flush()
80 # Get the file descriptor
82 # Get the file descriptor
81 fd = writer.fileno()
83 fd = writer.fileno()
82
84
83 # Sync the file descriptor to disk, helps with NFS cases...
85 # Sync the file descriptor to disk, helps with NFS cases...
84 os.fsync(fd)
86 os.fsync(fd)
85 sha256 = digest.hexdigest()
87 sha256 = digest.hexdigest()
86 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
88 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
87 return size, sha256
89 return size, sha256
88
90
89 def store(self, key, value_reader, metadata: dict | None = None):
91 def store(self, key, value_reader, metadata: dict | None = None):
90 return self._store(key, value_reader, metadata, mode='xb')
92 return self._store(key, value_reader, metadata, mode='xb')
91
93
92 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
94 def fetch(self, key, retry=NOT_GIVEN,
95 retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]:
93 return self._fetch(key, retry, retry_attempts, retry_backoff)
96 return self._fetch(key, retry, retry_attempts, retry_backoff)
94
97
95 def remove(self, key):
98 def remove(self, key):
96 return self._remove(key)
99 return self._remove(key)
97
100
98 def random_filename(self):
101 def random_filename(self):
99 """Return filename and full-path tuple for file storage.
102 """Return filename and full-path tuple for file storage.
100
103
101 Filename will be a randomly generated 28 character hexadecimal string
104 Filename will be a randomly generated 28 character hexadecimal string
102 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
105 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
103 reduce the size of directories. On older filesystems, lookups in
106 reduce the size of directories. On older filesystems, lookups in
104 directories with many files may be slow.
107 directories with many files may be slow.
105 """
108 """
106
109
107 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
110 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
108
111
109 archive_name = hex_name[4:] + '.archive_cache'
112 archive_name = hex_name[4:] + '.archive_cache'
110 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
113 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
111
114
112 full_path = os.path.join(self.directory, filename)
115 full_path = os.path.join(self.directory, filename)
113 return archive_name, full_path
116 return archive_name, full_path
114
117
115 def __repr__(self):
118 def __repr__(self):
116 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
119 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
117
120
118
121
119 class FileSystemFanoutCache(BaseCache):
122 class FileSystemFanoutCache(BaseCache):
120 shard_name = 'shard_%03d'
123 shard_name: str = 'shard_{:03d}'
121 shard_cls = FileSystemShard
124 shard_cls = FileSystemShard
122
125
123 def __init__(self, locking_url, **settings):
126 def __init__(self, locking_url, **settings):
124 """
127 """
125 Initialize file system cache instance.
128 Initialize file system cache instance.
126
129
127 :param str locking_url: redis url for a lock
130 :param str locking_url: redis url for a lock
128 :param settings: settings dict
131 :param settings: settings dict
129
132
130 """
133 """
131 self._locking_url = locking_url
134 self._locking_url = locking_url
132 self._config = settings
135 self._config = settings
133 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
136 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
134 directory = str(cache_dir)
137 directory = str(cache_dir)
135 directory = os.path.expanduser(directory)
138 directory = os.path.expanduser(directory)
136 directory = os.path.expandvars(directory)
139 directory = os.path.expandvars(directory)
137 self._directory = directory
140 self._directory = directory
138 self._storage_path = directory # common path for all from BaseCache
141 self._storage_path = directory # common path for all from BaseCache
139
142
140 self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
143 self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141 if self._shard_count < 1:
144 if self._shard_count < 1:
142 raise ValueError('cache_shards must be 1 or more')
145 raise ValueError('cache_shards must be 1 or more')
143
146
144 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
147 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
145 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
148 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
146
149
147 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
150 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
148 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
151 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
149 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
152 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
150
153
151 log.debug('Initializing %s archival cache instance', self)
154 log.debug('Initializing %s archival cache instance', self)
152 fs = fsspec.filesystem('file')
155 fs = fsspec.filesystem('file')
153 # check if it's ok to write, and re-create the archive cache main dir
156 # check if it's ok to write, and re-create the archive cache main dir
154 # A directory is the virtual equivalent of a physical file cabinet.
157 # A directory is the virtual equivalent of a physical file cabinet.
155 # In other words, it's a container for organizing digital data.
158 # In other words, it's a container for organizing digital data.
156 # Unlike a folder, which can only store files, a directory can store files,
159 # Unlike a folder, which can only store files, a directory can store files,
157 # subdirectories, and other directories.
160 # subdirectories, and other directories.
158 if not fs.exists(self._directory):
161 if not fs.exists(self._directory):
159 fs.makedirs(self._directory, exist_ok=True)
162 fs.makedirs(self._directory, exist_ok=True)
160
163
161 self._shards = tuple(
164 self._shards = tuple(
162 self.shard_cls(
165 self.shard_cls(
163 index=num,
166 index=num,
164 directory=directory,
167 directory=directory,
165 directory_folder=self.shard_name % num,
168 directory_folder=self.shard_name.format(num),
166 fs=fs,
169 fs=fs,
167 **settings,
170 **settings,
168 )
171 )
169 for num in range(self._shard_count)
172 for num in range(self._shard_count)
170 )
173 )
171 self._hash = self._shards[0].hash
174 self._hash = self._shards[0].hash
172
175
173 def _get_size(self, shard, archive_path):
176 def _get_size(self, shard, archive_path):
174 return os.stat(archive_path).st_size
177 return os.stat(archive_path).st_size
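
The filesystem backend above reads its configuration through get_conf() from archive_cache.filesystem.* keys, plus a Redis locking_url used by GenerationLock. A hedged sketch of a caller-side settings dict, listing only the keys consumed in __init__ above; the path, sizes and policy name are placeholders rather than documented defaults:

settings = {
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',  # placeholder path
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',  # placeholder; valid names live in EVICTION_POLICY
    'archive_cache.filesystem.retry': 'false',           # parsed with str2bool
    'archive_cache.filesystem.retry_attempts': 0,
    'archive_cache.filesystem.retry_backoff': 1,
}

# Usage would follow the store/fetch API shown in the diff:
# d_cache = FileSystemFanoutCache(locking_url='redis://localhost:6379/0', **settings)
# with open('my-archive.zip', 'rb') as value_reader:
#     d_cache.store('my-archive.zip', value_reader, metadata={'created_by': 'admin'})
# reader, metadata = d_cache.fetch('my-archive.zip')
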
@@ -1,164 +1,170 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import codecs
19 import codecs
20 import hashlib
20 import hashlib
21 import logging
21 import logging
22 import os
22 import os
23 import typing
23
24
24 import fsspec
25 import fsspec
25
26
26 from .base import BaseCache, BaseShard
27 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29 from ...type_utils import str2bool
29
30
30 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
31
32
32
33
33 class S3Shard(BaseShard):
34 class S3Shard(BaseShard):
34
35
35 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 self._index = index
37 self._index: int = index
37 self._bucket_folder = bucket_folder
38 self._bucket_folder: str = bucket_folder
38 self.storage_type = 'bucket'
39 self.storage_type: str = 'bucket'
39 self._bucket_main = bucket
40 self._bucket_main: str = bucket
40
41
41 self.fs = fs
42 self.fs = fs
42
43
43 @property
44 @property
44 def bucket(self):
45 def bucket(self) -> str:
45 """Cache bucket final path."""
46 """Cache bucket final path."""
46 return os.path.join(self._bucket_main, self._bucket_folder)
47 return os.path.join(self._bucket_main, self._bucket_folder)
47
48
48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}-{self.key_suffix}'
50 key_file: str = f'{archive_key}-{self.key_suffix}'
50 return key_file, os.path.join(self.bucket, key_file)
51 return key_file, os.path.join(self.bucket, key_file)
51
52
52 def _get_writer(self, path, mode):
53 def _get_writer(self, path, mode):
53 return self.fs.open(path, 'wb')
54 return self.fs.open(path, 'wb')
54
55
55 def _write_file(self, full_path, iterator, mode):
56 def _write_file(self, full_path, iterator, mode):
56
57
57 # ensure folder in bucket exists
58 # ensure folder in bucket exists
58 destination = self.bucket
59 destination = self.bucket
59 if not self.fs.exists(destination):
60 if not self.fs.exists(destination):
60 self.fs.mkdir(destination, s3_additional_kwargs={})
61 self.fs.mkdir(destination, s3_additional_kwargs={})
61
62
62 writer = self._get_writer(full_path, mode)
63 writer = self._get_writer(full_path, mode)
63
64
64 digest = hashlib.sha256()
65 digest = hashlib.sha256()
65 with writer:
66 with writer:
66 size = 0
67 size = 0
67 for chunk in iterator:
68 for chunk in iterator:
68 size += len(chunk)
69 size += len(chunk)
69 digest.update(chunk)
70 digest.update(chunk)
70 writer.write(chunk)
71 writer.write(chunk)
71
72
72 sha256 = digest.hexdigest()
73 sha256 = digest.hexdigest()
73 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
74 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
74 return size, sha256
75 return size, sha256
75
76
76 def store(self, key, value_reader, metadata: dict | None = None):
77 def store(self, key, value_reader, metadata: dict | None = None):
77 return self._store(key, value_reader, metadata, mode='wb')
78 return self._store(key, value_reader, metadata, mode='wb')
78
79
79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
80 def fetch(self, key, retry=NOT_GIVEN,
80 return self._fetch(key, retry, retry_attempts, retry_backoff)
81 retry_attempts=NOT_GIVEN, retry_backoff=1,
82 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
83 return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
81
84
82 def remove(self, key):
85 def remove(self, key):
83 return self._remove(key)
86 return self._remove(key)
84
87
85 def random_filename(self):
88 def random_filename(self):
86 """Return filename and full-path tuple for file storage.
89 """Return filename and full-path tuple for file storage.
87
90
88 Filename will be a randomly generated 28 character hexadecimal string
91 Filename will be a randomly generated 28 character hexadecimal string
89 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
92 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
90 reduce the size of directories. On older filesystems, lookups in
93 reduce the size of directories. On older filesystems, lookups in
91 directories with many files may be slow.
94 directories with many files may be slow.
92 """
95 """
93
96
94 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
97 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
95
98
96 archive_name = hex_name[4:] + '.archive_cache'
99 archive_name = hex_name[4:] + '.archive_cache'
97 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
100 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
98
101
99 full_path = os.path.join(self.bucket, filename)
102 full_path = os.path.join(self.bucket, filename)
100 return archive_name, full_path
103 return archive_name, full_path
101
104
102 def __repr__(self):
105 def __repr__(self):
103 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
106 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
104
107
105
108
106 class ObjectStoreCache(BaseCache):
109 class ObjectStoreCache(BaseCache):
107 shard_name = 'shard-%03d'
110 shard_name: str = 'shard-{:03d}'
108 shard_cls = S3Shard
111 shard_cls = S3Shard
109
112
110 def __init__(self, locking_url, **settings):
113 def __init__(self, locking_url, **settings):
111 """
114 """
112 Initialize objectstore cache instance.
115 Initialize objectstore cache instance.
113
116
114 :param str locking_url: redis url for a lock
117 :param str locking_url: redis url for a lock
115 :param settings: settings dict
118 :param settings: settings dict
116
119
117 """
120 """
118 self._locking_url = locking_url
121 self._locking_url = locking_url
119 self._config = settings
122 self._config = settings
120
123
121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
124 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 self._storage_path = objectstore_url # common path for all from BaseCache
125 self._storage_path = objectstore_url # common path for all from BaseCache
123
126
124 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
127 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125 if self._shard_count < 1:
128 if self._shard_count < 1:
126 raise ValueError('cache_shards must be 1 or more')
129 raise ValueError('cache_shards must be 1 or more')
127
130
128 self._bucket = settings.pop('archive_cache.objectstore.bucket')
131 self._bucket = settings.pop('archive_cache.objectstore.bucket')
129 if not self._bucket:
132 if not self._bucket:
130 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
133 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
131
134
132 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
135 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
133 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
136 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
134
137
135 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
138 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
136 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
139 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
137 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
140 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
138
141
139 endpoint_url = settings.pop('archive_cache.objectstore.url')
142 endpoint_url = settings.pop('archive_cache.objectstore.url')
140 key = settings.pop('archive_cache.objectstore.key')
143 key = settings.pop('archive_cache.objectstore.key')
141 secret = settings.pop('archive_cache.objectstore.secret')
144 secret = settings.pop('archive_cache.objectstore.secret')
142
145
143 log.debug('Initializing %s archival cache instance', self)
146 log.debug('Initializing %s archival cache instance', self)
144
147
145 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
148 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
146
149
147 # init main bucket
150 # init main bucket
148 if not fs.exists(self._bucket):
151 if not fs.exists(self._bucket):
149 fs.mkdir(self._bucket)
152 fs.mkdir(self._bucket)
150
153
151 self._shards = tuple(
154 self._shards = tuple(
152 self.shard_cls(
155 self.shard_cls(
153 index=num,
156 index=num,
154 bucket=self._bucket,
157 bucket=self._bucket,
155 bucket_folder=self.shard_name % num,
158 bucket_folder=self.shard_name.format(num),
156 fs=fs,
159 fs=fs,
157 **settings,
160 **settings,
158 )
161 )
159 for num in range(self._shard_count)
162 for num in range(self._shard_count)
160 )
163 )
161 self._hash = self._shards[0].hash
164 self._hash = self._shards[0].hash
162
165
163 def _get_size(self, shard, archive_path):
166 def _get_size(self, shard, archive_path):
164 return shard.fs.info(archive_path)['size']
167 return shard.fs.info(archive_path)['size']
168
169 def set_presigned_url_expiry(self, val: int) -> None:
170 self.presigned_url_expires = val
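
The object-store backend is the one that honours presigned_url_expires: when set to a non-zero value via set_presigned_url_expiry(), _fetch() attaches a time-limited metadata['url'] generated by fs.url(archive_path, expires=...), so callers can redirect clients straight to the object store instead of streaming the archive through the application. A hedged caller-side sketch using only the methods shown in this diff; the one-hour expiry is an illustrative choice:

def fetch_with_presigned_url(d_cache, archive_key: str, expires_seconds: int = 3600):
    # `d_cache` is assumed to be an ObjectStoreCache built from the
    # archive_cache.objectstore.* settings consumed in __init__ above.
    d_cache.set_presigned_url_expiry(expires_seconds)  # 0 keeps presigned urls disabled
    reader, metadata = d_cache.fetch(archive_key)
    # metadata['url'] is only populated when presigned_url_expires > 0
    return reader, metadata.get('url')
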