Show More
@@ -54,6 +54,9 b' class BaseShard:' | |||
|
54 | 54 | def random_filename(self): |
|
55 | 55 | raise NotImplementedError |
|
56 | 56 | |
|
57 | def store(self, *args, **kwargs): | |
|
58 | raise NotImplementedError | |
|
59 | ||
|
57 | 60 | def _store(self, key, value_reader, metadata, mode): |
|
58 | 61 | (filename, # hash-name |
|
59 | 62 | full_path # full-path/hash-name |
@@ -91,7 +94,11 b' class BaseShard:' | |||
|
91 | 94 | |
|
92 | 95 | return key, filename, size, _metadata |
|
93 | 96 | |
|
94 | def _fetch(self, key, retry, retry_attempts, retry_backoff): | |
|
97 | def fetch(self, *args, **kwargs): | |
|
98 | raise NotImplementedError | |
|
99 | ||
|
100 | def _fetch(self, key, retry, retry_attempts, retry_backoff, | |
|
101 | presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]: | |
|
95 | 102 | if retry is NOT_GIVEN: |
|
96 | 103 | retry = False |
|
97 | 104 | if retry_attempts is NOT_GIVEN: |
@@ -113,6 +120,8 b' class BaseShard:' | |||
|
113 | 120 | metadata = json.loads(f.read()) |
|
114 | 121 | |
|
115 | 122 | archive_path = metadata['archive_full_path'] |
|
123 | if presigned_url_expires and presigned_url_expires > 0: | |
|
124 | metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires) | |
|
116 | 125 | |
|
117 | 126 | try: |
|
118 | 127 | return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata |
@@ -125,6 +134,9 b' class BaseShard:' | |||
|
125 | 134 | with self.fs.open(key_file_path, 'wb') as f: |
|
126 | 135 | f.write(json.dumps(metadata)) |
|
127 | 136 | |
|
137 | def remove(self, *args, **kwargs): | |
|
138 | raise NotImplementedError | |
|
139 | ||
|
128 | 140 | def _remove(self, key): |
|
129 | 141 | if key not in self: |
|
130 | 142 | log.exception(f'requested key={key} not found in {self}') |
@@ -161,12 +173,14 b' class BaseShard:' | |||
|
161 | 173 | class BaseCache: |
|
162 | 174 | _locking_url: str = '' |
|
163 | 175 | _storage_path: str = '' |
|
164 | _config = {} | |
|
176 | _config: dict = {} | |
|
165 | 177 | retry = False |
|
166 | retry_attempts = 0 | |
|
167 | retry_backoff = 1 | |
|
178 | retry_attempts: int = 0 | |
|
179 | retry_backoff: int | float = 1 | |
|
168 | 180 | _shards = tuple() |
|
169 | 181 | shard_cls = BaseShard |
|
182 | # define the presigned url expiration, 0 == disabled | |
|
183 | presigned_url_expires: int = 0 | |
|
170 | 184 | |
|
171 | 185 | def __contains__(self, key): |
|
172 | 186 | """Return `True` if `key` matching item is found in cache. |
@@ -221,9 +235,13 b' class BaseCache:' | |||
|
221 | 235 | if retry_attempts is NOT_GIVEN: |
|
222 | 236 | retry_attempts = self.retry_attempts |
|
223 | 237 | retry_backoff = self.retry_backoff |
|
238 | presigned_url_expires = self.presigned_url_expires | |
|
224 | 239 | |
|
225 | 240 | shard = self._get_shard(key) |
|
226 | return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff) | |
|
241 | return shard.fetch(key, retry=retry, | |
|
242 | retry_attempts=retry_attempts, | |
|
243 | retry_backoff=retry_backoff, | |
|
244 | presigned_url_expires=presigned_url_expires) | |
|
227 | 245 | |
|
228 | 246 | def remove(self, key): |
|
229 | 247 | shard = self._get_shard(key) |
@@ -352,4 +370,3 b' class BaseCache:' | |||
|
352 | 370 | total_size += metadata['size'] |
|
353 | 371 | |
|
354 | 372 | return total_files, total_size, meta |
|
355 |
@@ -20,6 +20,7 b' import codecs' | |||
|
20 | 20 | import hashlib |
|
21 | 21 | import logging |
|
22 | 22 | import os |
|
23 | import typing | |
|
23 | 24 | |
|
24 | 25 | import fsspec |
|
25 | 26 | |
@@ -33,20 +34,20 b' log = logging.getLogger(__name__)' | |||
|
33 | 34 | class FileSystemShard(BaseShard): |
|
34 | 35 | |
|
35 | 36 | def __init__(self, index, directory, directory_folder, fs, **settings): |
|
36 | self._index = index | |
|
37 | self._directory = directory | |
|
38 | self._directory_folder = directory_folder | |
|
39 | self.storage_type = 'directory' | |
|
37 | self._index: int = index | |
|
38 | self._directory: str = directory | |
|
39 | self._directory_folder: str = directory_folder | |
|
40 | self.storage_type: str = 'directory' | |
|
40 | 41 | |
|
41 | 42 | self.fs = fs |
|
42 | 43 | |
|
43 | 44 | @property |
|
44 | def directory(self): | |
|
45 | def directory(self) -> str: | |
|
45 | 46 | """Cache directory final path.""" |
|
46 | 47 | return os.path.join(self._directory, self._directory_folder) |
|
47 | 48 | |
|
48 | 49 | def _get_keyfile(self, archive_key) -> tuple[str, str]: |
|
49 | key_file = f'{archive_key}.{self.key_suffix}' | |
|
50 | key_file: str = f'{archive_key}.{self.key_suffix}' | |
|
50 | 51 | return key_file, os.path.join(self.directory, key_file) |
|
51 | 52 | |
|
52 | 53 | def _get_writer(self, path, mode): |
@@ -62,6 +63,7 b' class FileSystemShard(BaseShard):' | |||
|
62 | 63 | continue |
|
63 | 64 | |
|
64 | 65 | def _write_file(self, full_path, iterator, mode): |
|
66 | ||
|
65 | 67 | # ensure dir exists |
|
66 | 68 | destination, _ = os.path.split(full_path) |
|
67 | 69 | if not self.fs.exists(destination): |
@@ -89,7 +91,8 b' class FileSystemShard(BaseShard):' | |||
|
89 | 91 | def store(self, key, value_reader, metadata: dict | None = None): |
|
90 | 92 | return self._store(key, value_reader, metadata, mode='xb') |
|
91 | 93 | |
|
92 | def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: | |
|
94 | def fetch(self, key, retry=NOT_GIVEN, | |
|
95 | retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]: | |
|
93 | 96 | return self._fetch(key, retry, retry_attempts, retry_backoff) |
|
94 | 97 | |
|
95 | 98 | def remove(self, key): |
@@ -117,7 +120,7 b' class FileSystemShard(BaseShard):' | |||
|
117 | 120 | |
|
118 | 121 | |
|
119 | 122 | class FileSystemFanoutCache(BaseCache): |
|
120 | shard_name = 'shard_%03d' | |
|
123 | shard_name: str = 'shard_{:03d}' | |
|
121 | 124 | shard_cls = FileSystemShard |
|
122 | 125 | |
|
123 | 126 | def __init__(self, locking_url, **settings): |
@@ -162,7 +165,7 b' class FileSystemFanoutCache(BaseCache):' | |||
|
162 | 165 | self.shard_cls( |
|
163 | 166 | index=num, |
|
164 | 167 | directory=directory, |
|
165 | directory_folder=self.shard_name % num, | |
|
168 | directory_folder=self.shard_name.format(num), | |
|
166 | 169 | fs=fs, |
|
167 | 170 | **settings, |
|
168 | 171 | ) |
@@ -20,6 +20,7 b' import codecs' | |||
|
20 | 20 | import hashlib |
|
21 | 21 | import logging |
|
22 | 22 | import os |
|
23 | import typing | |
|
23 | 24 | |
|
24 | 25 | import fsspec |
|
25 | 26 | |
@@ -33,20 +34,20 b' log = logging.getLogger(__name__)' | |||
|
33 | 34 | class S3Shard(BaseShard): |
|
34 | 35 | |
|
35 | 36 | def __init__(self, index, bucket, bucket_folder, fs, **settings): |
|
36 | self._index = index | |
|
37 | self._bucket_folder = bucket_folder | |
|
38 | self.storage_type = 'bucket' | |
|
39 | self._bucket_main = bucket | |
|
37 | self._index: int = index | |
|
38 | self._bucket_folder: str = bucket_folder | |
|
39 | self.storage_type: str = 'bucket' | |
|
40 | self._bucket_main: str = bucket | |
|
40 | 41 | |
|
41 | 42 | self.fs = fs |
|
42 | 43 | |
|
43 | 44 | @property |
|
44 | def bucket(self): | |
|
45 | def bucket(self) -> str: | |
|
45 | 46 | """Cache bucket final path.""" |
|
46 | 47 | return os.path.join(self._bucket_main, self._bucket_folder) |
|
47 | 48 | |
|
48 | 49 | def _get_keyfile(self, archive_key) -> tuple[str, str]: |
|
49 | key_file = f'{archive_key}-{self.key_suffix}' | |
|
50 | key_file: str = f'{archive_key}-{self.key_suffix}' | |
|
50 | 51 | return key_file, os.path.join(self.bucket, key_file) |
|
51 | 52 | |
|
52 | 53 | def _get_writer(self, path, mode): |
@@ -76,8 +77,10 b' class S3Shard(BaseShard):' | |||
|
76 | 77 | def store(self, key, value_reader, metadata: dict | None = None): |
|
77 | 78 | return self._store(key, value_reader, metadata, mode='wb') |
|
78 | 79 | |
|
79 | def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]: | |
|
80 | return self._fetch(key, retry, retry_attempts, retry_backoff) | |
|
80 | def fetch(self, key, retry=NOT_GIVEN, | |
|
81 | retry_attempts=NOT_GIVEN, retry_backoff=1, | |
|
82 | presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]: | |
|
83 | return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires) | |
|
81 | 84 | |
|
82 | 85 | def remove(self, key): |
|
83 | 86 | return self._remove(key) |
@@ -104,7 +107,7 b' class S3Shard(BaseShard):' | |||
|
104 | 107 | |
|
105 | 108 | |
|
106 | 109 | class ObjectStoreCache(BaseCache): |
|
107 | shard_name = 'shard-%03d' | |
|
110 | shard_name: str = 'shard-{:03d}' | |
|
108 | 111 | shard_cls = S3Shard |
|
109 | 112 | |
|
110 | 113 | def __init__(self, locking_url, **settings): |
@@ -152,7 +155,7 b' class ObjectStoreCache(BaseCache):' | |||
|
152 | 155 | self.shard_cls( |
|
153 | 156 | index=num, |
|
154 | 157 | bucket=self._bucket, |
|
155 | bucket_folder=self.shard_name % num, | |
|
158 | bucket_folder=self.shard_name.format(num), | |
|
156 | 159 | fs=fs, |
|
157 | 160 | **settings, |
|
158 | 161 | ) |
@@ -162,3 +165,6 b' class ObjectStoreCache(BaseCache):' | |||
|
162 | 165 | |
|
163 | 166 | def _get_size(self, shard, archive_path): |
|
164 | 167 | return shard.fs.info(archive_path)['size'] |
|
168 | ||
|
169 | def set_presigned_url_expiry(self, val: int) -> None: | |
|
170 | self.presigned_url_expires = val |
General Comments 0
You need to be logged in to leave comments.
Login now