@@ -54,6 +54,9 @@ class BaseShard:
     def random_filename(self):
         raise NotImplementedError
 
+    def store(self, *args, **kwargs):
+        raise NotImplementedError
+
     def _store(self, key, value_reader, metadata, mode):
         (filename,  # hash-name
          full_path  # full-path/hash-name
@@ -91,7 +94,11 @@ class BaseShard:
 
         return key, filename, size, _metadata
 
-    def _fetch(self, key, retry, retry_attempts, retry_backoff):
+    def fetch(self, *args, **kwargs):
+        raise NotImplementedError
+
+    def _fetch(self, key, retry, retry_attempts, retry_backoff,
+               presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
         if retry is NOT_GIVEN:
             retry = False
         if retry_attempts is NOT_GIVEN:
@@ -113,6 +120,8 @@ class BaseShard:
             metadata = json.loads(f.read())
 
         archive_path = metadata['archive_full_path']
+        if presigned_url_expires and presigned_url_expires > 0:
+            metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)
 
         try:
             return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
@@ -125,6 +134,9 @@ class BaseShard:
         with self.fs.open(key_file_path, 'wb') as f:
             f.write(json.dumps(metadata))
 
+    def remove(self, *args, **kwargs):
+        raise NotImplementedError
+
     def _remove(self, key):
         if key not in self:
             log.exception(f'requested key={key} not found in {self}')
@@ -161,12 +173,14 @@ class BaseShard:
 class BaseCache:
     _locking_url: str = ''
     _storage_path: str = ''
-    _config = {}
+    _config: dict = {}
     retry = False
-    retry_attempts = 0
-    retry_backoff = 1
+    retry_attempts: int = 0
+    retry_backoff: int | float = 1
     _shards = tuple()
     shard_cls = BaseShard
+    # define the presigned url expiration, 0 == disabled
+    presigned_url_expires: int = 0
 
     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.
@@ -221,9 +235,13 @@ class BaseCache:
         if retry_attempts is NOT_GIVEN:
             retry_attempts = self.retry_attempts
         retry_backoff = self.retry_backoff
+        presigned_url_expires = self.presigned_url_expires
 
         shard = self._get_shard(key)
-        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
+        return shard.fetch(key, retry=retry,
+                           retry_attempts=retry_attempts,
+                           retry_backoff=retry_backoff,
+                           presigned_url_expires=presigned_url_expires)
 
     def remove(self, key):
         shard = self._get_shard(key)
@@ -352,4 +370,3 @@ class BaseCache:
             total_size += metadata['size']
 
         return total_files, total_size, meta
-
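The new presigned_url_expires plumbing in the base shard ends in self.fs.url(archive_path, expires=...) inside _fetch. As a rough, standalone illustration of what that call does on an S3-backed fsspec filesystem (not part of the change itself; the bucket and key names below are made up, and a plain local filesystem has no equivalent of .url()):

import s3fs

fs = s3fs.S3FileSystem()  # assumes AWS credentials are available in the environment
archive_path = 'example-archive-cache/shard-000/deadbeef.tar.gz'  # hypothetical object key

presigned_url_expires = 3600  # seconds; 0 keeps the feature disabled, matching the new default
if presigned_url_expires and presigned_url_expires > 0:
    # same call shape as the diff uses for metadata['url']
    url = fs.url(archive_path, expires=presigned_url_expires)
    print(url)  # time-limited HTTPS link a client can use for direct download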
@@ -20,6 +20,7 @@ import codecs
 import hashlib
 import logging
 import os
+import typing
 
 import fsspec
 
@@ -33,20 +34,20 @@ log = logging.getLogger(__name__)
 class FileSystemShard(BaseShard):
 
     def __init__(self, index, directory, directory_folder, fs, **settings):
-        self._index = index
-        self._directory = directory
-        self._directory_folder = directory_folder
-        self.storage_type = 'directory'
+        self._index: int = index
+        self._directory: str = directory
+        self._directory_folder: str = directory_folder
+        self.storage_type: str = 'directory'
 
         self.fs = fs
 
     @property
-    def directory(self):
+    def directory(self) -> str:
         """Cache directory final path."""
         return os.path.join(self._directory, self._directory_folder)
 
     def _get_keyfile(self, archive_key) -> tuple[str, str]:
-        key_file = f'{archive_key}.{self.key_suffix}'
+        key_file: str = f'{archive_key}.{self.key_suffix}'
         return key_file, os.path.join(self.directory, key_file)
 
     def _get_writer(self, path, mode):
@@ -62,6 +63,7 @@ class FileSystemShard(BaseShard):
                 continue
 
     def _write_file(self, full_path, iterator, mode):
+
         # ensure dir exists
         destination, _ = os.path.split(full_path)
         if not self.fs.exists(destination):
@@ -89,7 +91,8 @@ class FileSystemShard(BaseShard):
     def store(self, key, value_reader, metadata: dict | None = None):
         return self._store(key, value_reader, metadata, mode='xb')
 
-    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
+    def fetch(self, key, retry=NOT_GIVEN,
+              retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]:
         return self._fetch(key, retry, retry_attempts, retry_backoff)
 
     def remove(self, key):
@@ -117,7 +120,7 @@ class FileSystemShard(BaseShard):
 
 
 class FileSystemFanoutCache(BaseCache):
-    shard_name = 'shard_%03d'
+    shard_name: str = 'shard_{:03d}'
     shard_cls = FileSystemShard
 
    def __init__(self, locking_url, **settings):
@@ -162,7 +165,7 @@ class FileSystemFanoutCache(BaseCache):
             self.shard_cls(
                 index=num,
                 directory=directory,
-                directory_folder=self.shard_name % num,
+                directory_folder=self.shard_name.format(num),
                 fs=fs,
                 **settings,
             )
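One small note on the filesystem backend changes: moving shard_name from %-style interpolation to str.format produces the same zero-padded folder names, so existing shard directories keep their layout. A quick standalone check (illustrative only):

shard_name = 'shard_{:03d}'
assert shard_name.format(7) == 'shard_007'
# the %-style equivalent yields the identical string:
assert 'shard_%03d' % 7 == 'shard_007'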
@@ -20,6 +20,7 @@ import codecs
 import hashlib
 import logging
 import os
+import typing
 
 import fsspec
 
@@ -33,20 +34,20 @@ log = logging.getLogger(__name__)
 class S3Shard(BaseShard):
 
     def __init__(self, index, bucket, bucket_folder, fs, **settings):
-        self._index = index
-        self._bucket_folder = bucket_folder
-        self.storage_type = 'bucket'
-        self._bucket_main = bucket
+        self._index: int = index
+        self._bucket_folder: str = bucket_folder
+        self.storage_type: str = 'bucket'
+        self._bucket_main: str = bucket
 
         self.fs = fs
 
     @property
-    def bucket(self):
+    def bucket(self) -> str:
         """Cache bucket final path."""
         return os.path.join(self._bucket_main, self._bucket_folder)
 
     def _get_keyfile(self, archive_key) -> tuple[str, str]:
-        key_file = f'{archive_key}-{self.key_suffix}'
+        key_file: str = f'{archive_key}-{self.key_suffix}'
         return key_file, os.path.join(self.bucket, key_file)
 
     def _get_writer(self, path, mode):
@@ -76,8 +77,10 @@ class S3Shard(BaseShard):
     def store(self, key, value_reader, metadata: dict | None = None):
         return self._store(key, value_reader, metadata, mode='wb')
 
-    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
-        return self._fetch(key, retry, retry_attempts, retry_backoff)
+    def fetch(self, key, retry=NOT_GIVEN,
+              retry_attempts=NOT_GIVEN, retry_backoff=1,
+              presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
+        return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
 
     def remove(self, key):
         return self._remove(key)
@@ -104,7 +107,7 @@ class S3Shard(BaseShard):
 
 
 class ObjectStoreCache(BaseCache):
-    shard_name = 'shard-%03d'
+    shard_name: str = 'shard-{:03d}'
     shard_cls = S3Shard
 
     def __init__(self, locking_url, **settings):
@@ -152,7 +155,7 @@ class ObjectStoreCache(BaseCache):
             self.shard_cls(
                 index=num,
                 bucket=self._bucket,
-                bucket_folder=self.shard_name % num,
+                bucket_folder=self.shard_name.format(num),
                 fs=fs,
                 **settings,
             )
@@ -162,3 +165,6 @@ class ObjectStoreCache(BaseCache):
 
     def _get_size(self, shard, archive_path):
         return shard.fs.info(archive_path)['size']
+
+    def set_presigned_url_expiry(self, val: int) -> None:
+        self.presigned_url_expires = val
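Taken together, the object-store changes expose a single knob: call set_presigned_url_expiry on the cache, and the expiry is threaded through fetch into _fetch, which attaches metadata['url']. A minimal usage sketch, assuming an already-configured ObjectStoreCache instance (construction arguments depend on deployment settings not shown in this diff):

def fetch_with_presigned_url(cache, key: str, expires: int = 3600):
    # 'cache' is assumed to be a configured ObjectStoreCache; 0 disables URL generation
    cache.set_presigned_url_expiry(expires)
    reader, metadata = cache.fetch(key)
    # 'url' is only present when expires > 0 and the underlying fs supports .url()
    return reader, metadata.get('url')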