@@ -166,6 +166,7 @@ class BaseCache:
     retry_attempts = 0
     retry_backoff = 1
     _shards = tuple()
+    shard_cls = BaseShard

     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.
@@ -199,8 +200,10 @@ class BaseCache:
         del self._config[key]
         return val

-    def _get_shard(self, key):
-        raise NotImplementedError
+    def _get_shard(self, key) -> shard_cls:
+        index = self._hash(key) % self._shard_count
+        shard = self._shards[index]
+        return shard

     def _get_size(self, shard, archive_path):
         raise NotImplementedError
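For context on the hunk above: `_get_shard` now lives on `BaseCache` and routes a key to a shard by hashing it modulo the shard count, so the same key always lands on the same shard. A minimal, self-contained sketch of that routing (names are illustrative, not the real `BaseShard` API):

import hashlib

def stable_hash(key: bytes) -> int:
    # deterministic digest so a key maps to the same shard across runs;
    # the real shard class supplies its own `hash` implementation
    return int.from_bytes(hashlib.blake2b(key, digest_size=8).digest(), 'big')

shards = tuple(f'shard_{i:03d}' for i in range(8))

def get_shard(key: bytes) -> str:
    return shards[stable_hash(key) % len(shards)]

# the same key is always served by the same shard
assert get_shard(b'repo-archive.tgz') == get_shard(b'repo-archive.tgz')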
@@ -32,16 +32,18 @@ log = logging.getLogger(__name__)

 class FileSystemShard(BaseShard):

-    def __init__(self, index, directory, **settings):
+    def __init__(self, index, directory, directory_folder, fs, **settings):
         self._index = index
         self._directory = directory
+        self._directory_folder = directory_folder
         self.storage_type = 'directory'
-        self.fs = fsspec.filesystem('file')
+
+        self.fs = fs

     @property
     def directory(self):
-        """Cache directory."""
-        return self._directory
+        """Cache directory final path."""
+        return os.path.join(self._directory, self._directory_folder)

     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file = f'{archive_key}.{self.key_suffix}'
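The shard no longer constructs its own fsspec filesystem; the cache builds one and injects it into every shard. A rough sketch of that dependency-injection pattern with the local backend (class and paths are illustrative placeholders):

import os
import fsspec

fs = fsspec.filesystem('file')  # one shared LocalFileSystem instance

class DemoShard:
    def __init__(self, index, directory, directory_folder, fs):
        self._directory = directory
        self._directory_folder = directory_folder
        self.fs = fs  # injected, not created per shard

    @property
    def directory(self):
        # base path plus per-shard folder, mirroring the diff above
        return os.path.join(self._directory, self._directory_folder)

shards = [DemoShard(i, '/tmp/cache', 'shard_%03d' % i, fs) for i in range(4)]
print(shards[3].directory)  # /tmp/cache/shard_003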
@@ -116,6 +118,7 @@ class FileSystemShard(BaseShard):

 class FileSystemFanoutCache(BaseCache):
     shard_name = 'shard_%03d'
+    shard_cls = FileSystemShard

     def __init__(self, locking_url, **settings):
         """
@@ -132,13 +135,11 @@ class FileSystemFanoutCache(BaseCache):
         directory = os.path.expanduser(directory)
         directory = os.path.expandvars(directory)
         self._directory = directory
-        self._storage_path = directory
+        self._storage_path = directory  # common path for all from BaseCache

-        # check if it's ok to write, and re-create the archive cache
-        if not os.path.isdir(self._directory):
-            os.makedirs(self._directory, exist_ok=True)
-
-        self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+        self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+        if self._shard_count < 1:
+            raise ValueError('cache_shards must be 1 or more')

         self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
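`gb_to_bytes` itself is not shown in this diff; assuming binary-gigabyte semantics, the size limit works out as below (the real `BaseCache` helper may use decimal units instead):

def gb_to_bytes(gb: int) -> int:
    # assumed binary-GB conversion; not the actual BaseCache implementation
    return gb * 1024 ** 3

# cache_size_gb = 10 would give a ~10 GiB byte limit
assert gb_to_bytes(10) == 10_737_418_240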
@@ -147,21 +148,27 @@ class FileSystemFanoutCache(BaseCache):
         self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))

-        log.debug('Initializing archival cache instance under %s', self._directory)
+        log.debug('Initializing %s archival cache instance under %s', self)
+        fs = fsspec.filesystem('file')
+        # check if it's ok to write, and re-create the archive cache main dir
+        # A directory is the virtual equivalent of a physical file cabinet.
+        # In other words, it's a container for organizing digital data.
+        # Unlike a folder, which can only store files, a directory can store files,
+        # subdirectories, and other directories.
+        if not fs.exists(self._directory):
+            fs.makedirs(self._directory, exist_ok=True)
+
         self._shards = tuple(
-            FileSystemShard(
+            self.shard_cls(
                 index=num,
-                directory=os.path.join(directory, self.shard_name % num),
+                directory=directory,
+                directory_folder=self.shard_name % num,
+                fs=fs,
                 **settings,
             )
-            for num in range(self._count)
+            for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash

-    def _get_shard(self, key) -> FileSystemShard:
-        index = self._hash(key) % self._count
-        shard = self._shards[index]
-        return shard
-
     def _get_size(self, shard, archive_path):
         return os.stat(archive_path).st_size
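A quick worked example of the `shard_name % num` formatting used when building the shards, with the `'shard_%03d'` template from the class attribute:

shard_name = 'shard_%03d'
print(shard_name % 0)    # shard_000
print(shard_name % 42)   # shard_042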
@@ -32,24 +32,18 @@ log = logging.getLogger(__name__)

 class S3Shard(BaseShard):

-    def __init__(self, index, bucket, **settings):
+    def __init__(self, index, bucket, bucket_folder, fs, **settings):
         self._index = index
-        self._bucket = bucket
+        self._bucket_folder = bucket_folder
         self.storage_type = 'bucket'
+        self._bucket_main = bucket

-        endpoint_url = settings.pop('archive_cache.objectstore.url')
-        key = settings.pop('archive_cache.objectstore.key')
-        secret = settings.pop('archive_cache.objectstore.secret')
-
-        # TODO: Add it all over the place...
-        self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
-
-        self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+        self.fs = fs

     @property
     def bucket(self):
-        """Cache bucket."""
-        return os.path.join(self._bucket_root, self._bucket)
+        """Cache bucket final path."""
+        return os.path.join(self._bucket_main, self._bucket_folder)

     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file = f'{archive_key}-{self.key_suffix}'
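The reworked `bucket` property joins the main bucket with the per-shard folder using the same `os.path.join` call shown in the diff; with sample values:

import os

# e.g. _bucket_main='rc-archive-cache', _bucket_folder='shard-003' (sample values)
print(os.path.join('rc-archive-cache', 'shard-003'))  # rc-archive-cache/shard-003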
@@ -59,11 +53,8 @@ class S3Shard(BaseShard):
         return self.fs.open(path, 'wb')

     def _write_file(self, full_path, iterator, mode):
-        if self._bucket_root:
-            if not self.fs.exists(self._bucket_root):
-                self.fs.mkdir(self._bucket_root)

-        # ensure bucket exists
+        # ensure folder in bucket exists
         destination = self.bucket
         if not self.fs.exists(destination):
             self.fs.mkdir(destination, s3_additional_kwargs={})
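With the main bucket created up front by the cache, `_write_file` only has to ensure the per-shard folder exists. A sketch of that check-then-write flow, using the local backend in place of S3 (paths are placeholders):

import os
import fsspec

fs = fsspec.filesystem('file')
destination = '/tmp/archive-cache-demo/shard-000'  # stands in for the bucket folder

# ensure the folder exists before the first write, as in the diff
if not fs.exists(destination):
    fs.makedirs(destination)

with fs.open(os.path.join(destination, 'example.tgz'), 'wb') as f:
    f.write(b'archive-bytes')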
@@ -113,7 +104,8 @@ class S3Shard(BaseShard):


 class ObjectStoreCache(BaseCache):
-    shard_name = 'shard-{:03d}'
+    shard_name = 'shard-%03d'
+    shard_cls = S3Shard

     def __init__(self, locking_url, **settings):
         """
@@ -127,9 +119,15 @@ class ObjectStoreCache(BaseCache):
         self._config = settings

         objectstore_url = self.get_conf('archive_cache.objectstore.url')
-        self._storage_path = objectstore_url
+        self._storage_path = objectstore_url  # common path for all from BaseCache

-        self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+        self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+        if self._shard_count < 1:
+            raise ValueError('cache_shards must be 1 or more')
+
+        self._bucket = settings.pop('archive_cache.objectstore.bucket')
+        if not self._bucket:
+            raise ValueError('archive_cache.objectstore.bucket needs to have a value')

         self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
@@ -138,21 +136,29 @@ class ObjectStoreCache(BaseCache):
         self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

-        log.debug('Initializing archival cache instance under %s', objectstore_url)
+        endpoint_url = settings.pop('archive_cache.objectstore.url')
+        key = settings.pop('archive_cache.objectstore.key')
+        secret = settings.pop('archive_cache.objectstore.secret')
+
+        log.debug('Initializing %s archival cache instance under %s', self)
+
+        fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+
+        # init main bucket
+        if not fs.exists(self._bucket):
+            fs.mkdir(self._bucket)
+
         self._shards = tuple(
-            S3Shard(
+            self.shard_cls(
                 index=num,
-                bucket=self.shard_name.format(num),
+                bucket=self._bucket,
+                bucket_folder=self.shard_name % num,
+                fs=fs,
                 **settings,
             )
-            for num in range(self._count)
+            for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash

-    def _get_shard(self, key) -> S3Shard:
-        index = self._hash(key) % self._count
-        shard = self._shards[index]
-        return shard
-
     def _get_size(self, shard, archive_path):
         return shard.fs.info(archive_path)['size']
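End to end, the objectstore cache now connects once and prepares the main bucket before fanning out the shards. A hedged sketch of that bootstrap against an S3-compatible endpoint such as MinIO (credentials, URL, and bucket name are placeholders; depending on the s3fs version, `endpoint_url` may instead need to be passed via `client_kwargs`):

import fsspec

fs = fsspec.filesystem(
    's3',
    anon=False,
    key='example-access-key',         # placeholder credentials
    secret='example-secret-key',      # placeholder credentials
    endpoint_url='http://localhost:9000',
)

bucket = 'rc-archive-cache'  # sample bucket name
if not fs.exists(bucket):
    fs.mkdir(bucket)  # init main bucket once; shards then use folders inside it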