archive-cache: synced with CE codebase
super-admin · r1257:60bf3236 (default branch)
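Before the diff itself, a minimal usage sketch of the filesystem backend touched by this changeset. The method signatures and the `archive_cache.filesystem.*` config keys come from the code below; the import path, the eviction-policy name, and the Redis locking URL are assumptions for illustration only.

```python
# Sketch only: import path, policy name and Redis URL are assumptions,
# not part of this changeset.
import io

from archive_cache.backends.filesystem import FileSystemFanoutCache  # hypothetical path

settings = {
    'archive_cache.filesystem.store_dir': '/tmp/archive_cache',
    'archive_cache.filesystem.cache_shards': 4,
    'archive_cache.filesystem.eviction_policy': 'least-recently-used',  # assumed policy name
    'archive_cache.filesystem.cache_size_gb': 1,
    'archive_cache.filesystem.retry': 'false',
    'archive_cache.filesystem.retry_attempts': 0,
    'archive_cache.filesystem.retry_backoff': 1,
}

cache = FileSystemFanoutCache(locking_url='redis://localhost:6379/0', **settings)

# store() consumes any object with a .read() method and records metadata
# in a side-car <key>.key.json file next to the archive blob.
cache.store('my-archive.zip', io.BytesIO(b'archive bytes'), metadata={'commit_id': 'abc123'})

# fetch() returns a ShardFileReader wrapping the archive plus its metadata dict.
reader, meta = cache.fetch('my-archive.zip')
print(meta['archive_key'], meta['size'])
```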
@@ -1,352 +1,355 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import os
20 20 import functools
21 21 import logging
22 22 import typing
23 23 import time
24 24 import zlib
25 25
26 26 from ...ext_json import json
27 27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 28 from ..lock import GenerationLock
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class BaseShard:
34 34 storage_type: str = ''
35 35 fs = None
36 36
37 37 @classmethod
38 38 def hash(cls, key):
39 39 """Compute portable hash for `key`.
40 40
41 41 :param key: key to hash
42 42 :return: hash value
43 43
44 44 """
45 45 mask = 0xFFFFFFFF
46 46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47 47
48 48 def _write_file(self, full_path, read_iterator, mode):
49 49 raise NotImplementedError
50 50
51 51 def _get_keyfile(self, key):
52 52 raise NotImplementedError
53 53
54 54 def random_filename(self):
55 55 raise NotImplementedError
56 56
57 57 def _store(self, key, value_reader, metadata, mode):
58 58 (filename, # hash-name
59 59 full_path # full-path/hash-name
60 60 ) = self.random_filename()
61 61
62 62 key_file, key_file_path = self._get_keyfile(key)
63 63
64 64 # STORE METADATA
65 65 _metadata = {
66 66 "version": "v1",
67 67
68 68 "key_file": key_file, # this is the .key.json file storing meta
69 69 "key_file_path": key_file_path, # full path to key_file
70 70 "archive_key": key, # original name we stored the archive under, e.g. my-archive.zip
71 71 "archive_filename": filename, # the actual filename we stored that file under
72 72 "archive_full_path": full_path,
73 73
74 74 "store_time": time.time(),
75 75 "access_count": 0,
76 76 "access_time": 0,
77 77
78 78 "size": 0
79 79 }
80 80 if metadata:
81 81 _metadata.update(metadata)
82 82
83 83 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
84 84 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 85 _metadata['size'] = size
86 86 _metadata['sha256'] = sha256
87 87
88 88 # after the archive is written, we create a key file to record the presence of the binary file
89 89 with self.fs.open(key_file_path, 'wb') as f:
90 90 f.write(json.dumps(_metadata))
91 91
92 92 return key, filename, size, _metadata
93 93
94 94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
95 95 if retry is NOT_GIVEN:
96 96 retry = False
97 97 if retry_attempts is NOT_GIVEN:
98 98 retry_attempts = 0
99 99
100 100 if retry and retry_attempts > 0:
101 101 for attempt in range(1, retry_attempts + 1):
102 102 if key in self:
103 103 break
104 104 # key not found yet; wait retry_backoff seconds and re-check
105 105 time.sleep(retry_backoff)
106 106
107 107 if key not in self:
108 108 log.exception(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 109 raise KeyError(key)
110 110
111 111 key_file, key_file_path = self._get_keyfile(key)
112 112 with self.fs.open(key_file_path, 'rb') as f:
113 113 metadata = json.loads(f.read())
114 114
115 115 archive_path = metadata['archive_full_path']
116 116
117 117 try:
118 118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 119 finally:
120 120 # update usage stats, count and accessed
121 121 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 122 metadata["access_time"] = time.time()
123 123 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 124 key_file, metadata['access_count'], metadata['access_time'])
125 125 with self.fs.open(key_file_path, 'wb') as f:
126 126 f.write(json.dumps(metadata))
127 127
128 128 def _remove(self, key):
129 129 if key not in self:
130 130 log.exception(f'requested key={key} not found in {self}')
131 131 raise KeyError(key)
132 132
133 133 key_file, key_file_path = self._get_keyfile(key)
134 134 with self.fs.open(key_file_path, 'rb') as f:
135 135 metadata = json.loads(f.read())
136 136
137 137 archive_path = metadata['archive_full_path']
138 138 self.fs.rm(archive_path)
139 139 self.fs.rm(key_file_path)
140 140 return 1
141 141
142 142 @property
143 143 def storage_medium(self):
144 144 return getattr(self, self.storage_type)
145 145
146 146 @property
147 147 def key_suffix(self):
148 148 return 'key.json'
149 149
150 150 def __contains__(self, key):
151 151 """Return `True` if `key` matching item is found in cache.
152 152
153 153 :param key: key matching item
154 154 :return: True if key matching item
155 155
156 156 """
157 157 key_file, key_file_path = self._get_keyfile(key)
158 158 return self.fs.exists(key_file_path)
159 159
160 160
161 161 class BaseCache:
162 162 _locking_url: str = ''
163 163 _storage_path: str = ''
164 164 _config = {}
165 165 retry = False
166 166 retry_attempts = 0
167 167 retry_backoff = 1
168 168 _shards = tuple()
169 shard_cls = BaseShard
169 170
170 171 def __contains__(self, key):
171 172 """Return `True` if `key` matching item is found in cache.
172 173
173 174 :param key: key matching item
174 175 :return: True if key matching item
175 176
176 177 """
177 178 return self.has_key(key)
178 179
179 180 def __repr__(self):
180 181 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
181 182
182 183 @classmethod
183 184 def gb_to_bytes(cls, gb):
184 185 return gb * (1024 ** 3)
185 186
186 187 @property
187 188 def storage_path(self):
188 189 return self._storage_path
189 190
190 191 @classmethod
191 192 def get_stats_db(cls):
192 193 return StatsDB()
193 194
194 195 def get_conf(self, key, pop=False):
195 196 if key not in self._config:
196 197 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
197 198 val = self._config[key]
198 199 if pop:
199 200 del self._config[key]
200 201 return val
201 202
202 def _get_shard(self, key):
203 raise NotImplementedError
203 def _get_shard(self, key) -> shard_cls:
204 index = self._hash(key) % self._shard_count
205 shard = self._shards[index]
206 return shard
204 207
205 208 def _get_size(self, shard, archive_path):
206 209 raise NotImplementedError
207 210
208 211 def store(self, key, value_reader, metadata=None):
209 212 shard = self._get_shard(key)
210 213 return shard.store(key, value_reader, metadata)
211 214
212 215 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
213 216 """
214 217 Return file handle corresponding to `key` from specific shard cache.
215 218 """
216 219 if retry is NOT_GIVEN:
217 220 retry = self.retry
218 221 if retry_attempts is NOT_GIVEN:
219 222 retry_attempts = self.retry_attempts
220 223 retry_backoff = self.retry_backoff
221 224
222 225 shard = self._get_shard(key)
223 226 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
224 227
225 228 def remove(self, key):
226 229 shard = self._get_shard(key)
227 230 return shard.remove(key)
228 231
229 232 def has_key(self, archive_key):
230 233 """Return `True` if `key` matching item is found in cache.
231 234
232 235 :param archive_key: key for the item; this is the unique archive name we store data under, e.g. my-archive-svn.zip
233 236 :return: True if key is found
234 237
235 238 """
236 239 shard = self._get_shard(archive_key)
237 240 return archive_key in shard
238 241
239 242 def iter_keys(self):
240 243 for shard in self._shards:
241 244 if shard.fs.exists(shard.storage_medium):
242 245 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
243 246 for key_file_path in _files:
244 247 if key_file_path.endswith(shard.key_suffix):
245 248 yield shard, key_file_path
246 249
247 250 def get_lock(self, lock_key):
248 251 return GenerationLock(lock_key, self._locking_url)
249 252
250 253 def evict(self, policy=None, size_limit=None) -> dict:
251 254 """
252 255 Remove old items based on the configured eviction policy and size limit.
253 256
254 257
255 258 How this algorithm works:
256 259 iterate over each shard and, for every shard, over its .key files, reading the
257 260 metadata stored in them. This gives a full list of keys and cached archives together
258 261 with their size, creation time, access time, and access counts.
259 262
260 263 That data is loaded into an in-memory DB so different sorting strategies can be run
261 264 easily; summing the sizes is a single SQL SUM query.
262 265
263 266 A sorting strategy is then selected based on the eviction policy, and the sorted keys
264 267 are removed one by one, stopping once the overall size limit is met.
265 268 """
266 269 removal_info = {
267 270 "removed_items": 0,
268 271 "removed_size": 0
269 272 }
270 273 policy = policy or self._eviction_policy
271 274 size_limit = size_limit or self._cache_size_limit
272 275
273 276 select_policy = EVICTION_POLICY[policy]['evict']
274 277
275 278 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
276 279 policy, format_size(size_limit))
277 280
278 281 if select_policy is None:
279 282 return removal_info
280 283
281 284 db = self.get_stats_db()
282 285
283 286 data = []
284 287 cnt = 1
285 288
286 289 for shard, key_file in self.iter_keys():
287 290 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
288 291 metadata = json.loads(f.read())
289 292
290 293 key_file_path = os.path.join(shard.storage_medium, key_file)
291 294
292 295 archive_key = metadata['archive_key']
293 296 archive_path = metadata['archive_full_path']
294 297
295 298 size = metadata.get('size')
296 299 if not size:
297 300 # in case we don't have size re-calc it...
298 301 size = self._get_size(shard, archive_path)
299 302
300 303 data.append([
301 304 cnt,
302 305 key_file,
303 306 key_file_path,
304 307 archive_key,
305 308 archive_path,
306 309 metadata.get('store_time', 0),
307 310 metadata.get('access_time', 0),
308 311 metadata.get('access_count', 0),
309 312 size,
310 313 ])
311 314 cnt += 1
312 315
313 316 # Insert bulk data using executemany
314 317 db.bulk_insert(data)
315 318
316 319 total_size = db.get_total_size()
317 320 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
318 321 len(data), format_size(total_size), format_size(size_limit))
319 322
320 323 removed_items = 0
321 324 removed_size = 0
322 325 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
323 326 # simulate removal impact BEFORE removal
324 327 total_size -= size
325 328
326 329 if total_size <= size_limit:
327 330 # we obtained what we wanted...
328 331 break
329 332
330 333 self.remove(archive_key)
331 334 removed_items += 1
332 335 removed_size += size
333 336 removal_info['removed_items'] = removed_items
334 337 removal_info['removed_size'] = removed_size
335 338 log.debug('Removed %s cache archives, and reduced size by: %s',
336 339 removed_items, format_size(removed_size))
337 340 return removal_info
338 341
339 342 def get_statistics(self):
340 343 total_files = 0
341 344 total_size = 0
342 345 meta = {}
343 346
344 347 for shard, key_file in self.iter_keys():
345 348 json_key = f"{shard.storage_medium}/{key_file}"
346 349 with shard.fs.open(json_key, 'rb') as f:
347 350 total_files += 1
348 351 metadata = json.loads(f.read())
349 352 total_size += metadata['size']
350 353
351 354 return total_files, total_size, meta
352 355
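The shard routing this commit hoists into `BaseCache._get_shard` boils down to the stable adler32 hash from `BaseShard.hash()` taken modulo the shard count. A small stand-alone sketch of that mapping (the helper name is ours, not the codebase's):

```python
import zlib

def shard_index(key: str, shard_count: int) -> int:
    # portable hash, identical to BaseShard.hash(): adler32 masked to 32 bits
    portable_hash = zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF
    return portable_hash % shard_count

# the same archive key always routes to the same shard, e.g. with 4 shards:
for key in ('my-archive.zip', 'other-archive.tar.gz'):
    print(key, '->', shard_index(key, 4))
```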
@@ -1,167 +1,174 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23
24 24 import fsspec
25 25
26 26 from .base import BaseCache, BaseShard
27 27 from ..utils import ShardFileReader, NOT_GIVEN
28 28 from ...type_utils import str2bool
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class FileSystemShard(BaseShard):
34 34
35 def __init__(self, index, directory, **settings):
35 def __init__(self, index, directory, directory_folder, fs, **settings):
36 36 self._index = index
37 37 self._directory = directory
38 self._directory_folder = directory_folder
38 39 self.storage_type = 'directory'
39 self.fs = fsspec.filesystem('file')
40
41 self.fs = fs
40 42
41 43 @property
42 44 def directory(self):
43 """Cache directory."""
44 return self._directory
45 """Cache directory final path."""
46 return os.path.join(self._directory, self._directory_folder)
45 47
46 48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
47 49 key_file = f'{archive_key}.{self.key_suffix}'
48 50 return key_file, os.path.join(self.directory, key_file)
49 51
50 52 def _get_writer(self, path, mode):
51 53 for count in range(1, 11):
52 54 try:
53 55 # Another cache may have deleted the directory before
54 56 # the file could be opened.
55 57 return self.fs.open(path, mode)
56 58 except OSError:
57 59 if count == 10:
58 60 # Give up after 10 tries to open the file.
59 61 raise
60 62 continue
61 63
62 64 def _write_file(self, full_path, iterator, mode):
63 65 # ensure dir exists
64 66 destination, _ = os.path.split(full_path)
65 67 if not self.fs.exists(destination):
66 68 self.fs.makedirs(destination)
67 69
68 70 writer = self._get_writer(full_path, mode)
69 71
70 72 digest = hashlib.sha256()
71 73 with writer:
72 74 size = 0
73 75 for chunk in iterator:
74 76 size += len(chunk)
75 77 digest.update(chunk)
76 78 writer.write(chunk)
77 79 writer.flush()
78 80 # Get the file descriptor
79 81 fd = writer.fileno()
80 82
81 83 # Sync the file descriptor to disk, helps with NFS cases...
82 84 os.fsync(fd)
83 85 sha256 = digest.hexdigest()
84 86 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
85 87 return size, sha256
86 88
87 89 def store(self, key, value_reader, metadata: dict | None = None):
88 90 return self._store(key, value_reader, metadata, mode='xb')
89 91
90 92 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
91 93 return self._fetch(key, retry, retry_attempts, retry_backoff)
92 94
93 95 def remove(self, key):
94 96 return self._remove(key)
95 97
96 98 def random_filename(self):
97 99 """Return filename and full-path tuple for file storage.
98 100
99 101 Filename will be a randomly generated 28 character hexadecimal string
100 102 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
101 103 reduce the size of directories. On older filesystems, lookups in
102 104 directories with many files may be slow.
103 105 """
104 106
105 107 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
106 108
107 109 archive_name = hex_name[4:] + '.archive_cache'
108 110 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
109 111
110 112 full_path = os.path.join(self.directory, filename)
111 113 return archive_name, full_path
112 114
113 115 def __repr__(self):
114 116 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
115 117
116 118
117 119 class FileSystemFanoutCache(BaseCache):
118 120 shard_name = 'shard_%03d'
121 shard_cls = FileSystemShard
119 122
120 123 def __init__(self, locking_url, **settings):
121 124 """
122 125 Initialize file system cache instance.
123 126
124 127 :param str locking_url: redis url for a lock
125 128 :param settings: settings dict
126 129
127 130 """
128 131 self._locking_url = locking_url
129 132 self._config = settings
130 133 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
131 134 directory = str(cache_dir)
132 135 directory = os.path.expanduser(directory)
133 136 directory = os.path.expandvars(directory)
134 137 self._directory = directory
135 self._storage_path = directory
138 self._storage_path = directory # common path for all from BaseCache
136 139
137 # check if it's ok to write, and re-create the archive cache
138 if not os.path.isdir(self._directory):
139 os.makedirs(self._directory, exist_ok=True)
140
141 self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
140 self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141 if self._shard_count < 1:
142 raise ValueError('cache_shards must be 1 or more')
142 143
143 144 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
144 145 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
145 146
146 147 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
147 148 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
148 149 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
149 150
150 log.debug('Initializing archival cache instance under %s', self._directory)
151 log.debug('Initializing %s archival cache instance under %s', self, self._directory)
152 fs = fsspec.filesystem('file')
153 # check if it's ok to write, and re-create the archive cache main dir
154 # The main directory is the root shared by all shards; each shard keeps its
155 # files under its own sub-directory (shard_000, shard_001, ...), created
156 # lazily by the shard when it first writes.
157 # Creating the root up front verifies that the location exists and is writable.
158 if not fs.exists(self._directory):
159 fs.makedirs(self._directory, exist_ok=True)
160
151 161 self._shards = tuple(
152 FileSystemShard(
162 self.shard_cls(
153 163 index=num,
154 directory=os.path.join(directory, self.shard_name % num),
164 directory=directory,
165 directory_folder=self.shard_name % num,
166 fs=fs,
155 167 **settings,
156 168 )
157 for num in range(self._count)
169 for num in range(self._shard_count)
158 170 )
159 171 self._hash = self._shards[0].hash
160 172
161 def _get_shard(self, key) -> FileSystemShard:
162 index = self._hash(key) % self._count
163 shard = self._shards[index]
164 return shard
165
166 173 def _get_size(self, shard, archive_path):
167 174 return os.stat(archive_path).st_size
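`random_filename()` above fans archives out across two levels of 2-character sub-directories. A quick stand-alone illustration of the resulting layout, mirroring the method's logic:

```python
import codecs
import os

hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')  # 32 hex characters
archive_name = hex_name[4:] + '.archive_cache'                   # 28 chars + suffix
filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"

# e.g. '3f/a1/<28 hex chars>.archive_cache' -- the two short prefix levels keep
# any single directory from accumulating too many entries on older filesystems.
print(filename)
```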
@@ -1,158 +1,164 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23
24 24 import fsspec
25 25
26 26 from .base import BaseCache, BaseShard
27 27 from ..utils import ShardFileReader, NOT_GIVEN
28 28 from ...type_utils import str2bool
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class S3Shard(BaseShard):
34 34
35 def __init__(self, index, bucket, **settings):
35 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 36 self._index = index
37 self._bucket = bucket
37 self._bucket_folder = bucket_folder
38 38 self.storage_type = 'bucket'
39 self._bucket_main = bucket
39 40
40 endpoint_url = settings.pop('archive_cache.objectstore.url')
41 key = settings.pop('archive_cache.objectstore.key')
42 secret = settings.pop('archive_cache.objectstore.secret')
43
44 # TODO: Add it all over the place...
45 self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
46
47 self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
41 self.fs = fs
48 42
49 43 @property
50 44 def bucket(self):
51 """Cache bucket."""
52 return os.path.join(self._bucket_root, self._bucket)
45 """Cache bucket final path."""
46 return os.path.join(self._bucket_main, self._bucket_folder)
53 47
54 48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
55 49 key_file = f'{archive_key}-{self.key_suffix}'
56 50 return key_file, os.path.join(self.bucket, key_file)
57 51
58 52 def _get_writer(self, path, mode):
59 53 return self.fs.open(path, 'wb')
60 54
61 55 def _write_file(self, full_path, iterator, mode):
62 if self._bucket_root:
63 if not self.fs.exists(self._bucket_root):
64 self.fs.mkdir(self._bucket_root)
65 56
66 # ensure bucket exists
57 # ensure folder in bucket exists
67 58 destination = self.bucket
68 59 if not self.fs.exists(destination):
69 60 self.fs.mkdir(destination, s3_additional_kwargs={})
70 61
71 62 writer = self._get_writer(full_path, mode)
72 63
73 64 digest = hashlib.sha256()
74 65 with writer:
75 66 size = 0
76 67 for chunk in iterator:
77 68 size += len(chunk)
78 69 digest.update(chunk)
79 70 writer.write(chunk)
80 71
81 72 sha256 = digest.hexdigest()
82 73 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
83 74 return size, sha256
84 75
85 76 def store(self, key, value_reader, metadata: dict | None = None):
86 77 return self._store(key, value_reader, metadata, mode='wb')
87 78
88 79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
89 80 return self._fetch(key, retry, retry_attempts, retry_backoff)
90 81
91 82 def remove(self, key):
92 83 return self._remove(key)
93 84
94 85 def random_filename(self):
95 86 """Return filename and full-path tuple for file storage.
96 87
97 88 Filename will be a randomly generated 28 character hexadecimal string
98 89 with ".archive_cache" suffixed. Two 2-character prefixes are folded into the object
99 90 name (joined with dashes rather than sub-directories) so keys stay spread out
100 91 within the bucket folder.
101 92 """
102 93
103 94 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
104 95
105 96 archive_name = hex_name[4:] + '.archive_cache'
106 97 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
107 98
108 99 full_path = os.path.join(self.bucket, filename)
109 100 return archive_name, full_path
110 101
111 102 def __repr__(self):
112 103 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
113 104
114 105
115 106 class ObjectStoreCache(BaseCache):
116 shard_name = 'shard-bucket-%03d'
107 shard_name = 'shard-%03d'
108 shard_cls = S3Shard
117 109
118 110 def __init__(self, locking_url, **settings):
119 111 """
120 112 Initialize objectstore cache instance.
121 113
122 114 :param str locking_url: redis url for a lock
123 115 :param settings: settings dict
124 116
125 117 """
126 118 self._locking_url = locking_url
127 119 self._config = settings
128 120
129 121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
130 self._storage_path = objectstore_url
122 self._storage_path = objectstore_url # common path for all from BaseCache
131 123
132 self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
124 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125 if self._shard_count < 1:
126 raise ValueError('bucket_shards must be 1 or more')
127
128 self._bucket = settings.pop('archive_cache.objectstore.bucket')
129 if not self._bucket:
130 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
133 131
134 132 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
135 133 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
136 134
137 135 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
138 136 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
139 137 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
140 138
141 log.debug('Initializing archival cache instance under %s', objectstore_url)
139 endpoint_url = settings.pop('archive_cache.objectstore.url')
140 key = settings.pop('archive_cache.objectstore.key')
141 secret = settings.pop('archive_cache.objectstore.secret')
142
143 log.debug('Initializing %s archival cache instance under %s', self, objectstore_url)
144
145 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
146
147 # init main bucket
148 if not fs.exists(self._bucket):
149 fs.mkdir(self._bucket)
150
142 151 self._shards = tuple(
143 S3Shard(
152 self.shard_cls(
144 153 index=num,
145 bucket=self.shard_name % num,
154 bucket=self._bucket,
155 bucket_folder=self.shard_name % num,
156 fs=fs,
146 157 **settings,
147 158 )
148 for num in range(self._count)
159 for num in range(self._shard_count)
149 160 )
150 161 self._hash = self._shards[0].hash
151 162
152 def _get_shard(self, key) -> S3Shard:
153 index = self._hash(key) % self._count
154 shard = self._shards[index]
155 return shard
156
157 163 def _get_size(self, shard, archive_path):
158 164 return shard.fs.info(archive_path)['size']
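For completeness, a hedged sketch of wiring up the object-store backend after this change: the single shared `fsspec` S3 filesystem and the main bucket are created once in `ObjectStoreCache.__init__`, and each shard only manages a `shard-NNN` folder inside that bucket. The endpoint, credentials, bucket name, policy value, and import path below are placeholders, not values from the changeset.

```python
# Sketch only: endpoint, credentials, bucket and import path are placeholders.
import io

from archive_cache.backends.objectstore import ObjectStoreCache  # hypothetical path

settings = {
    'archive_cache.objectstore.url': 'http://127.0.0.1:9000',    # e.g. a local MinIO endpoint
    'archive_cache.objectstore.key': 's3-access-key',
    'archive_cache.objectstore.secret': 's3-secret-key',
    'archive_cache.objectstore.bucket': 'rhodecode-archive-cache',
    'archive_cache.objectstore.bucket_shards': 8,
    'archive_cache.objectstore.eviction_policy': 'least-recently-used',  # assumed policy name
    'archive_cache.objectstore.cache_size_gb': 10,
    'archive_cache.objectstore.retry': 'false',
    'archive_cache.objectstore.retry_attempts': 0,
    'archive_cache.objectstore.retry_backoff': 1,
}

cache = ObjectStoreCache(locking_url='redis://localhost:6379/0', **settings)

# Keys hash to a shard exactly as in the filesystem backend; the shard writes
# objects under '<bucket>/shard-NNN/' and stores metadata in '<key>-key.json'.
cache.store('my-archive.zip', io.BytesIO(b'archive bytes'), metadata={'repo': 'example'})
reader, meta = cache.fetch('my-archive.zip')
```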