archive-cache: synced with CE codebase
super-admin
r1257:60bf3236 default
@@ -166,6 +166,7 @@ class BaseCache:
     retry_attempts = 0
     retry_backoff = 1
     _shards = tuple()
+    shard_cls = BaseShard
 
     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.
@@ -199,8 +200,10 @@ class BaseCache:
         del self._config[key]
         return val
 
-    def _get_shard(self, key):
-        raise NotImplementedError
+    def _get_shard(self, key) -> shard_cls:
+        index = self._hash(key) % self._shard_count
+        shard = self._shards[index]
+        return shard
 
     def _get_size(self, shard, archive_path):
         raise NotImplementedError
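_get_shard() moves out of the subclasses and into BaseCache: the key is hashed and reduced modulo _shard_count, so a given key always resolves to the same shard. A minimal standalone sketch of that mapping, using sha256 as a stand-in for whatever hash the shard class supplies:

import hashlib

def pick_shard(key: bytes, shard_count: int) -> int:
    # A stable digest, reduced modulo the shard count, gives a deterministic index.
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], 'big') % shard_count

# The same key always lands on the same shard.
assert pick_shard(b'repo-1.tar.gz', 8) == pick_shard(b'repo-1.tar.gz', 8)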
@@ -32,16 +32,18 @@ log = logging.getLogger(__name__)
 
 class FileSystemShard(BaseShard):
 
-    def __init__(self, index, directory, **settings):
+    def __init__(self, index, directory, directory_folder, fs, **settings):
         self._index = index
         self._directory = directory
+        self._directory_folder = directory_folder
         self.storage_type = 'directory'
-        self.fs = fsspec.filesystem('file')
+
+        self.fs = fs
 
     @property
     def directory(self):
-        """Cache directory."""
-        return self._directory
+        """Cache directory final path."""
+        return os.path.join(self._directory, self._directory_folder)
 
     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file = f'{archive_key}.{self.key_suffix}'
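FileSystemShard no longer builds its own fsspec filesystem; it receives a shared fs plus a per-shard folder, and the directory property joins the common base path with that folder. A small sketch of the resulting layout, with a hypothetical base path:

import os
import fsspec

# One shared local filesystem object; each shard owns a subfolder of the base
# directory. The path and folder name here are assumptions for illustration.
fs = fsspec.filesystem('file')

base_directory = '/tmp/archive_cache'   # hypothetical cache root
shard_folder = 'shard_000'              # matches the shard_name pattern

shard_path = os.path.join(base_directory, shard_folder)
fs.makedirs(shard_path, exist_ok=True)
print(fs.exists(shard_path))  # True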
@@ -116,6 +118,7 @@ class FileSystemShard(BaseShard):
 
 class FileSystemFanoutCache(BaseCache):
     shard_name = 'shard_%03d'
+    shard_cls = FileSystemShard
 
     def __init__(self, locking_url, **settings):
         """
@@ -132,13 +135,11 @@ class FileSystemFanoutCache(BaseCache):
         directory = os.path.expanduser(directory)
         directory = os.path.expandvars(directory)
         self._directory = directory
-        self._storage_path = directory
+        self._storage_path = directory  # common path for all from BaseCache
 
-        # check if it's ok to write, and re-create the archive cache
-        if not os.path.isdir(self._directory):
-            os.makedirs(self._directory, exist_ok=True)
-
-        self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+        self._shard_count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
+        if self._shard_count < 1:
+            raise ValueError('cache_shards must be 1 or more')
 
         self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
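Directory creation moves later in __init__ (see the next hunk), and the shard count is now validated up front instead of failing obscurely on an empty shard tuple. A tiny sketch of the equivalent check, with the settings dict contents assumed:

def read_shard_count(settings: dict) -> int:
    # Mirrors the new validation: a fanout cache needs at least one shard.
    shard_count = int(settings.pop('archive_cache.filesystem.cache_shards'))
    if shard_count < 1:
        raise ValueError('cache_shards must be 1 or more')
    return shard_count

print(read_shard_count({'archive_cache.filesystem.cache_shards': '8'}))  # prints 8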
@@ -147,21 +148,27 @@ class FileSystemFanoutCache(BaseCache):
         self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
 
-        log.debug('Initializing archival cache instance under %s', self._directory)
+        log.debug('Initializing %s archival cache instance under %s', self)
+        fs = fsspec.filesystem('file')
+        # check if it's ok to write, and re-create the archive cache main dir
+        # A directory is the virtual equivalent of a physical file cabinet.
+        # In other words, it's a container for organizing digital data.
+        # Unlike a folder, which can only store files, a directory can store files,
+        # subdirectories, and other directories.
+        if not fs.exists(self._directory):
+            fs.makedirs(self._directory, exist_ok=True)
+
         self._shards = tuple(
-            FileSystemShard(
+            self.shard_cls(
                 index=num,
-                directory=os.path.join(directory, self.shard_name % num),
+                directory=directory,
+                directory_folder=self.shard_name % num,
+                fs=fs,
                 **settings,
             )
-            for num in range(self._count)
+            for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash
 
-    def _get_shard(self, key) -> FileSystemShard:
-        index = self._hash(key) % self._count
-        shard = self._shards[index]
-        return shard
-
     def _get_size(self, shard, archive_path):
         return os.stat(archive_path).st_size
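With shard_cls declared on the class and _get_shard() hoisted into BaseCache, each subclass only wires shard-specific constructor arguments. A toy model of that hook, with made-up ToyBaseCache/DictShard/DictCache names:

# The base class builds shards through a class attribute, so subclasses only
# declare which shard type they use.
class ToyBaseCache:
    shard_cls = None  # subclasses override

    def build_shards(self, count, **kwargs):
        return tuple(self.shard_cls(index=num, **kwargs) for num in range(count))

class DictShard:
    def __init__(self, index):
        self.index = index

class DictCache(ToyBaseCache):
    shard_cls = DictShard

print([shard.index for shard in DictCache().build_shards(3)])  # [0, 1, 2]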
@@ -32,24 +32,18 @@ log = logging.getLogger(__name__)
 
 class S3Shard(BaseShard):
 
-    def __init__(self, index, bucket, **settings):
+    def __init__(self, index, bucket, bucket_folder, fs, **settings):
         self._index = index
-        self._bucket = bucket
+        self._bucket_folder = bucket_folder
         self.storage_type = 'bucket'
+        self._bucket_main = bucket
 
-        endpoint_url = settings.pop('archive_cache.objectstore.url')
-        key = settings.pop('archive_cache.objectstore.key')
-        secret = settings.pop('archive_cache.objectstore.secret')
-
-        # TODO: Add it all over the place...
-        self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
-
-        self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+        self.fs = fs
 
     @property
     def bucket(self):
-        """Cache bucket."""
-        return os.path.join(self._bucket_root, self._bucket)
+        """Cache bucket final path."""
+        return os.path.join(self._bucket_main, self._bucket_folder)
 
     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file = f'{archive_key}-{self.key_suffix}'
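S3Shard mirrors the filesystem change: connection setup and credentials leave the shard, which now receives a shared fs, the main bucket name, and a per-shard folder, and the bucket property joins the two path pieces. A sketch of the resulting path shape, with an assumed bucket name:

import os

bucket_main = 'archive-cache'   # assumed main bucket name, for illustration
bucket_folder = 'shard-000'     # matches the new shard_name pattern

# s3fs-style paths treat 'bucket/prefix' as a folder inside a bucket, so each
# shard is now a folder under one shared bucket rather than a bucket of its own.
print(os.path.join(bucket_main, bucket_folder))  # archive-cache/shard-000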
@@ -59,11 +53,8 @@ class S3Shard(BaseShard):
         return self.fs.open(path, 'wb')
 
     def _write_file(self, full_path, iterator, mode):
-        if self._bucket_root:
-            if not self.fs.exists(self._bucket_root):
-                self.fs.mkdir(self._bucket_root)
 
-        # ensure bucket exists
+        # ensure folder in bucket exists
         destination = self.bucket
         if not self.fs.exists(destination):
             self.fs.mkdir(destination, s3_additional_kwargs={})
@@ -113,7 +104,8 @@ class S3Shard(BaseShard):
 
 
 class ObjectStoreCache(BaseCache):
-    shard_name = 'shard-bucket-%03d'
+    shard_name = 'shard-%03d'
+    shard_cls = S3Shard
 
     def __init__(self, locking_url, **settings):
         """
@@ -127,9 +119,15 @@ class ObjectStoreCache(BaseCache):
         self._config = settings
 
         objectstore_url = self.get_conf('archive_cache.objectstore.url')
-        self._storage_path = objectstore_url
+        self._storage_path = objectstore_url  # common path for all from BaseCache
 
-        self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+        self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
+        if self._shard_count < 1:
+            raise ValueError('cache_shards must be 1 or more')
+
+        self._bucket = settings.pop('archive_cache.objectstore.bucket')
+        if not self._bucket:
+            raise ValueError('archive_cache.objectstore.bucket needs to have a value')
 
         self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
@@ -138,21 +136,29 @@ class ObjectStoreCache(BaseCache):
         self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
 
-        log.debug('Initializing archival cache instance under %s', objectstore_url)
+        endpoint_url = settings.pop('archive_cache.objectstore.url')
+        key = settings.pop('archive_cache.objectstore.key')
+        secret = settings.pop('archive_cache.objectstore.secret')
+
+        log.debug('Initializing %s archival cache instance under %s', self)
+
+        fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
+
+        # init main bucket
+        if not fs.exists(self._bucket):
+            fs.mkdir(self._bucket)
+
         self._shards = tuple(
-            S3Shard(
+            self.shard_cls(
                 index=num,
-                bucket=self.shard_name % num,
+                bucket=self._bucket,
+                bucket_folder=self.shard_name % num,
+                fs=fs,
                 **settings,
             )
-            for num in range(self._count)
+            for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash
 
-    def _get_shard(self, key) -> S3Shard:
-        index = self._hash(key) % self._count
-        shard = self._shards[index]
-        return shard
-
     def _get_size(self, shard, archive_path):
         return shard.fs.info(archive_path)['size']
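ObjectStoreCache now owns connection setup: it pops the endpoint and credentials from settings, builds one s3 filesystem, creates the main bucket once, and hands both the bucket name and the filesystem down to every shard. A condensed sketch of that order, with made-up settings values:

import fsspec

# All archive_cache.objectstore.* values below are assumptions for this sketch.
settings = {
    'archive_cache.objectstore.url': 'http://127.0.0.1:9000',
    'archive_cache.objectstore.key': 'access-key',
    'archive_cache.objectstore.secret': 'secret-key',
    'archive_cache.objectstore.bucket': 'archive-cache',
}

endpoint_url = settings.pop('archive_cache.objectstore.url')
key = settings.pop('archive_cache.objectstore.key')
secret = settings.pop('archive_cache.objectstore.secret')
bucket = settings.pop('archive_cache.objectstore.bucket')

# One S3 filesystem for the whole cache; shards receive it via fs=fs.
fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
if not fs.exists(bucket):
    fs.mkdir(bucket)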