Show More
@@ -115,6 +115,7 b' class FileSystemShard(BaseShard):' | |||||
115 |
|
115 | |||
116 |
|
116 | |||
117 | class FileSystemFanoutCache(BaseCache): |
|
117 | class FileSystemFanoutCache(BaseCache): | |
|
118 | shard_name = 'shard_%03d' | |||
118 |
|
119 | |||
119 | def __init__(self, locking_url, **settings): |
|
120 | def __init__(self, locking_url, **settings): | |
120 | """ |
|
121 | """ | |
@@ -150,7 +151,7 b' class FileSystemFanoutCache(BaseCache):' | |||||
150 | self._shards = tuple( |
|
151 | self._shards = tuple( | |
151 | FileSystemShard( |
|
152 | FileSystemShard( | |
152 | index=num, |
|
153 | index=num, | |
153 |
directory=os.path.join(directory, |
|
154 | directory=os.path.join(directory, self.shard_name % num), | |
154 | **settings, |
|
155 | **settings, | |
155 | ) |
|
156 | ) | |
156 | for num in range(self._count) |
|
157 | for num in range(self._count) |
@@ -41,12 +41,15 b' class S3Shard(BaseShard):' | |||||
41 | key = settings.pop('archive_cache.objectstore.key') |
|
41 | key = settings.pop('archive_cache.objectstore.key') | |
42 | secret = settings.pop('archive_cache.objectstore.secret') |
|
42 | secret = settings.pop('archive_cache.objectstore.secret') | |
43 |
|
43 | |||
|
44 | # TODO: Add it all over the place... | |||
|
45 | self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root') | |||
|
46 | ||||
44 | self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret) |
|
47 | self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret) | |
45 |
|
48 | |||
46 | @property |
|
49 | @property | |
47 | def bucket(self): |
|
50 | def bucket(self): | |
48 | """Cache bucket.""" |
|
51 | """Cache bucket.""" | |
49 | return self._bucket |
|
52 | return os.path.join(self._bucket_root, self._bucket) | |
50 |
|
53 | |||
51 | def _get_keyfile(self, archive_key) -> tuple[str, str]: |
|
54 | def _get_keyfile(self, archive_key) -> tuple[str, str]: | |
52 | key_file = f'{archive_key}-{self.key_suffix}' |
|
55 | key_file = f'{archive_key}-{self.key_suffix}' | |
@@ -56,6 +59,10 b' class S3Shard(BaseShard):' | |||||
56 | return self.fs.open(path, 'wb') |
|
59 | return self.fs.open(path, 'wb') | |
57 |
|
60 | |||
58 | def _write_file(self, full_path, iterator, mode): |
|
61 | def _write_file(self, full_path, iterator, mode): | |
|
62 | if self._bucket_root: | |||
|
63 | if not self.fs.exists(self._bucket_root): | |||
|
64 | self.fs.mkdir(self._bucket_root) | |||
|
65 | ||||
59 | # ensure bucket exists |
|
66 | # ensure bucket exists | |
60 | destination = self.bucket |
|
67 | destination = self.bucket | |
61 | if not self.fs.exists(destination): |
|
68 | if not self.fs.exists(destination): | |
@@ -106,6 +113,7 b' class S3Shard(BaseShard):' | |||||
106 |
|
113 | |||
107 |
|
114 | |||
108 | class ObjectStoreCache(BaseCache): |
|
115 | class ObjectStoreCache(BaseCache): | |
|
116 | shard_name = 'shard-bucket-%03d' | |||
109 |
|
117 | |||
110 | def __init__(self, locking_url, **settings): |
|
118 | def __init__(self, locking_url, **settings): | |
111 | """ |
|
119 | """ | |
@@ -134,7 +142,7 b' class ObjectStoreCache(BaseCache):' | |||||
134 | self._shards = tuple( |
|
142 | self._shards = tuple( | |
135 | S3Shard( |
|
143 | S3Shard( | |
136 | index=num, |
|
144 | index=num, | |
137 |
bucket= |
|
145 | bucket=self.shard_name % num, | |
138 | **settings, |
|
146 | **settings, | |
139 | ) |
|
147 | ) | |
140 | for num in range(self._count) |
|
148 | for num in range(self._count) |
General Comments 0
You need to be logged in to leave comments.
Login now