archive-cache: synced with ce changes
super-admin
r1245:3da621d7 default
@@ -1,31 +1,31 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2024 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 from .fanout_cache import get_archival_cache_store
 from .fanout_cache import get_archival_config

 from .utils import archive_iterator
-from .utils import ArchiveCacheLock
+from .utils import ArchiveCacheGenerationLock


 def includeme(config):
     # NOTE: for vcsserver, we lazy init this and config is sent from RhodeCode
     return

     # init our cache at start
     settings = config.get_settings()
     get_archival_cache_store(settings)
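As a rough illustration of how the lazily initialized store is obtained: the `archive_cache.*` key names below mirror the ones read in get_archival_cache_store() further down in this changeset, while the concrete values and the store directory are placeholders for the sketch, not defaults from the code.

# Hypothetical settings dict; key names match get_archival_cache_store(), values are made up.
settings = {
    'archive_cache.backend.type': 'filesystem',
    'archive_cache.locking.url': 'redis://redis:6379/1',
    'archive_cache.filesystem.store_dir': '/var/opt/rhodecode_data/archive_cache',
    'archive_cache.filesystem.cache_size_gb': 10,
    'archive_cache.filesystem.cache_shards': 8,
    'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
}

d_cache = get_archival_cache_store(settings)  # returns a FanoutCache, memoized in cache_meta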
@@ -1,426 +1,448 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2024 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 import codecs
 import contextlib
 import functools
 import os
 import logging
 import time
 import typing
 import zlib
 import sqlite3

 from ...ext_json import json
 from .lock import GenerationLock
 from .utils import format_size

 log = logging.getLogger(__name__)

 cache_meta = None

 UNKNOWN = -241
 NO_VAL = -917

 MODE_BINARY = 'BINARY'


 EVICTION_POLICY = {
     'none': {
         'evict': None,
     },
     'least-recently-stored': {
         'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
     },
     'least-recently-used': {
         'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
     },
     'least-frequently-used': {
         'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
     },
 }


 class DB:

     def __init__(self):
         self.connection = sqlite3.connect(':memory:')
         self._init_db()

     def _init_db(self):
         qry = '''
         CREATE TABLE IF NOT EXISTS archive_cache (
             rowid INTEGER PRIMARY KEY,
             key_file TEXT,
             key_file_path TEXT,
             filename TEXT,
             full_path TEXT,
             store_time REAL,
             access_time REAL,
             access_count INTEGER DEFAULT 0,
             size INTEGER DEFAULT 0
         )
         '''

         self.sql(qry)
         self.connection.commit()

     @property
     def sql(self):
         return self.connection.execute

     def bulk_insert(self, rows):
         qry = '''
         INSERT INTO archive_cache (
             rowid,
             key_file,
             key_file_path,
             filename,
             full_path,
             store_time,
             access_time,
             access_count,
             size
         )
         VALUES (
             ?, ?, ?, ?, ?, ?, ?, ?, ?
         )
         '''
         cursor = self.connection.cursor()
         cursor.executemany(qry, rows)
         self.connection.commit()


 class FileSystemCache:

     def __init__(self, index, directory, **settings):
         self._index = index
         self._directory = directory

+    @property
+    def directory(self):
+        """Cache directory."""
+        return self._directory
+
     def _write_file(self, full_path, iterator, mode, encoding=None):
         full_dir, _ = os.path.split(full_path)

         for count in range(1, 11):
             with contextlib.suppress(OSError):
                 os.makedirs(full_dir)

             try:
                 # Another cache may have deleted the directory before
                 # the file could be opened.
                 writer = open(full_path, mode, encoding=encoding)
             except OSError:
                 if count == 10:
                     # Give up after 10 tries to open the file.
                     raise
                 continue

             with writer:
                 size = 0
                 for chunk in iterator:
                     size += len(chunk)
                     writer.write(chunk)
                 return size

     def _get_keyfile(self, key):
         return os.path.join(self._directory, f'{key}.key')

     def store(self, key, value_reader, metadata):
         filename, full_path = self.random_filename()
         key_file = self._get_keyfile(key)

         # STORE METADATA
         _metadata = {
             "version": "v1",
             "filename": filename,
             "full_path": full_path,
             "key_file": key_file,
             "store_time": time.time(),
             "access_count": 1,
             "access_time": 0,
             "size": 0
         }
         if metadata:
             _metadata.update(metadata)

         reader = functools.partial(value_reader.read, 2**22)

         iterator = iter(reader, b'')
         size = self._write_file(full_path, iterator, 'xb')
         metadata['size'] = size

         # after archive is finished, we create a key to save the presence of the binary file
         with open(key_file, 'wb') as f:
             f.write(json.dumps(_metadata))

         return key, size, MODE_BINARY, filename, _metadata

-    def fetch(self, key) -> tuple[typing.BinaryIO, dict]:
+    def fetch(self, key, retry=False, retry_attempts=10) -> tuple[typing.BinaryIO, dict]:
+
+        if retry:
+            for attempt in range(retry_attempts):
+                if key in self:
+                    break
+                # we didn't find the key, wait 1s, and re-check
+                time.sleep(1)
+
         if key not in self:
+            log.exception('requested %s not found in %s', key, self)
             raise KeyError(key)

         key_file = self._get_keyfile(key)
         with open(key_file, 'rb') as f:
             metadata = json.loads(f.read())

         filename = metadata['filename']

         try:
-            return open(os.path.join(self._directory, filename), 'rb'), metadata
+            return open(os.path.join(self.directory, filename), 'rb'), metadata
         finally:
             # update usage stats, count and accessed
             metadata["access_count"] = metadata.get("access_count", 0) + 1
             metadata["access_time"] = time.time()

             with open(key_file, 'wb') as f:
                 f.write(json.dumps(metadata))

     def random_filename(self):
         """Return filename and full-path tuple for file storage.

         Filename will be a randomly generated 28 character hexadecimal string
         with ".archive_cache" suffixed. Two levels of sub-directories will be used to
         reduce the size of directories. On older filesystems, lookups in
         directories with many files may be slow.
         """

         hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
         sub_dir = os.path.join(hex_name[:2], hex_name[2:4])
         name = hex_name[4:] + '.archive_cache'
         filename = os.path.join(sub_dir, name)
-        full_path = os.path.join(self._directory, filename)
+        full_path = os.path.join(self.directory, filename)
         return filename, full_path

     def hash(self, key):
         """Compute portable hash for `key`.

         :param key: key to hash
         :return: hash value

         """
         mask = 0xFFFFFFFF
         return zlib.adler32(key.encode('utf-8')) & mask  # noqa

     def __contains__(self, key):
         """Return `True` if `key` matching item is found in cache.

         :param key: key matching item
         :return: True if key matching item

         """
         key_file = self._get_keyfile(key)
         return os.path.exists(key_file)

+    def __repr__(self):
+        return f'FileSystemCache(index={self._index}, dir={self.directory})'
+

 class FanoutCache:
     """Cache that shards keys and values."""

     def __init__(
             self, directory=None, **settings
     ):
         """Initialize cache instance.

         :param str directory: cache directory
         :param settings: settings dict

         """
         if directory is None:
             raise ValueError('directory cannot be None')

         directory = str(directory)
         directory = os.path.expanduser(directory)
         directory = os.path.expandvars(directory)
         self._directory = directory

         self._count = settings.pop('cache_shards')
         self._locking_url = settings.pop('locking_url')

         self._eviction_policy = settings['cache_eviction_policy']
         self._cache_size_limit = settings['cache_size_limit']

         self._shards = tuple(
             FileSystemCache(
                 index=num,
                 directory=os.path.join(directory, 'shard_%03d' % num),
                 **settings,
             )
             for num in range(self._count)
         )
         self._hash = self._shards[0].hash

+    @property
+    def directory(self):
+        """Cache directory."""
+        return self._directory
+
     def get_lock(self, lock_key):
         return GenerationLock(lock_key, self._locking_url)

     def _get_shard(self, key) -> FileSystemCache:
         index = self._hash(key) % self._count
         shard = self._shards[index]
         return shard

     def store(self, key, value_reader, metadata=None):
         shard = self._get_shard(key)
         return shard.store(key, value_reader, metadata)

-    def fetch(self, key):
+    def fetch(self, key, retry=False, retry_attempts=10):
         """Return file handle corresponding to `key` from cache.
         """
         shard = self._get_shard(key)
-        return shard.fetch(key)
+        return shard.fetch(key, retry=retry, retry_attempts=retry_attempts)

     def has_key(self, key):
         """Return `True` if `key` matching item is found in cache.

         :param key: key for item
         :return: True if key is found

         """
         shard = self._get_shard(key)
         return key in shard

     def __contains__(self, item):
         return self.has_key(item)

     def evict(self, policy=None, size_limit=None):
         """
         Remove old items based on the conditions


         explanation of this algo:
         iterate over each shard, then for each shard iterate over the .key files
         read the key files metadata stored. This gives us a full list of keys, cached_archived, their size and
         access data, time creation, and access counts.

         Store that into a memory DB so we can run different sorting strategies easily.
         Summing the size is a sum sql query.

         Then we run a sorting strategy based on eviction policy.
         We iterate over sorted keys, and remove each checking if we hit the overall limit.
         """

         policy = policy or self._eviction_policy
         size_limit = size_limit or self._cache_size_limit

         select_policy = EVICTION_POLICY[policy]['evict']

         log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
                   policy, format_size(size_limit))

         if select_policy is None:
             return 0

         db = DB()

         data = []
         cnt = 1
         for shard in self._shards:
-            for key_file in os.listdir(shard._directory):
+            for key_file in os.listdir(shard.directory):
                 if key_file.endswith('.key'):
-                    key_file_path = os.path.join(shard._directory, key_file)
+                    key_file_path = os.path.join(shard.directory, key_file)
                     with open(key_file_path, 'rb') as f:
                         metadata = json.loads(f.read())

                     size = metadata.get('size')
                     filename = metadata.get('filename')
                     full_path = metadata.get('full_path')

                     if not size:
                         # in case we don't have size re-calc it...
                         size = os.stat(full_path).st_size

                     data.append([
                         cnt,
                         key_file,
                         key_file_path,
                         filename,
                         full_path,
                         metadata.get('store_time', 0),
                         metadata.get('access_time', 0),
                         metadata.get('access_count', 0),
                         size,
                     ])
                     cnt += 1

         # Insert bulk data using executemany
         db.bulk_insert(data)

         ((total_size,),) = db.sql('SELECT COALESCE(SUM(size), 0) FROM archive_cache').fetchall()
         log.debug('Analyzed %s keys, occupied: %s', len(data), format_size(total_size))
         select_policy_qry = select_policy.format(fields='key_file_path, full_path, size')
         sorted_keys = db.sql(select_policy_qry).fetchall()

         removed_items = 0
         removed_size = 0
         for key, cached_file, size in sorted_keys:
             # simulate removal impact BEFORE removal
             total_size -= size

             if total_size <= size_limit:
                 # we obtained what we wanted...
                 break

             os.remove(cached_file)
             os.remove(key)
             removed_items += 1
             removed_size += size

         log.debug('Removed %s cache archives, and reduced size: %s', removed_items, format_size(removed_size))
         return removed_items


 def get_archival_config(config):

     final_config = {

     }

     for k, v in config.items():
         if k.startswith('archive_cache'):
             final_config[k] = v

     return final_config


 def get_archival_cache_store(config):

     global cache_meta
     if cache_meta is not None:
         return cache_meta

     config = get_archival_config(config)
     backend = config['archive_cache.backend.type']
     if backend != 'filesystem':
         raise ValueError('archive_cache.backend.type only supports "filesystem"')

     archive_cache_locking_url = config['archive_cache.locking.url']
     archive_cache_dir = config['archive_cache.filesystem.store_dir']
     archive_cache_size_gb = config['archive_cache.filesystem.cache_size_gb']
     archive_cache_shards = config['archive_cache.filesystem.cache_shards']
     archive_cache_eviction_policy = config['archive_cache.filesystem.eviction_policy']

     log.debug('Initializing archival cache instance under %s', archive_cache_dir)

     # check if it's ok to write, and re-create the archive cache
     if not os.path.isdir(archive_cache_dir):
         os.makedirs(archive_cache_dir, exist_ok=True)

     d_cache = FanoutCache(
         archive_cache_dir,
         locking_url=archive_cache_locking_url,
         cache_shards=archive_cache_shards,
         cache_size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
         cache_eviction_policy=archive_cache_eviction_policy
     )
     cache_meta = d_cache
     return cache_meta
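A minimal sketch of the store/fetch cycle the classes above implement, including the new retry behaviour added in this change. `d_cache` is the FanoutCache returned by get_archival_cache_store(); the key, file path, and metadata values are invented for illustration.

archive_key = 'some-repo-abcdef.tar.gz'                      # hypothetical cache key
with open('/tmp/generated-archive.tar.gz', 'rb') as value_reader:  # hypothetical source file
    # the shard is picked by hashing the key; value_reader only needs a .read() method
    d_cache.store(archive_key, value_reader, metadata={'commit_id': 'abcdef'})

# with retry=True the shard re-checks for the key once per second,
# up to retry_attempts times, before raising KeyError
reader, metadata = d_cache.fetch(archive_key, retry=True, retry_attempts=10)
for chunk in archive_iterator(reader):
    pass  # stream chunks, e.g. into an HTTP response

# eviction uses the configured policy and size limit unless overridden
d_cache.evict(policy='least-recently-used', size_limit=5 * 1024 ** 3)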
@@ -1,58 +1,58 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2024 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 import redis
 from ..._vendor import redis_lock
-from .utils import ArchiveCacheLock
+from .utils import ArchiveCacheGenerationLock


 class GenerationLock:
     """
     Locking mechanism that detects if a lock is acquired

     with GenerationLock(lock_key):
         compute_archive()
     """
     lock_timeout = 7200

     def __init__(self, lock_key, url):
         self.lock_key = lock_key
         self._create_client(url)
         self.lock = self.get_lock()

     def _create_client(self, url):
         connection_pool = redis.ConnectionPool.from_url(url)
         self.writer_client = redis.StrictRedis(
             connection_pool=connection_pool
         )
         self.reader_client = self.writer_client

     def get_lock(self):
         return redis_lock.Lock(
             redis_client=self.writer_client,
             name=self.lock_key,
             expire=self.lock_timeout,
             strict=True
         )

     def __enter__(self):
         acquired = self.lock.acquire(blocking=False)
         if not acquired:
-            raise ArchiveCacheLock('Failed to create a lock')
+            raise ArchiveCacheGenerationLock('Failed to create a lock')

     def __exit__(self, exc_type, exc_val, exc_tb):
         self.lock.release()
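Expanding the class docstring into a hedged usage sketch: the lock key and Redis URL are placeholders, and compute_archive() stands for whatever produces the archive, as in the docstring above. Because the lock is acquired non-blocking, a second worker gets the renamed exception immediately instead of waiting.

lock_key = 'archive_generate.some-repo-abcdef'        # hypothetical lock key
try:
    with GenerationLock(lock_key, 'redis://redis:6379/1'):  # placeholder locking url
        compute_archive()  # generate the archive and store it in the cache
except ArchiveCacheGenerationLock:
    # another worker holds the lock; callers can poll the cache instead,
    # e.g. d_cache.fetch(archive_key, retry=True)
    pass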
@@ -1,71 +1,71 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2024 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 import os


-class ArchiveCacheLock(Exception):
+class ArchiveCacheGenerationLock(Exception):
     pass


 def archive_iterator(_reader, block_size: int = 4096 * 512):
     # 4096 * 512 = 2MB
     while 1:
         data = _reader.read(block_size)
         if not data:
             break
         yield data


 def get_directory_statistics(start_path):
     """
     total_files, total_size, directory_stats = get_directory_statistics(start_path)

     print(f"Directory statistics for: {start_path}\n")
     print(f"Total files: {total_files}")
     print(f"Total size: {format_size(total_size)}\n")

     :param start_path:
     :return:
     """

     total_files = 0
     total_size = 0
     directory_stats = {}

     for dir_path, dir_names, file_names in os.walk(start_path):
         dir_size = 0
         file_count = len(file_names)

         for file in file_names:
             filepath = os.path.join(dir_path, file)
             file_size = os.path.getsize(filepath)
             dir_size += file_size

         directory_stats[dir_path] = {'file_count': file_count, 'size': dir_size}
         total_files += file_count
         total_size += dir_size

     return total_files, total_size, directory_stats


 def format_size(size):
     # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
     for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
         if size < 1024:
             return f"{size:.2f} {unit}"
         size /= 1024
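For quick reference, format_size divides by 1024 until the value drops below one unit, so a worked example looks like this (plain arithmetic, no assumptions beyond the function above):

>>> format_size(2_500_000)   # 2,500,000 / 1024 / 1024 ~= 2.38
'2.38 MB'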