##// END OF EJS Templates
archive-cache: synced with CE codebase
super-admin -
r1257:60bf3236 default
parent child Browse files
Show More
@@ -166,6 +166,7 b' class BaseCache:'
166 166 retry_attempts = 0
167 167 retry_backoff = 1
168 168 _shards = tuple()
169 shard_cls = BaseShard
169 170
170 171 def __contains__(self, key):
171 172 """Return `True` if `key` matching item is found in cache.
@@ -199,8 +200,10 b' class BaseCache:'
199 200 del self._config[key]
200 201 return val
201 202
202 def _get_shard(self, key):
203 raise NotImplementedError
203 def _get_shard(self, key) -> shard_cls:
204 index = self._hash(key) % self._shard_count
205 shard = self._shards[index]
206 return shard
204 207
205 208 def _get_size(self, shard, archive_path):
206 209 raise NotImplementedError
@@ -32,16 +32,18 b' log = logging.getLogger(__name__)'
32 32
33 33 class FileSystemShard(BaseShard):
34 34
35 def __init__(self, index, directory, **settings):
35 def __init__(self, index, directory, directory_folder, fs, **settings):
36 36 self._index = index
37 37 self._directory = directory
38 self._directory_folder = directory_folder
38 39 self.storage_type = 'directory'
39 self.fs = fsspec.filesystem('file')
40
41 self.fs = fs
40 42
41 43 @property
42 44 def directory(self):
43 """Cache directory."""
44 return self._directory
45 """Cache directory final path."""
46 return os.path.join(self._directory, self._directory_folder)
45 47
46 48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
47 49 key_file = f'{archive_key}.{self.key_suffix}'
@@ -116,6 +118,7 b' class FileSystemShard(BaseShard):'
116 118
117 119 class FileSystemFanoutCache(BaseCache):
118 120 shard_name = 'shard_%03d'
121 shard_cls = FileSystemShard
119 122
120 123 def __init__(self, locking_url, **settings):
121 124 """
@@ -132,13 +135,11 b' class FileSystemFanoutCache(BaseCache):'
132 135 directory = os.path.expanduser(directory)
133 136 directory = os.path.expandvars(directory)
134 137 self._directory = directory
135 self._storage_path = directory
138 self._storage_path = directory # common path for all from BaseCache
136 139
137 # check if it's ok to write, and re-create the archive cache
138 if not os.path.isdir(self._directory):
139 os.makedirs(self._directory, exist_ok=True)
140
141 self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
140 self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141 if self._shard_count < 1:
142 raise ValueError('cache_shards must be 1 or more')
142 143
143 144 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
144 145 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
@@ -147,21 +148,27 b' class FileSystemFanoutCache(BaseCache):'
147 148 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
148 149 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
149 150
150 log.debug('Initializing archival cache instance under %s', self._directory)
151 log.debug('Initializing %s archival cache instance under %s', self)
152 fs = fsspec.filesystem('file')
153 # check if it's ok to write, and re-create the archive cache main dir
154 # A directory is the virtual equivalent of a physical file cabinet.
155 # In other words, it's a container for organizing digital data.
156 # Unlike a folder, which can only store files, a directory can store files,
157 # subdirectories, and other directories.
158 if not fs.exists(self._directory):
159 fs.makedirs(self._directory, exist_ok=True)
160
151 161 self._shards = tuple(
152 FileSystemShard(
162 self.shard_cls(
153 163 index=num,
154 directory=os.path.join(directory, self.shard_name % num),
164 directory=directory,
165 directory_folder=self.shard_name % num,
166 fs=fs,
155 167 **settings,
156 168 )
157 for num in range(self._count)
169 for num in range(self._shard_count)
158 170 )
159 171 self._hash = self._shards[0].hash
160 172
161 def _get_shard(self, key) -> FileSystemShard:
162 index = self._hash(key) % self._count
163 shard = self._shards[index]
164 return shard
165
166 173 def _get_size(self, shard, archive_path):
167 174 return os.stat(archive_path).st_size
@@ -32,24 +32,18 b' log = logging.getLogger(__name__)'
32 32
33 33 class S3Shard(BaseShard):
34 34
35 def __init__(self, index, bucket, **settings):
35 def __init__(self, index, bucket, bucket_folder, fs, **settings):
36 36 self._index = index
37 self._bucket = bucket
37 self._bucket_folder = bucket_folder
38 38 self.storage_type = 'bucket'
39 self._bucket_main = bucket
39 40
40 endpoint_url = settings.pop('archive_cache.objectstore.url')
41 key = settings.pop('archive_cache.objectstore.key')
42 secret = settings.pop('archive_cache.objectstore.secret')
43
44 # TODO: Add it all over the place...
45 self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
46
47 self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
41 self.fs = fs
48 42
49 43 @property
50 44 def bucket(self):
51 """Cache bucket."""
52 return os.path.join(self._bucket_root, self._bucket)
45 """Cache bucket final path."""
46 return os.path.join(self._bucket_main, self._bucket_folder)
53 47
54 48 def _get_keyfile(self, archive_key) -> tuple[str, str]:
55 49 key_file = f'{archive_key}-{self.key_suffix}'
@@ -59,11 +53,8 b' class S3Shard(BaseShard):'
59 53 return self.fs.open(path, 'wb')
60 54
61 55 def _write_file(self, full_path, iterator, mode):
62 if self._bucket_root:
63 if not self.fs.exists(self._bucket_root):
64 self.fs.mkdir(self._bucket_root)
65 56
66 # ensure bucket exists
57 # ensure folder in bucket exists
67 58 destination = self.bucket
68 59 if not self.fs.exists(destination):
69 60 self.fs.mkdir(destination, s3_additional_kwargs={})
@@ -113,7 +104,8 b' class S3Shard(BaseShard):'
113 104
114 105
115 106 class ObjectStoreCache(BaseCache):
116 shard_name = 'shard-bucket-%03d'
107 shard_name = 'shard-%03d'
108 shard_cls = S3Shard
117 109
118 110 def __init__(self, locking_url, **settings):
119 111 """
@@ -127,9 +119,15 b' class ObjectStoreCache(BaseCache):'
127 119 self._config = settings
128 120
129 121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
130 self._storage_path = objectstore_url
122 self._storage_path = objectstore_url # common path for all from BaseCache
131 123
132 self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
124 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125 if self._shard_count < 1:
126 raise ValueError('cache_shards must be 1 or more')
127
128 self._bucket = settings.pop('archive_cache.objectstore.bucket')
129 if not self._bucket:
130 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
133 131
134 132 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
135 133 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
@@ -138,21 +136,29 b' class ObjectStoreCache(BaseCache):'
138 136 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
139 137 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
140 138
141 log.debug('Initializing archival cache instance under %s', objectstore_url)
139 endpoint_url = settings.pop('archive_cache.objectstore.url')
140 key = settings.pop('archive_cache.objectstore.key')
141 secret = settings.pop('archive_cache.objectstore.secret')
142
143 log.debug('Initializing %s archival cache instance under %s', self)
144
145 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
146
147 # init main bucket
148 if not fs.exists(self._bucket):
149 fs.mkdir(self._bucket)
150
142 151 self._shards = tuple(
143 S3Shard(
152 self.shard_cls(
144 153 index=num,
145 bucket=self.shard_name % num,
154 bucket=self._bucket,
155 bucket_folder=self.shard_name % num,
156 fs=fs,
146 157 **settings,
147 158 )
148 for num in range(self._count)
159 for num in range(self._shard_count)
149 160 )
150 161 self._hash = self._shards[0].hash
151 162
152 def _get_shard(self, key) -> S3Shard:
153 index = self._hash(key) % self._count
154 shard = self._shards[index]
155 return shard
156
157 163 def _get_size(self, shard, archive_path):
158 164 return shard.fs.info(archive_path)['size']
General Comments 0
You need to be logged in to leave comments. Login now