##// END OF EJS Templates
feat(archive-cache): added presigned_url support for objectstore
super-admin -
r5450:6c0bdadc default
parent child Browse files
Show More
@@ -54,6 +54,9 b' class BaseShard:'
54 def random_filename(self):
54 def random_filename(self):
55 raise NotImplementedError
55 raise NotImplementedError
56
56
57 def store(self, *args, **kwargs):
58 raise NotImplementedError
59
57 def _store(self, key, value_reader, metadata, mode):
60 def _store(self, key, value_reader, metadata, mode):
58 (filename, # hash-name
61 (filename, # hash-name
59 full_path # full-path/hash-name
62 full_path # full-path/hash-name
@@ -91,7 +94,11 b' class BaseShard:'
91
94
92 return key, filename, size, _metadata
95 return key, filename, size, _metadata
93
96
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
97 def fetch(self, *args, **kwargs):
98 raise NotImplementedError
99
100 def _fetch(self, key, retry, retry_attempts, retry_backoff,
101 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
95 if retry is NOT_GIVEN:
102 if retry is NOT_GIVEN:
96 retry = False
103 retry = False
97 if retry_attempts is NOT_GIVEN:
104 if retry_attempts is NOT_GIVEN:
@@ -113,6 +120,8 b' class BaseShard:'
113 metadata = json.loads(f.read())
120 metadata = json.loads(f.read())
114
121
115 archive_path = metadata['archive_full_path']
122 archive_path = metadata['archive_full_path']
123 if presigned_url_expires and presigned_url_expires > 0:
124 metadata['url'] = self.fs.url(archive_path, expires=presigned_url_expires)
116
125
117 try:
126 try:
118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
127 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
@@ -125,6 +134,9 b' class BaseShard:'
125 with self.fs.open(key_file_path, 'wb') as f:
134 with self.fs.open(key_file_path, 'wb') as f:
126 f.write(json.dumps(metadata))
135 f.write(json.dumps(metadata))
127
136
137 def remove(self, *args, **kwargs):
138 raise NotImplementedError
139
128 def _remove(self, key):
140 def _remove(self, key):
129 if key not in self:
141 if key not in self:
130 log.exception(f'requested key={key} not found in {self}')
142 log.exception(f'requested key={key} not found in {self}')
@@ -161,12 +173,14 b' class BaseShard:'
161 class BaseCache:
173 class BaseCache:
162 _locking_url: str = ''
174 _locking_url: str = ''
163 _storage_path: str = ''
175 _storage_path: str = ''
164 _config = {}
176 _config: dict = {}
165 retry = False
177 retry = False
166 retry_attempts = 0
178 retry_attempts: int = 0
167 retry_backoff = 1
179 retry_backoff: int | float = 1
168 _shards = tuple()
180 _shards = tuple()
169 shard_cls = BaseShard
181 shard_cls = BaseShard
182 # presigned url expiration forwarded to shard.fetch() and used as `expires` for fs.url(); 0 == disabled
183 presigned_url_expires: int = 0
170
184
171 def __contains__(self, key):
185 def __contains__(self, key):
172 """Return `True` if `key` matching item is found in cache.
186 """Return `True` if `key` matching item is found in cache.
@@ -221,9 +235,13 b' class BaseCache:'
221 if retry_attempts is NOT_GIVEN:
235 if retry_attempts is NOT_GIVEN:
222 retry_attempts = self.retry_attempts
236 retry_attempts = self.retry_attempts
223 retry_backoff = self.retry_backoff
237 retry_backoff = self.retry_backoff
238 presigned_url_expires = self.presigned_url_expires
224
239
225 shard = self._get_shard(key)
240 shard = self._get_shard(key)
226 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
241 return shard.fetch(key, retry=retry,
242 retry_attempts=retry_attempts,
243 retry_backoff=retry_backoff,
244 presigned_url_expires=presigned_url_expires)
227
245
228 def remove(self, key):
246 def remove(self, key):
229 shard = self._get_shard(key)
247 shard = self._get_shard(key)
@@ -352,4 +370,3 b' class BaseCache:'
352 total_size += metadata['size']
370 total_size += metadata['size']
353
371
354 return total_files, total_size, meta
372 return total_files, total_size, meta
355
@@ -20,6 +20,7 b' import codecs'
20 import hashlib
20 import hashlib
21 import logging
21 import logging
22 import os
22 import os
23 import typing
23
24
24 import fsspec
25 import fsspec
25
26
@@ -33,20 +34,20 b' log = logging.getLogger(__name__)'
33 class FileSystemShard(BaseShard):
34 class FileSystemShard(BaseShard):
34
35
35 def __init__(self, index, directory, directory_folder, fs, **settings):
36 def __init__(self, index, directory, directory_folder, fs, **settings):
36 self._index = index
37 self._index: int = index
37 self._directory = directory
38 self._directory: str = directory
38 self._directory_folder = directory_folder
39 self._directory_folder: str = directory_folder
39 self.storage_type = 'directory'
40 self.storage_type: str = 'directory'
40
41
41 self.fs = fs
42 self.fs = fs
42
43
43 @property
44 @property
44 def directory(self):
45 def directory(self) -> str:
45 """Cache directory final path."""
46 """Cache directory final path."""
46 return os.path.join(self._directory, self._directory_folder)
47 return os.path.join(self._directory, self._directory_folder)
47
48
48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}.{self.key_suffix}'
50 key_file: str = f'{archive_key}.{self.key_suffix}'
50 return key_file, os.path.join(self.directory, key_file)
51 return key_file, os.path.join(self.directory, key_file)
51
52
52 def _get_writer(self, path, mode):
53 def _get_writer(self, path, mode):
@@ -62,6 +63,7 b' class FileSystemShard(BaseShard):'
62 continue
63 continue
63
64
64 def _write_file(self, full_path, iterator, mode):
65 def _write_file(self, full_path, iterator, mode):
66
65 # ensure dir exists
67 # ensure dir exists
66 destination, _ = os.path.split(full_path)
68 destination, _ = os.path.split(full_path)
67 if not self.fs.exists(destination):
69 if not self.fs.exists(destination):
@@ -89,7 +91,8 b' class FileSystemShard(BaseShard):'
89 def store(self, key, value_reader, metadata: dict | None = None):
91 def store(self, key, value_reader, metadata: dict | None = None):
90 return self._store(key, value_reader, metadata, mode='xb')
92 return self._store(key, value_reader, metadata, mode='xb')
91
93
92 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
94 def fetch(self, key, retry=NOT_GIVEN,
95 retry_attempts=NOT_GIVEN, retry_backoff=1, **kwargs) -> tuple[ShardFileReader, dict]:
93 return self._fetch(key, retry, retry_attempts, retry_backoff)
96 return self._fetch(key, retry, retry_attempts, retry_backoff)
94
97
95 def remove(self, key):
98 def remove(self, key):
@@ -117,7 +120,7 b' class FileSystemShard(BaseShard):'
117
120
118
121
119 class FileSystemFanoutCache(BaseCache):
122 class FileSystemFanoutCache(BaseCache):
120 shard_name = 'shard_%03d'
123 shard_name: str = 'shard_{:03d}'
121 shard_cls = FileSystemShard
124 shard_cls = FileSystemShard
122
125
123 def __init__(self, locking_url, **settings):
126 def __init__(self, locking_url, **settings):
@@ -162,7 +165,7 b' class FileSystemFanoutCache(BaseCache):'
162 self.shard_cls(
165 self.shard_cls(
163 index=num,
166 index=num,
164 directory=directory,
167 directory=directory,
165 directory_folder=self.shard_name % num,
168 directory_folder=self.shard_name.format(num),
166 fs=fs,
169 fs=fs,
167 **settings,
170 **settings,
168 )
171 )
@@ -20,6 +20,7 b' import codecs'
20 import hashlib
20 import hashlib
21 import logging
21 import logging
22 import os
22 import os
23 import typing
23
24
24 import fsspec
25 import fsspec
25
26
@@ -33,20 +34,20 b' log = logging.getLogger(__name__)'
33 class S3Shard(BaseShard):
34 class S3Shard(BaseShard):
34
35
35 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 self._index = index
37 self._index: int = index
37 self._bucket_folder = bucket_folder
38 self._bucket_folder: str = bucket_folder
38 self.storage_type = 'bucket'
39 self.storage_type: str = 'bucket'
39 self._bucket_main = bucket
40 self._bucket_main: str = bucket
40
41
41 self.fs = fs
42 self.fs = fs
42
43
43 @property
44 @property
44 def bucket(self):
45 def bucket(self) -> str:
45 """Cache bucket final path."""
46 """Cache bucket final path."""
46 return os.path.join(self._bucket_main, self._bucket_folder)
47 return os.path.join(self._bucket_main, self._bucket_folder)
47
48
48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
49 key_file = f'{archive_key}-{self.key_suffix}'
50 key_file: str = f'{archive_key}-{self.key_suffix}'
50 return key_file, os.path.join(self.bucket, key_file)
51 return key_file, os.path.join(self.bucket, key_file)
51
52
52 def _get_writer(self, path, mode):
53 def _get_writer(self, path, mode):
@@ -76,8 +77,10 b' class S3Shard(BaseShard):'
76 def store(self, key, value_reader, metadata: dict | None = None):
77 def store(self, key, value_reader, metadata: dict | None = None):
77 return self._store(key, value_reader, metadata, mode='wb')
78 return self._store(key, value_reader, metadata, mode='wb')
78
79
79 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
80 def fetch(self, key, retry=NOT_GIVEN,
80 return self._fetch(key, retry, retry_attempts, retry_backoff)
81 retry_attempts=NOT_GIVEN, retry_backoff=1,
82 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
83 return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
81
84
82 def remove(self, key):
85 def remove(self, key):
83 return self._remove(key)
86 return self._remove(key)
@@ -104,7 +107,7 b' class S3Shard(BaseShard):'
104
107
105
108
106 class ObjectStoreCache(BaseCache):
109 class ObjectStoreCache(BaseCache):
107 shard_name = 'shard-%03d'
110 shard_name: str = 'shard-{:03d}'
108 shard_cls = S3Shard
111 shard_cls = S3Shard
109
112
110 def __init__(self, locking_url, **settings):
113 def __init__(self, locking_url, **settings):
@@ -152,7 +155,7 b' class ObjectStoreCache(BaseCache):'
152 self.shard_cls(
155 self.shard_cls(
153 index=num,
156 index=num,
154 bucket=self._bucket,
157 bucket=self._bucket,
155 bucket_folder=self.shard_name % num,
158 bucket_folder=self.shard_name.format(num),
156 fs=fs,
159 fs=fs,
157 **settings,
160 **settings,
158 )
161 )
@@ -162,3 +165,6 b' class ObjectStoreCache(BaseCache):'
162
165
163 def _get_size(self, shard, archive_path):
166 def _get_size(self, shard, archive_path):
164 return shard.fs.info(archive_path)['size']
167 return shard.fs.info(archive_path)['size']
168
169 def set_presigned_url_expiry(self, val: int) -> None:
170 self.presigned_url_expires = val
General Comments 0
You need to be logged in to leave comments. Login now