archive-cache: synced with ce
super-admin
r1247:35680f51 default
@@ -1,448 +1,455 @@
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import codecs
import contextlib
import functools
import os
import logging
import time
import typing
import zlib
import sqlite3

from ...ext_json import json
from .lock import GenerationLock
from .utils import format_size

log = logging.getLogger(__name__)

cache_meta = None

UNKNOWN = -241
NO_VAL = -917

MODE_BINARY = 'BINARY'


EVICTION_POLICY = {
    'none': {
        'evict': None,
    },
    'least-recently-stored': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
    },
    'least-recently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
    },
    'least-frequently-used': {
        'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
    },
}
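
# Note: FanoutCache.evict() below turns one of these policy entries into a concrete
# query by filling in the columns it needs, e.g. for 'least-recently-used':
#   EVICTION_POLICY['least-recently-used']['evict'].format(fields='key_file_path, full_path, size')
# which yields:
#   SELECT key_file_path, full_path, size FROM archive_cache ORDER BY access_time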


class DB:

    def __init__(self):
        self.connection = sqlite3.connect(':memory:')
        self._init_db()

    def _init_db(self):
        qry = '''
        CREATE TABLE IF NOT EXISTS archive_cache (
            rowid INTEGER PRIMARY KEY,
            key_file TEXT,
            key_file_path TEXT,
            filename TEXT,
            full_path TEXT,
            store_time REAL,
            access_time REAL,
            access_count INTEGER DEFAULT 0,
            size INTEGER DEFAULT 0
        )
        '''

        self.sql(qry)
        self.connection.commit()

    @property
    def sql(self):
        return self.connection.execute

    def bulk_insert(self, rows):
        qry = '''
        INSERT INTO archive_cache (
            rowid,
            key_file,
            key_file_path,
            filename,
            full_path,
            store_time,
            access_time,
            access_count,
            size
        )
        VALUES (
            ?, ?, ?, ?, ?, ?, ?, ?, ?
        )
        '''
        cursor = self.connection.cursor()
        cursor.executemany(qry, rows)
        self.connection.commit()
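
    # Rows passed to bulk_insert() must follow the column order of the INSERT above:
    # (rowid, key_file, key_file_path, filename, full_path, store_time, access_time,
    #  access_count, size) - this is exactly the shape FanoutCache.evict() builds below.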


class FileSystemCache:

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def _write_file(self, full_path, iterator, mode, encoding=None):
        full_dir, _ = os.path.split(full_path)

        for count in range(1, 11):
            with contextlib.suppress(OSError):
                os.makedirs(full_dir)

            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                writer = open(full_path, mode, encoding=encoding)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

            with writer:
                size = 0
                for chunk in iterator:
                    size += len(chunk)
                    writer.write(chunk)
                writer.flush()
                # Get the file descriptor
                fd = writer.fileno()

                # Sync the file descriptor to disk, helps with NFS cases...
                os.fsync(fd)
            log.debug('written new archive cache under %s', full_path)
            return size

    def _get_keyfile(self, key):
        return os.path.join(self._directory, f'{key}.key')
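
    # the '<key>.key' file stored in the shard directory is the presence marker for
    # a cached archive: store() writes the JSON metadata into it, and __contains__/
    # fetch() below only consider a key cached if that file exists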

    def store(self, key, value_reader, metadata):
        filename, full_path = self.random_filename()
        key_file = self._get_keyfile(key)

        # STORE METADATA
        _metadata = {
            "version": "v1",
            "filename": filename,
            "full_path": full_path,
            "key_file": key_file,
            "store_time": time.time(),
            "access_count": 1,
            "access_time": 0,
            "size": 0
        }
        if metadata:
            _metadata.update(metadata)

        reader = functools.partial(value_reader.read, 2**22)

        iterator = iter(reader, b'')
        size = self._write_file(full_path, iterator, 'xb')
        # record the real size in the metadata that gets persisted to the key file;
        # metadata may be None here, so update _metadata rather than the argument
        _metadata['size'] = size

        # after archive is finished, we create a key to save the presence of the binary file
        with open(key_file, 'wb') as f:
            f.write(json.dumps(_metadata))

        return key, size, MODE_BINARY, filename, _metadata

    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:

        if retry:
            for attempt in range(retry_attempts):
                if key in self:
                    break
                # we didn't find the key, wait 1s, and re-check
                time.sleep(1)

        if key not in self:
            log.exception('requested key %s not found in %s', key, self)
            raise KeyError(key)

        key_file = self._get_keyfile(key)
        with open(key_file, 'rb') as f:
            metadata = json.loads(f.read())

        filename = metadata['filename']

        try:
            return open(os.path.join(self.directory, filename), 'rb'), metadata
        finally:
            # update usage stats, count and accessed
            metadata["access_count"] = metadata.get("access_count", 0) + 1
            metadata["access_time"] = time.time()

            with open(key_file, 'wb') as f:
                f.write(json.dumps(metadata))

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
        name = hex_name[4:] + '.archive_cache'
        filename = os.path.join(sub_dir, name)
        full_path = os.path.join(self.directory, filename)
        return filename, full_path
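
    # Example layout produced above: the first four hex characters become two
    # directory levels, e.g. 'ab/cd/<remaining 28 hex chars>.archive_cache',
    # stored under this shard's directory.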

    def hash(self, key):
        """Compute portable hash for `key`.

        :param key: key to hash
        :return: hash value

        """
        mask = 0xFFFFFFFF
        return zlib.adler32(key.encode('utf-8')) & mask  # noqa

    def __contains__(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key matching item
        :return: True if key matching item is found

        """
        key_file = self._get_keyfile(key)
        return os.path.exists(key_file)

    def __repr__(self):
        return f'FileSystemCache(index={self._index}, dir={self.directory})'


class FanoutCache:
    """Cache that shards keys and values."""

    def __init__(
            self, directory=None, **settings
    ):
        """Initialize cache instance.

        :param str directory: cache directory
        :param settings: settings dict

        """
        if directory is None:
            raise ValueError('directory cannot be None')

        directory = str(directory)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory

        self._count = settings.pop('cache_shards')
        self._locking_url = settings.pop('locking_url')

        self._eviction_policy = settings['cache_eviction_policy']
        self._cache_size_limit = settings['cache_size_limit']

        self._shards = tuple(
            FileSystemCache(
                index=num,
                directory=os.path.join(directory, 'shard_%03d' % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def get_lock(self, lock_key):
        return GenerationLock(lock_key, self._locking_url)

    def _get_shard(self, key) -> FileSystemCache:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard
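
    # a key always maps to the same shard: the adler32-based hash above is taken
    # modulo the configured number of shards, so concurrent readers and writers
    # agree on where a given archive lives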

    def store(self, key, value_reader, metadata=None):
        shard = self._get_shard(key)
        return shard.store(key, value_reader, metadata)

    def fetch(self, key, retry=False, retry_attempts=10):
        """Return file handle corresponding to `key` from cache.
        """
        shard = self._get_shard(key)
        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

    def has_key(self, key):
        """Return `True` if `key` matching item is found in cache.

        :param key: key for item
        :return: True if key is found

        """
        shard = self._get_shard(key)
        return key in shard

    def __contains__(self, item):
        return self.has_key(item)

    def evict(self, policy=None, size_limit=None):
        """
        Remove old items based on the given eviction policy and size limit.

        Explanation of this algorithm:
        Iterate over each shard, and for each shard iterate over its .key files,
        reading the metadata stored in them. This gives us a full list of keys and
        cached archives, with their sizes, creation times, access times and access counts.

        Store all of that in an in-memory SQLite DB, so we can easily run different
        sorting strategies; summing the sizes is a single SUM query.

        Then run the sorting strategy selected by the eviction policy, iterate over
        the sorted keys, and remove entries one by one, checking after each removal
        whether we are back under the overall size limit.
        """

        policy = policy or self._eviction_policy
        size_limit = size_limit or self._cache_size_limit

        select_policy = EVICTION_POLICY[policy]['evict']

        log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                  policy, format_size(size_limit))

        if select_policy is None:
            return 0

        db = DB()

        data = []
        cnt = 1
        for shard in self._shards:
            for key_file in os.listdir(shard.directory):
                if key_file.endswith('.key'):
                    key_file_path = os.path.join(shard.directory, key_file)
                    with open(key_file_path, 'rb') as f:
                        metadata = json.loads(f.read())

                    size = metadata.get('size')
                    filename = metadata.get('filename')
                    full_path = metadata.get('full_path')

                    if not size:
                        # in case we don't have size re-calc it...
                        size = os.stat(full_path).st_size

                    data.append([
                        cnt,
                        key_file,
                        key_file_path,
                        filename,
                        full_path,
                        metadata.get('store_time', 0),
                        metadata.get('access_time', 0),
                        metadata.get('access_count', 0),
                        size,
                    ])
                    cnt += 1

        # Insert bulk data using executemany
        db.bulk_insert(data)

        ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
        log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
        select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
        sorted_keys = db.sql(select_policy_qry).fetchall()

        removed_items = 0
        removed_size = 0
        for key, cached_file, size in sorted_keys:
            # simulate removal impact BEFORE removal
            total_size -= size

            if total_size <= size_limit:
                # we obtained what we wanted...
                break

            os.remove(cached_file)
            os.remove(key)
            removed_items += 1
            removed_size += size

        log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
        return removed_items


def get_archival_config(config):

    final_config = {}

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)
    backend = config['archive_cache.backend.type']
    if backend != 'filesystem':
        raise ValueError('archive_cache.backend.type only supports "filesystem"')

    archive_cache_locking_url = config['archive_cache.locking.url']
    archive_cache_dir = config['archive_cache.filesystem.store_dir']
    archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
    archive_cache_shards = config['archive_cache.filesystem.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and create the archive cache dir if it's missing
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = FanoutCache(
        archive_cache_dir,
        locking_url=archive_cache_locking_url,
        cache_shards=archive_cache_shards,
        cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        cache_eviction_policy=archive_cache_eviction_policy
    )
    cache_meta = d_cache
    return cache_meta
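
For reference, a minimal usage sketch of the API above, assuming get_archival_cache_store() is imported from this module; the config keys are exactly the ones it reads, while the concrete values (locking URL, store dir, size, shard count, keys, paths) are illustrative placeholders, not values taken from this commit:

# hypothetical application-side usage, not part of the module above
config = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/0',                # illustrative
    'archive_cache.filesystem.store_dir': '/var/opt/rc_archive_cache',  # illustrative
    'archive_cache.filesystem.cache_size_gb': 10,                       # illustrative
    'archive_cache.filesystem.cache_shards': 8,                         # illustrative
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}
d_cache = get_archival_cache_store(config)

# store: any object with a .read() method works as value_reader
with open('/tmp/repo-archive.zip', 'rb') as value_reader:               # illustrative path
    key, size, mode, filename, metadata = d_cache.store(
        'repo-archive-key.zip', value_reader, {'sha': 'deadbeef'})      # illustrative key/metadata

# fetch returns an open binary file handle plus the stored metadata,
# and raises KeyError if the key is not (or no longer) cached
reader, metadata = d_cache.fetch('repo-archive-key.zip')
with reader:
    archive_bytes = reader.read()

# periodically trim the cache back under its configured size limit
removed = d_cache.evict()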