##// END OF EJS Templates
feat(archive-cache): added retry mechanism, and some code cleanups
super-admin -
r5426:47a7e5de default
parent child Browse files
Show More
@@ -40,7 +40,7 b' from rhodecode.lib import diffs, helpers'
40 40 from rhodecode.lib import audit_logger
41 41 from rhodecode.lib.hash_utils import sha1_safe
42 42 from rhodecode.lib.rc_cache.archive_cache import (
43 get_archival_cache_store, get_archival_config, ArchiveCacheLock, archive_iterator)
43 get_archival_cache_store, get_archival_config, ArchiveCacheGenerationLock, archive_iterator)
44 44 from rhodecode.lib.str_utils import safe_bytes, convert_special_chars
45 45 from rhodecode.lib.view_utils import parse_path_ref
46 46 from rhodecode.lib.exceptions import NonRelativePathError
@@ -444,7 +444,8 b' class RepoFilesView(RepoAppView):'
444 444 archive_at_path=at_path, cache_config=d_cache_conf)
445 445 except ImproperArchiveTypeError:
446 446 return _('Unknown archive type')
447 except ArchiveCacheLock:
447
448 except ArchiveCacheGenerationLock:
448 449 retry_after = round(random.uniform(0.3, 3.0), 1)
449 450 time.sleep(retry_after)
450 451
@@ -453,7 +454,7 b' class RepoFilesView(RepoAppView):'
453 454 f"archive {archive_name_key} generation in progress, Retry-After={retry_after}, Location={location}"
454 455 )
455 456 response.headers["Retry-After"] = str(retry_after)
456 response.status_code = 307 # temporary redirect
457 response.status_code = 307 # temporary redirect
457 458
458 459 response.location = location
459 460 return response
@@ -20,7 +20,7 b' from .fanout_cache import get_archival_c'
20 20 from .fanout_cache import get_archival_config
21 21
22 22 from .utils import archive_iterator
23 from .utils import ArchiveCacheLock
23 from .utils import ArchiveCacheGenerationLock
24 24
25 25
26 26 def includeme(config):
@@ -112,6 +112,11 b' class FileSystemCache:'
112 112 self._index = index
113 113 self._directory = directory
114 114
115 @property
116 def directory(self):
117 """Cache directory."""
118 return self._directory
119
115 120 def _write_file(self, full_path, iterator, mode, encoding=None):
116 121 full_dir, _ = os.path.split(full_path)
117 122
@@ -169,8 +174,17 b' class FileSystemCache:'
169 174
170 175 return key, size, MODE_BINARY, filename, _metadata
171 176
172 def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
177 def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
178
179 if retry:
180 for attempt in range(retry_attempts):
181 if key in self:
182 break
183             # we didn't find the key, wait 1s, and re-check
184 time.sleep(1)
185
173 186 if key not in self:
187 log.exception('requested {key} not found in {self}', key, self)
174 188 raise KeyError(key)
175 189
176 190 key_file = self._get_keyfile(key)
@@ -180,7 +194,7 b' class FileSystemCache:'
180 194 filename = metadata['filename']
181 195
182 196 try:
183 return open(os.path.join(self._directory, filename), 'rb'), metadata
197 return open(os.path.join(self.directory, filename), 'rb'), metadata
184 198 finally:
185 199 # update usage stats, count and accessed
186 200 metadata["access_count"] = metadata.get("access_count", 0) + 1
@@ -202,7 +216,7 b' class FileSystemCache:'
202 216 sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
203 217 name = hex_name[4:] + '.archive_cache'
204 218 filename = os.path.join(sub_dir, name)
205 full_path = os.path.join(self._directory, filename)
219 full_path = os.path.join(self.directory, filename)
206 220 return filename, full_path
207 221
208 222 def hash(self, key):
@@ -225,6 +239,9 b' class FileSystemCache:'
225 239 key_file = self._get_keyfile(key)
226 240 return os.path.exists(key_file)
227 241
242 def __repr__(self):
243 return f'FileSystemCache(index={self._index}, dir={self.directory})'
244
228 245
229 246 class FanoutCache:
230 247 """Cache that shards keys and values."""
@@ -262,6 +279,11 b' class FanoutCache:'
262 279 )
263 280 self._hash = self._shards[0].hash
264 281
282 @property
283 def directory(self):
284 """Cache directory."""
285 return self._directory
286
265 287 def get_lock(self, lock_key):
266 288 return GenerationLock(lock_key, self._locking_url)
267 289
@@ -274,11 +296,11 b' class FanoutCache:'
274 296 shard = self._get_shard(key)
275 297 return shard.store(key, value_reader, metadata)
276 298
277 def fetch(self, key):
299 def fetch(self, key, retry=False, retry_attempts=10):
278 300 """Return file handle corresponding to `key` from cache.
279 301 """
280 302 shard = self._get_shard(key)
281 return shard.fetch(key)
303 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)
282 304
283 305 def has_key(self, key):
284 306 """Return `True` if `key` matching item is found in cache.
@@ -326,9 +348,9 b' class FanoutCache:'
326 348 data = []
327 349 cnt = 1
328 350 for shard in self._shards:
329 for key_file in os.listdir(shard._directory):
351 for key_file in os.listdir(shard.directory):
330 352 if key_file.endswith('.key'):
331 key_file_path = os.path.join(shard._directory, key_file)
353 key_file_path = os.path.join(shard.directory, key_file)
332 354 with open(key_file_path, 'rb') as f:
333 355 metadata = json.loads(f.read())
334 356
@@ -18,7 +18,7 b''
18 18
19 19 import redis
20 20 from ..._vendor import redis_lock
21 from .utils import ArchiveCacheLock
21 from .utils import ArchiveCacheGenerationLock
22 22
23 23
24 24 class GenerationLock:
@@ -53,7 +53,7 b' class GenerationLock:'
53 53 def __enter__(self):
54 54 acquired = self.lock.acquire(blocking=False)
55 55 if not acquired:
56 raise ArchiveCacheLock('Failed to create a lock')
56 raise ArchiveCacheGenerationLock('Failed to create a lock')
57 57
58 58 def __exit__(self, exc_type, exc_val, exc_tb):
59 59 self.lock.release()
@@ -19,7 +19,7 b''
19 19 import os
20 20
21 21
22 class ArchiveCacheLock(Exception):
22 class ArchiveCacheGenerationLock(Exception):
23 23 pass
24 24
25 25
General Comments 0
You need to be logged in to leave comments. Login now