feat(archive-cache): added presigned_url support for objectstore
revision r5450:6c0bdadc (default branch)
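The change below spans three modules of the archive cache: the shared base classes, the filesystem backend, and the S3 objectstore backend. Its core is a new `presigned_url_expires` option: when it is greater than zero, the shard's `_fetch()` asks the underlying fsspec filesystem for a time-limited presigned URL and returns it in the archive metadata under the `url` key. A minimal usage sketch, assuming an already configured `ObjectStoreCache` (the instance name, settings, and archive key below are illustrative, not taken from the commit):

```python
# illustrative sketch only -- `settings` must contain the archive_cache.objectstore.* keys
d_cache = ObjectStoreCache(locking_url='redis://localhost:6379/0', **settings)

# enable presigned URLs with a one-hour expiry (0 keeps the feature disabled)
d_cache.set_presigned_url_expiry(3600)

if 'my-archive.zip' in d_cache:
    reader, metadata = d_cache.fetch('my-archive.zip')
    # the S3 shard adds a direct, time-limited download link to the metadata
    download_url = metadata.get('url')
```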
@@ -1,355 +1,372 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import os
20 20 import functools
21 21 import logging
22 22 import typing
23 23 import time
24 24 import zlib
25 25
26 26 from ...ext_json import json
27 27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 28 from ..lock import GenerationLock
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class BaseShard:
34 34 storage_type: str = ''
35 35 fs = None
36 36
37 37 @classmethod
38 38 def hash(cls, key):
39 39 """Compute portable hash for `key`.
40 40
41 41 :param key: key to hash
42 42 :return: hash value
43 43
44 44 """
45 45 mask = 0xFFFFFFFF
46 46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47 47
48 48 def _write_file(self, full_path, read_iterator, mode):
49 49 raise NotImplementedError
50 50
51 51 def _get_keyfile(self, key):
52 52 raise NotImplementedError
53 53
54 54 def random_filename(self):
55 55 raise NotImplementedError
56 56
57 def store(self, *args, **kwargs):
58 raise NotImplementedError
59
57 60 def _store(self, key, value_reader, metadata, mode):
58 61 (filename, # hash-name
59 62 full_path # full-path/hash-name
60 63 ) = self.random_filename()
61 64
62 65 key_file, key_file_path = self._get_keyfile(key)
63 66
64 67 # STORE METADATA
65 68 _metadata = {
66 69 "version": "v1",
67 70
68 71 "key_file": key_file, # this is the .key.json file storing meta
69 72 "key_file_path": key_file_path, # full path to key_file
70 73 "archive_key": key, # original name we stored archive under, e.g. my-archive.zip
71 74 "archive_filename": filename, # the actual filename we stored that file under
72 75 "archive_full_path": full_path,
73 76
74 77 "store_time": time.time(),
75 78 "access_count": 0,
76 79 "access_time": 0,
77 80
78 81 "size": 0
79 82 }
80 83 if metadata:
81 84 _metadata.update(metadata)
82 85
83 86 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
84 87 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 88 _metadata['size'] = size
86 89 _metadata['sha256'] = sha256
87 90
88 91 # after the archive is written, we create a key file to record the presence of the binary file
89 92 with self.fs.open(key_file_path, 'wb') as f:
90 93 f.write(json.dumps(_metadata))
91 94
92 95 return key, filename, size, _metadata
93 96
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
97 def fetch(self, *args, **kwargs):
98 raise NotImplementedError
99
100 def _fetch(self, key, retry, retry_attempts, retry_backoff,
101 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
95 102 if retry is NOT_GIVEN:
96 103 retry = False
97 104 if retry_attempts is NOT_GIVEN:
98 105 retry_attempts = 0
99 106
100 107 if retry and retry_attempts > 0:
101 108 for attempt in range(1, retry_attempts + 1):
102 109 if key in self:
103 110 break
104 111 # we didn't find the key, wait retry_backoff seconds, then re-check
105 112 time.sleep(retry_backoff)
106 113
107 114 if key not in self:
108 115 log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 116 raise KeyError(key)
110 117
111 118 key_file, key_file_path = self._get_keyfile(key)
112 119 with self.fs.open(key_file_path, 'rb') as f:
113 120 metadata = json.loads(f.read())
114 121
115 122 archive_path = metadata['archive_full_path']
123 if presigned_url_expires and presigned_url_expires > 0:
124 metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)
116 125
117 126 try:
118 127 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 128 finally:
120 129 # update usage stats, count and accessed
121 130 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 131 metadata["access_time"] = time.time()
123 132 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 133 key_file, metadata['access_count'], metadata['access_time'])
125 134 with self.fs.open(key_file_path, 'wb') as f:
126 135 f.write(json.dumps(metadata))
127 136
137 def remove(self, *args, **kwargs):
138 raise NotImplementedError
139
128 140 def _remove(self, key):
129 141 if key not in self:
130 142 log.exception(f'requested key={key} not found in {self}')
131 143 raise KeyError(key)
132 144
133 145 key_file, key_file_path = self._get_keyfile(key)
134 146 with self.fs.open(key_file_path, 'rb') as f:
135 147 metadata = json.loads(f.read())
136 148
137 149 archive_path = metadata['archive_full_path']
138 150 self.fs.rm(archive_path)
139 151 self.fs.rm(key_file_path)
140 152 return 1
141 153
142 154 @property
143 155 def storage_medium(self):
144 156 return getattr(self, self.storage_type)
145 157
146 158 @property
147 159 def key_suffix(self):
148 160 return 'key.json'
149 161
150 162 def __contains__(self, key):
151 163 """Return `True` if `key` matching item is found in cache.
152 164
153 165 :param key: key matching item
154 166 :return: True if key matching item
155 167
156 168 """
157 169 key_file, key_file_path = self._get_keyfile(key)
158 170 return self.fs.exists(key_file_path)
159 171
160 172
161 173 class BaseCache:
162 174 _locking_url: str = ''
163 175 _storage_path: str = ''
164 _config = {}
176 _config: dict = {}
165 177 retry = False
166 retry_attempts = 0
167 retry_backoff = 1
178 retry_attempts: int = 0
179 retry_backoff: int | float = 1
168 180 _shards = tuple()
169 181 shard_cls = BaseShard
182 # define the presigned url expiration, 0 == disabled
183 presigned_url_expires: int = 0
170 184
171 185 def __contains__(self, key):
172 186 """Return `True` if `key` matching item is found in cache.
173 187
174 188 :param key: key matching item
175 189 :return: True if key matching item
176 190
177 191 """
178 192 return self.has_key(key)
179 193
180 194 def __repr__(self):
181 195 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
182 196
183 197 @classmethod
184 198 def gb_to_bytes(cls, gb):
185 199 return gb * (1024 ** 3)
186 200
187 201 @property
188 202 def storage_path(self):
189 203 return self._storage_path
190 204
191 205 @classmethod
192 206 def get_stats_db(cls):
193 207 return StatsDB()
194 208
195 209 def get_conf(self, key, pop=False):
196 210 if key not in self._config:
197 211 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
198 212 val = self._config[key]
199 213 if pop:
200 214 del self._config[key]
201 215 return val
202 216
203 217 def _get_shard(self, key) -> shard_cls:
204 218 index = self._hash(key) % self._shard_count
205 219 shard = self._shards[index]
206 220 return shard
207 221
208 222 def _get_size(self, shard, archive_path):
209 223 raise NotImplementedError
210 224
211 225 def store(self, key, value_reader, metadata=None):
212 226 shard = self._get_shard(key)
213 227 return shard.store(key, value_reader, metadata)
214 228
215 229 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
216 230 """
217 231 Return file handle corresponding to `key` from specific shard cache.
218 232 """
219 233 if retry is NOT_GIVEN:
220 234 retry = self.retry
221 235 if retry_attempts is NOT_GIVEN:
222 236 retry_attempts = self.retry_attempts
223 237 retry_backoff = self.retry_backoff
238 presigned_url_expires = self.presigned_url_expires
224 239
225 240 shard = self._get_shard(key)
226 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
241 return shard.fetch(key, retry=retry,
242 retry_attempts=retry_attempts,
243 retry_backoff=retry_backoff,
244 presigned_url_expires=presigned_url_expires)
227 245
228 246 def remove(self, key):
229 247 shard = self._get_shard(key)
230 248 return shard.remove(key)
231 249
232 250 def has_key(self, archive_key):
233 251 """Return `True` if `key` matching item is found in cache.
234 252
235 253 :param archive_key: key for the item; a unique archive name we want to store data under, e.g. my-archive-svn.zip
236 254 :return: True if key is found
237 255
238 256 """
239 257 shard = self._get_shard(archive_key)
240 258 return archive_key in shard
241 259
242 260 def iter_keys(self):
243 261 for shard in self._shards:
244 262 if shard.fs.exists(shard.storage_medium):
245 263 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
246 264 for key_file_path in _files:
247 265 if key_file_path.endswith(shard.key_suffix):
248 266 yield shard, key_file_path
249 267
250 268 def get_lock(self, lock_key):
251 269 return GenerationLock(lock_key, self._locking_url)
252 270
253 271 def evict(self, policy=None, size_limit=None) -> dict:
254 272 """
255 273 Remove old items based on the configured eviction policy and size limit
256 274
257 275
258 276 explanation of this algorithm:
259 277 iterate over each shard, then for each shard iterate over the .key files and
260 278 read the stored metadata. This gives us a full list of keys, cached archives, their sizes,
261 279 access data, creation times, and access counts.
262 280
263 281 Store that into an in-memory DB so we can easily run different sorting strategies.
264 282 Summing the size is a single SQL sum query.
265 283
266 284 Then we run a sorting strategy based on the eviction policy.
267 285 We iterate over the sorted keys and remove each, checking whether we hit the overall limit.
268 286 """
269 287 removal_info = {
270 288 "removed_items": 0,
271 289 "removed_size": 0
272 290 }
273 291 policy = policy or self._eviction_policy
274 292 size_limit = size_limit or self._cache_size_limit
275 293
276 294 select_policy = EVICTION_POLICY[policy]['evict']
277 295
278 296 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
279 297 policy, format_size(size_limit))
280 298
281 299 if select_policy is None:
282 300 return removal_info
283 301
284 302 db = self.get_stats_db()
285 303
286 304 data = []
287 305 cnt = 1
288 306
289 307 for shard, key_file in self.iter_keys():
290 308 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
291 309 metadata = json.loads(f.read())
292 310
293 311 key_file_path = os.path.join(shard.storage_medium, key_file)
294 312
295 313 archive_key = metadata['archive_key']
296 314 archive_path = metadata['archive_full_path']
297 315
298 316 size = metadata.get('size')
299 317 if not size:
300 318 # in case we don't have the size, re-calculate it...
301 319 size = self._get_size(shard, archive_path)
302 320
303 321 data.append([
304 322 cnt,
305 323 key_file,
306 324 key_file_path,
307 325 archive_key,
308 326 archive_path,
309 327 metadata.get('store_time', 0),
310 328 metadata.get('access_time', 0),
311 329 metadata.get('access_count', 0),
312 330 size,
313 331 ])
314 332 cnt += 1
315 333
316 334 # Insert bulk data using executemany
317 335 db.bulk_insert(data)
318 336
319 337 total_size = db.get_total_size()
320 338 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
321 339 len(data), format_size(total_size), format_size(size_limit))
322 340
323 341 removed_items = 0
324 342 removed_size = 0
325 343 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
326 344 # simulate removal impact BEFORE removal
327 345 total_size -= size
328 346
329 347 if total_size <= size_limit:
330 348 # we obtained what we wanted...
331 349 break
332 350
333 351 self.remove(archive_key)
334 352 removed_items += 1
335 353 removed_size += size
336 354 removal_info['removed_items'] = removed_items
337 355 removal_info['removed_size'] = removed_size
338 356 log.debug('Removed %s cache archives, and reduced size by: %s',
339 357 removed_items, format_size(removed_size))
340 358 return removal_info
341 359
342 360 def get_statistics(self):
343 361 total_files = 0
344 362 total_size = 0
345 363 meta = {}
346 364
347 365 for shard, key_file in self.iter_keys():
348 366 json_key = f"{shard.storage_medium}/{key_file}"
349 367 with shard.fs.open(json_key, 'rb') as f:
350 368 total_files += 1
351 369 metadata = json.loads(f.read())
352 370 total_size += metadata['size']
353 371
354 372 return total_files, total_size, meta
355
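For orientation, `_store()` above writes a small JSON document next to every cached archive, and `_fetch()` now enriches it with the presigned `url` before updating the access statistics and writing it back. A sketch of what such a key file might contain (field names come from the code above; the values and path layout are illustrative):

```python
# illustrative contents of an <archive_key> key file;
# the exact key-file naming and path layout differ per backend
example_key_file = {
    "version": "v1",
    "key_file": "my-archive.zip-key.json",
    "key_file_path": "archive-cache-bucket/shard-000/my-archive.zip-key.json",
    "archive_key": "my-archive.zip",
    "archive_filename": "ab-cd-ef0123....archive_cache",
    "archive_full_path": "archive-cache-bucket/shard-000/ab-cd-ef0123....archive_cache",
    "store_time": 1718000000.0,
    "access_count": 3,
    "access_time": 1718000450.0,
    "size": 1048576,
    "sha256": "9f86d081884c7d65...",
    # written back by _fetch() when presigned_url_expires > 0
    "url": "https://s3.example.com/archive-cache-bucket/...?X-Amz-Expires=3600",
}
```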
@@ -1,174 +1,177 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 import typing
23 24
24 25 import fsspec
25 26
26 27 from .base import BaseCache, BaseShard
27 28 from ..utils import ShardFileReader, NOT_GIVEN
28 29 from ...type_utils import str2bool
29 30
30 31 log = logging.getLogger(__name__)
31 32
32 33
33 34 class FileSystemShard(BaseShard):
34 35
35 36 def __init__(self, index, directory, directory_folder, fs, **settings):
36 self._index = index
37 self._directory = directory
38 self._directory_folder = directory_folder
39 self.storage_type = 'directory'
37 self._index: int = index
38 self._directory: str = directory
39 self._directory_folder: str = directory_folder
40 self.storage_type: str = 'directory'
40 41
41 42 self.fs = fs
42 43
43 44 @property
44 def directory(self):
45 def directory(self) -> str:
45 46 """Cache directory final path."""
46 47 return os.path.join(self._directory, self._directory_folder)
47 48
48 49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}.{self.key_suffix}'
50 key_file: str = f'{archive_key}.{self.key_suffix}'
50 51 return key_file, os.path.join(self.directory, key_file)
51 52
52 53 def _get_writer(self, path, mode):
53 54 for count in range(1, 11):
54 55 try:
55 56 # Another cache may have deleted the directory before
56 57 # the file could be opened.
57 58 return self.fs.open(path, mode)
58 59 except OSError:
59 60 if count == 10:
60 61 # Give up after 10 tries to open the file.
61 62 raise
62 63 continue
63 64
64 65 def _write_file(self, full_path, iterator, mode):
66
65 67 # ensure dir exists
66 68 destination, _ = os.path.split(full_path)
67 69 if not self.fs.exists(destination):
68 70 self.fs.makedirs(destination)
69 71
70 72 writer = self._get_writer(full_path, mode)
71 73
72 74 digest = hashlib.sha256()
73 75 with writer:
74 76 size = 0
75 77 for chunk in iterator:
76 78 size += len(chunk)
77 79 digest.update(chunk)
78 80 writer.write(chunk)
79 81 writer.flush()
80 82 # Get the file descriptor
81 83 fd = writer.fileno()
82 84
83 85 # Sync the file descriptor to disk, helps with NFS cases...
84 86 os.fsync(fd)
85 87 sha256 = digest.hexdigest()
86 88 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
87 89 return size, sha256
88 90
89 91 def store(self, key, value_reader, metadata: dict | None = None):
90 92 return self._store(key, value_reader, metadata, mode='xb')
91 93
92 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
94 def fetch(self, key, retry=NOT_GIVEN,
95 retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]:
93 96 return self._fetch(key, retry, retry_attempts, retry_backoff)
94 97
95 98 def remove(self, key):
96 99 return self._remove(key)
97 100
98 101 def random_filename(self):
99 102 """Return filename and full-path tuple for file storage.
100 103
101 104 Filename will be a randomly generated 28 character hexadecimal string
102 105 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
103 106 reduce the size of directories. On older filesystems, lookups in
104 107 directories with many files may be slow.
105 108 """
106 109
107 110 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
108 111
109 112 archive_name = hex_name[4:] + '.archive_cache'
110 113 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
111 114
112 115 full_path = os.path.join(self.directory, filename)
113 116 return archive_name, full_path
114 117
115 118 def __repr__(self):
116 119 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
117 120
118 121
119 122 class FileSystemFanoutCache(BaseCache):
120 shard_name = 'shard_%03d'
123 shard_name: str = 'shard_{:03d}'
121 124 shard_cls = FileSystemShard
122 125
123 126 def __init__(self, locking_url, **settings):
124 127 """
125 128 Initialize file system cache instance.
126 129
127 130 :param str locking_url: redis url for a lock
128 131 :param settings: settings dict
129 132
130 133 """
131 134 self._locking_url = locking_url
132 135 self._config = settings
133 136 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
134 137 directory = str(cache_dir)
135 138 directory = os.path.expanduser(directory)
136 139 directory = os.path.expandvars(directory)
137 140 self._directory = directory
138 141 self._storage_path = directory # common path for all from BaseCache
139 142
140 143 self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141 144 if self._shard_count < 1:
142 145 raise ValueError('cache_shards must be 1 or more')
143 146
144 147 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
145 148 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
146 149
147 150 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
148 151 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
149 152 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
150 153
151 154 log.debug('Initializing %s archival cache instance', self)
152 155 fs = fsspec.filesystem('file')
153 156 # check if it's ok to write, and create the archive cache main dir if it is missing
154 157 # A directory is the virtual equivalent of a physical file cabinet.
155 158 # In other words, it's a container for organizing digital data.
156 159 # Unlike a folder, which can only store files, a directory can store files
157 160 # as well as subdirectories.
158 161 if not fs.exists(self._directory):
159 162 fs.makedirs(self._directory, exist_ok=True)
160 163
161 164 self._shards = tuple(
162 165 self.shard_cls(
163 166 index=num,
164 167 directory=directory,
165 directory_folder=self.shard_name % num,
168 directory_folder=self.shard_name.format(num),
166 169 fs=fs,
167 170 **settings,
168 171 )
169 172 for num in range(self._shard_count)
170 173 )
171 174 self._hash = self._shards[0].hash
172 175
173 176 def _get_size(self, shard, archive_path):
174 177 return os.stat(archive_path).st_size
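Note that the filesystem backend above only accepts the new keyword through `**kwargs` and never forwards it to `_fetch()`: a local path cannot be turned into a presigned link, so `metadata['url']` is only populated by the S3 backend in the next hunk. A short sketch of the resulting difference, assuming two configured caches (`fs_cache` and `s3_cache` are illustrative names):

```python
# only ObjectStoreCache exposes the expiry setter; the filesystem shard takes
# presigned_url_expires via **kwargs purely to keep the fetch() signatures uniform
s3_cache.set_presigned_url_expiry(3600)

_, fs_meta = fs_cache.fetch('my-archive.zip')   # local backend: no 'url' key added
_, s3_meta = s3_cache.fetch('my-archive.zip')   # S3 backend: presigned GET link added

assert 'url' not in fs_meta
assert s3_meta.get('url')
```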
@@ -1,164 +1,170 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 import typing
23 24
24 25 import fsspec
25 26
26 27 from .base import BaseCache, BaseShard
27 28 from ..utils import ShardFileReader, NOT_GIVEN
28 29 from ...type_utils import str2bool
29 30
30 31 log = logging.getLogger(__name__)
31 32
32 33
33 34 class S3Shard(BaseShard):
34 35
35 36 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 self._index = index
37 self._bucket_folder = bucket_folder
38 self.storage_type = 'bucket'
39 self._bucket_main = bucket
37 self._index: int = index
38 self._bucket_folder: str = bucket_folder
39 self.storage_type: str = 'bucket'
40 self._bucket_main: str = bucket
40 41
41 42 self.fs = fs
42 43
43 44 @property
44 def bucket(self):
45 def bucket(self) -> str:
45 46 """Cache bucket final path."""
46 47 return os.path.join(self._bucket_main, self._bucket_folder)
47 48
48 49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}-{self.key_suffix}'
50 key_file: str = f'{archive_key}-{self.key_suffix}'
50 51 return key_file, os.path.join(self.bucket, key_file)
51 52
52 53 def _get_writer(self, path, mode):
53 54 return self.fs.open(path, 'wb')
54 55
55 56 def _write_file(self, full_path, iterator, mode):
56 57
57 58 # ensure folder in bucket exists
58 59 destination = self.bucket
59 60 if not self.fs.exists(destination):
60 61 self.fs.mkdir(destination, s3_additional_kwargs={})
61 62
62 63 writer = self._get_writer(full_path, mode)
63 64
64 65 digest = hashlib.sha256()
65 66 with writer:
66 67 size = 0
67 68 for chunk in iterator:
68 69 size += len(chunk)
69 70 digest.update(chunk)
70 71 writer.write(chunk)
71 72
72 73 sha256 = digest.hexdigest()
73 74 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
74 75 return size, sha256
75 76
76 77 def store(self, key, value_reader, metadata: dict | None = None):
77 78 return self._store(key, value_reader, metadata, mode='wb')
78 79
79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
80 return self._fetch(key, retry, retry_attempts, retry_backoff)
80 def fetch(self, key, retry=NOT_GIVEN,
81 retry_attempts=NOT_GIVEN, retry_backoff=1,
82 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
83 return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
81 84
82 85 def remove(self, key):
83 86 return self._remove(key)
84 87
85 88 def random_filename(self):
86 89 """Return filename and full-path tuple for file storage.
87 90
88 91 Filename will be a randomly generated 28 character hexadecimal string
89 92 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
90 93 reduce the size of directories. On older filesystems, lookups in
91 94 directories with many files may be slow.
92 95 """
93 96
94 97 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
95 98
96 99 archive_name = hex_name[4:] + '.archive_cache'
97 100 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
98 101
99 102 full_path = os.path.join(self.bucket, filename)
100 103 return archive_name, full_path
101 104
102 105 def __repr__(self):
103 106 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
104 107
105 108
106 109 class ObjectStoreCache(BaseCache):
107 shard_name = 'shard-%03d'
110 shard_name: str = 'shard-{:03d}'
108 111 shard_cls = S3Shard
109 112
110 113 def __init__(self, locking_url, **settings):
111 114 """
112 115 Initialize objectstore cache instance.
113 116
114 117 :param str locking_url: redis url for a lock
115 118 :param settings: settings dict
116 119
117 120 """
118 121 self._locking_url = locking_url
119 122 self._config = settings
120 123
121 124 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 125 self._storage_path = objectstore_url # common path for all from BaseCache
123 126
124 127 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125 128 if self._shard_count < 1:
126 129 raise ValueError('cache_shards must be 1 or more')
127 130
128 131 self._bucket = settings.pop('archive_cache.objectstore.bucket')
129 132 if not self._bucket:
130 133 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
131 134
132 135 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
133 136 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
134 137
135 138 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
136 139 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
137 140 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
138 141
139 142 endpoint_url = settings.pop('archive_cache.objectstore.url')
140 143 key = settings.pop('archive_cache.objectstore.key')
141 144 secret = settings.pop('archive_cache.objectstore.secret')
142 145
143 146 log.debug('Initializing %s archival cache instance', self)
144 147
145 148 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
146 149
147 150 # init main bucket
148 151 if not fs.exists(self._bucket):
149 152 fs.mkdir(self._bucket)
150 153
151 154 self._shards = tuple(
152 155 self.shard_cls(
153 156 index=num,
154 157 bucket=self._bucket,
155 bucket_folder=self.shard_name % num,
158 bucket_folder=self.shard_name.format(num),
156 159 fs=fs,
157 160 **settings,
158 161 )
159 162 for num in range(self._shard_count)
160 163 )
161 164 self._hash = self._shards[0].hash
162 165
163 166 def _get_size(self, shard, archive_path):
164 167 return shard.fs.info(archive_path)['size']
168
169 def set_presigned_url_expiry(self, val: int) -> None:
170 self.presigned_url_expires = val
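The presigned URL itself comes from the underlying fsspec S3 filesystem: `self.fs.url(path, expires=...)` on s3fs delegates to botocore's `generate_presigned_url`, so the returned link allows a plain, credential-free HTTP GET of the cached archive until it expires. A standalone sketch of that underlying call, with illustrative endpoint, credentials, and object path:

```python
import fsspec

# illustrative values; the real cache reads these from archive_cache.objectstore.* settings
fs = fsspec.filesystem(
    's3', anon=False,
    endpoint_url='http://s3.example.local:9000',
    key='access-key', secret='secret-key',
)

# this is effectively what S3Shard._fetch() calls when presigned_url_expires > 0
presigned = fs.url('archive-cache-bucket/shard-000/ab-cd-ef0123....archive_cache', expires=3600)
print(presigned)  # time-limited GET link to the cached archive object
```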