##// END OF EJS Templates
feat(archive-cache): added presigned_url support for objectstore
super-admin -
r5450:6c0bdadc default
parent child Browse files
Show More
@@ -54,6 +54,9 b' class BaseShard:'
54 54 def random_filename(self):
55 55 raise NotImplementedError
56 56
57 def store(self, *args, **kwargs):
58 raise NotImplementedError
59
57 60 def _store(self, key, value_reader, metadata, mode):
58 61 (filename, # hash-name
59 62 full_path # full-path/hash-name
@@ -91,7 +94,11 b' class BaseShard:'
91 94
92 95 return key, filename, size, _metadata
93 96
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
97 def fetch(self, *args, **kwargs):
98 raise NotImplementedError
99
100 def _fetch(self, key, retry, retry_attempts, retry_backoff,
101 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
95 102 if retry is NOT_GIVEN:
96 103 retry = False
97 104 if retry_attempts is NOT_GIVEN:
@@ -113,6 +120,8 b' class BaseShard:'
113 120 metadata = json.loads(f.read())
114 121
115 122 archive_path = metadata['archive_full_path']
123 if presigned_url_expires and presigned_url_expires > 0:
124 metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)
116 125
117 126 try:
118 127 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
@@ -125,6 +134,9 b' class BaseShard:'
125 134 with self.fs.open(key_file_path, 'wb') as f:
126 135 f.write(json.dumps(metadata))
127 136
137 def remove(self, *args, **kwargs):
138 raise NotImplementedError
139
128 140 def _remove(self, key):
129 141 if key not in self:
130 142 log.exception(f'requested key={key} not found in {self}')
@@ -161,12 +173,14 b' class BaseShard:'
161 173 class BaseCache:
162 174 _locking_url: str = ''
163 175 _storage_path: str = ''
164 _config = {}
176 _config: dict = {}
165 177 retry = False
166 retry_attempts = 0
167 retry_backoff = 1
178 retry_attempts: int = 0
179 retry_backoff: int | float = 1
168 180 _shards = tuple()
169 181 shard_cls = BaseShard
182 # define the presigned url expiration, 0 == disabled
183 presigned_url_expires: int = 0
170 184
171 185 def __contains__(self, key):
172 186 """Return `True` if `key` matching item is found in cache.
@@ -221,9 +235,13 b' class BaseCache:'
221 235 if retry_attempts is NOT_GIVEN:
222 236 retry_attempts = self.retry_attempts
223 237 retry_backoff = self.retry_backoff
238 presigned_url_expires = self.presigned_url_expires
224 239
225 240 shard = self._get_shard(key)
226 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
241 return shard.fetch(key, retry=retry,
242 retry_attempts=retry_attempts,
243 retry_backoff=retry_backoff,
244 presigned_url_expires=presigned_url_expires)
227 245
228 246 def remove(self, key):
229 247 shard = self._get_shard(key)
@@ -352,4 +370,3 b' class BaseCache:'
352 370 total_size += metadata['size']
353 371
354 372 return total_files, total_size, meta
355
@@ -20,6 +20,7 b' import codecs'
20 20 import hashlib
21 21 import logging
22 22 import os
23 import typing
23 24
24 25 import fsspec
25 26
@@ -33,20 +34,20 b' log = logging.getLogger(__name__)'
33 34 class FileSystemShard(BaseShard):
34 35
35 36 def __init__(self, index, directory, directory_folder, fs, **settings):
36 self._index = index
37 self._directory = directory
38 self._directory_folder = directory_folder
39 self.storage_type = 'directory'
37 self._index: int = index
38 self._directory: str = directory
39 self._directory_folder: str = directory_folder
40 self.storage_type: str = 'directory'
40 41
41 42 self.fs = fs
42 43
43 44 @property
44 def directory(self):
45 def directory(self) -> str:
45 46 """Cache directory final path."""
46 47 return os.path.join(self._directory, self._directory_folder)
47 48
48 49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}.{self.key_suffix}'
50 key_file: str = f'{archive_key}.{self.key_suffix}'
50 51 return key_file, os.path.join(self.directory, key_file)
51 52
52 53 def _get_writer(self, path, mode):
@@ -62,6 +63,7 b' class FileSystemShard(BaseShard):'
62 63 continue
63 64
64 65 def _write_file(self, full_path, iterator, mode):
66
65 67 # ensure dir exists
66 68 destination, _ = os.path.split(full_path)
67 69 if not self.fs.exists(destination):
@@ -89,7 +91,8 b' class FileSystemShard(BaseShard):'
89 91 def store(self, key, value_reader, metadata: dict | None = None):
90 92 return self._store(key, value_reader, metadata, mode='xb')
91 93
92 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
94 def fetch(self, key, retry=NOT_GIVEN,
95 retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]:
93 96 return self._fetch(key, retry, retry_attempts, retry_backoff)
94 97
95 98 def remove(self, key):
@@ -117,7 +120,7 b' class FileSystemShard(BaseShard):'
117 120
118 121
119 122 class FileSystemFanoutCache(BaseCache):
120 shard_name = 'shard_%03d'
123 shard_name: str = 'shard_{:03d}'
121 124 shard_cls = FileSystemShard
122 125
123 126 def __init__(self, locking_url, **settings):
@@ -162,7 +165,7 b' class FileSystemFanoutCache(BaseCache):'
162 165 self.shard_cls(
163 166 index=num,
164 167 directory=directory,
165 directory_folder=self.shard_name % num,
168 directory_folder=self.shard_name.format(num),
166 169 fs=fs,
167 170 **settings,
168 171 )
@@ -20,6 +20,7 b' import codecs'
20 20 import hashlib
21 21 import logging
22 22 import os
23 import typing
23 24
24 25 import fsspec
25 26
@@ -33,20 +34,20 b' log = logging.getLogger(__name__)'
33 34 class S3Shard(BaseShard):
34 35
35 36 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 self._index = index
37 self._bucket_folder = bucket_folder
38 self.storage_type = 'bucket'
39 self._bucket_main = bucket
37 self._index: int = index
38 self._bucket_folder: str = bucket_folder
39 self.storage_type: str = 'bucket'
40 self._bucket_main: str = bucket
40 41
41 42 self.fs = fs
42 43
43 44 @property
44 def bucket(self):
45 def bucket(self) -> str:
45 46 """Cache bucket final path."""
46 47 return os.path.join(self._bucket_main, self._bucket_folder)
47 48
48 49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}-{self.key_suffix}'
50 key_file: str = f'{archive_key}-{self.key_suffix}'
50 51 return key_file, os.path.join(self.bucket, key_file)
51 52
52 53 def _get_writer(self, path, mode):
@@ -76,8 +77,10 b' class S3Shard(BaseShard):'
76 77 def store(self, key, value_reader, metadata: dict | None = None):
77 78 return self._store(key, value_reader, metadata, mode='wb')
78 79
79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
80 return self._fetch(key, retry, retry_attempts, retry_backoff)
80 def fetch(self, key, retry=NOT_GIVEN,
81 retry_attempts=NOT_GIVEN, retry_backoff=1,
82 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
83 return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
81 84
82 85 def remove(self, key):
83 86 return self._remove(key)
@@ -104,7 +107,7 b' class S3Shard(BaseShard):'
104 107
105 108
106 109 class ObjectStoreCache(BaseCache):
107 shard_name = 'shard-%03d'
110 shard_name: str = 'shard-{:03d}'
108 111 shard_cls = S3Shard
109 112
110 113 def __init__(self, locking_url, **settings):
@@ -152,7 +155,7 b' class ObjectStoreCache(BaseCache):'
152 155 self.shard_cls(
153 156 index=num,
154 157 bucket=self._bucket,
155 bucket_folder=self.shard_name % num,
158 bucket_folder=self.shard_name.format(num),
156 159 fs=fs,
157 160 **settings,
158 161 )
@@ -162,3 +165,6 b' class ObjectStoreCache(BaseCache):'
162 165
163 166 def _get_size(self, shard, archive_path):
164 167 return shard.fs.info(archive_path)['size']
168
169 def set_presigned_url_expiry(self, val: int) -> None:
170 self.presigned_url_expires = val
General Comments 0
You need to be logged in to leave comments. Login now