@@ -19,7 +19,7 @@ from .fanout_cache import get_archival_config
 from .fanout_cache import get_archival_config

 from .utils import archive_iterator
-from .utils import ArchiveCacheLock
+from .utils import ArchiveCacheGenerationLock


 def includeme(config):
@@ -111,6 +111,11 @@ class FileSystemCache:
         self._index = index
         self._directory = directory

+    @property
+    def directory(self):
+        """Cache directory."""
+        return self._directory
+
     def _write_file(self, full_path, iterator, mode, encoding=None):
         full_dir, _ = os.path.split(full_path)

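The read-only `directory` property added here (and to `FanoutCache` further down in this diff) gives callers a public accessor instead of reaching into the private `_directory` attribute; the later hunks switch all internal uses over to it. A minimal sketch of the pattern, with an illustrative class name that is not part of this changeset:

    class Shard:
        def __init__(self, directory):
            self._directory = directory

        @property
        def directory(self):
            """Cache directory (read-only view of the private attribute)."""
            return self._directory

    shard = Shard('/var/cache/archives')
    print(shard.directory)      # '/var/cache/archives'
    # shard.directory = '/tmp'  # AttributeError: can't set attribute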
@@ -168,8 +173,17 @@ class FileSystemCache:

         return key, size, MODE_BINARY, filename, _metadata

-    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
+    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
+
+        if retry:
+            for attempt in range(retry_attempts):
+                if key in self:
+                    break
+                # we didn't find the key, wait 1s, and re-check
+                time.sleep(1)
+
         if key not in self:
+            log.exception('requested %s not found in %s', key, self)
             raise KeyError(key)

         key_file = self._get_keyfile(key)
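With `retry=True`, `fetch` now polls for up to `retry_attempts` seconds (one 1s sleep per attempt) before concluding the key is absent, which covers the window where another process is still writing the archive. A hedged usage sketch; `cache` stands for a cache instance and the key name is illustrative:

    try:
        # wait up to ~10s for a concurrent writer to finish
        file_obj, metadata = cache.fetch('repo-archive-abc123', retry=True, retry_attempts=10)
        try:
            data = file_obj.read()
        finally:
            file_obj.close()
    except KeyError:
        # still missing after the retry window
        ...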
@@ -179,7 +193,7 @@ class FileSystemCache:
         filename = metadata['filename']

         try:
-            return open(os.path.join(self._directory, filename), 'rb'), metadata
+            return open(os.path.join(self.directory, filename), 'rb'), metadata
         finally:
             # update usage stats, count and accessed
             metadata["access_count"] = metadata.get("access_count", 0) + 1
@@ -201,7 +215,7 @@ class FileSystemCache:
         sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
         name = hex_name[4:] + '.archive_cache'
         filename = os.path.join(sub_dir, name)
-        full_path = os.path.join(self._directory, filename)
+        full_path = os.path.join(self.directory, filename)
         return filename, full_path

     def hash(self, key):
@@ -224,6 +238,9 @@ class FileSystemCache:
         key_file = self._get_keyfile(key)
         return os.path.exists(key_file)

+    def __repr__(self):
+        return f'FileSystemCache(index={self._index}, dir={self.directory})'
+

 class FanoutCache:
     """Cache that shards keys and values."""
@@ -261,6 +278,11 @@ class FanoutCache:
         )
         self._hash = self._shards[0].hash

+    @property
+    def directory(self):
+        """Cache directory."""
+        return self._directory
+
     def get_lock(self, lock_key):
         return GenerationLock(lock_key, self._locking_url)

@@ -273,11 +295,11 @@ class FanoutCache:
         shard = self._get_shard(key)
         return shard.store(key, value_reader, metadata)

-    def fetch(self, key):
+    def fetch(self, key, retry=False, retry_attempts=10):
         """Return file handle corresponding to `key` from cache.
         """
         shard = self._get_shard(key)
-        return shard.fetch(key)
+        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

     def has_key(self, key):
         """Return `True` if `key` matching item is found in cache.
@@ -325,9 +347,9 @@ class FanoutCache:
         data = []
         cnt = 1
         for shard in self._shards:
-            for key_file in os.listdir(shard._directory):
+            for key_file in os.listdir(shard.directory):
                 if key_file.endswith('.key'):
-                    key_file_path = os.path.join(shard._directory, key_file)
+                    key_file_path = os.path.join(shard.directory, key_file)
                     with open(key_file_path, 'rb') as f:
                         metadata = json.loads(f.read())

@@ -17,7 +17,7 @@

 import redis
 from ..._vendor import redis_lock
-from .utils import ArchiveCacheLock
+from .utils import ArchiveCacheGenerationLock


 class GenerationLock:
@@ -52,7 +52,7 @@ class GenerationLock:
     def __enter__(self):
         acquired = self.lock.acquire(blocking=False)
         if not acquired:
-            raise ArchiveCacheLock('Failed to create a lock')
+            raise ArchiveCacheGenerationLock('Failed to create a lock')

     def __exit__(self, exc_type, exc_val, exc_tb):
         self.lock.release()
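Taken together, the rename and the retry flag support a producer/consumer pattern: whichever process acquires the `GenerationLock` builds the archive, while a process that loses the race catches `ArchiveCacheGenerationLock` and polls the cache instead of failing. A hedged sketch of that flow; `generate_archive_reader` is a hypothetical helper, the empty metadata dict and the attempt count are illustrative:

    def get_or_create_archive(cache, key):
        if not cache.has_key(key):
            try:
                with cache.get_lock(key):
                    # we won the lock: generate and store the archive
                    cache.store(key, generate_archive_reader(key), {})
            except ArchiveCacheGenerationLock:
                # another process is generating this archive right now;
                # fall through and let fetch() poll until it appears
                pass
        return cache.fetch(key, retry=True, retry_attempts=30)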