archive-cache: synced with CE codebase

Author: super-admin
Revision: r1256:c72dd577 (default branch)
@@ -1,166 +1,167 @@
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import hashlib
import logging
import os

import fsspec

from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool

log = logging.getLogger(__name__)


class FileSystemShard(BaseShard):

    def __init__(self, index, directory, **settings):
        self._index = index
        self._directory = directory
        self.storage_type = 'directory'
        self.fs = fsspec.filesystem('file')

    @property
    def directory(self):
        """Cache directory."""
        return self._directory

    def _get_keyfile(self, archive_key) -> tuple[str, str]:
        key_file = f'{archive_key}.{self.key_suffix}'
        return key_file, os.path.join(self.directory, key_file)

    def _get_writer(self, path, mode):
        for count in range(1, 11):
            try:
                # Another cache may have deleted the directory before
                # the file could be opened.
                return self.fs.open(path, mode)
            except OSError:
                if count == 10:
                    # Give up after 10 tries to open the file.
                    raise
                continue

    def _write_file(self, full_path, iterator, mode):
        # ensure dir exists
        destination, _ = os.path.split(full_path)
        if not self.fs.exists(destination):
            self.fs.makedirs(destination)

        writer = self._get_writer(full_path, mode)

        digest = hashlib.sha256()
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)
            writer.flush()
            # Get the file descriptor
            fd = writer.fileno()

            # Sync the file descriptor to disk, helps with NFS cases...
            os.fsync(fd)
        sha256 = digest.hexdigest()
        log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def store(self, key, value_reader, metadata: dict | None = None):
        return self._store(key, value_reader, metadata, mode='xb')

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
        return self._fetch(key, retry, retry_attempts, retry_backoff)

    def remove(self, key):
        return self._remove(key)

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

        archive_name = hex_name[4:] + '.archive_cache'
        filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"

        full_path = os.path.join(self.directory, filename)
        return archive_name, full_path

    def __repr__(self):
        return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'


class FileSystemFanoutCache(BaseCache):
+    shard_name = 'shard_%03d'

    def __init__(self, locking_url, **settings):
        """
        Initialize file system cache instance.

        :param str locking_url: redis url for a lock
        :param settings: settings dict

        """
        self._locking_url = locking_url
        self._config = settings
        cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
        directory = str(cache_dir)
        directory = os.path.expanduser(directory)
        directory = os.path.expandvars(directory)
        self._directory = directory
        self._storage_path = directory

        # check if it's ok to write, and re-create the archive cache
        if not os.path.isdir(self._directory):
            os.makedirs(self._directory, exist_ok=True)

        self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))

        self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
        self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))

        self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
        self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
        self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))

        log.debug('Initializing archival cache instance under %s', self._directory)
        self._shards = tuple(
            FileSystemShard(
                index=num,
-                directory=os.path.join(directory, 'shard_%03d' % num),
+                directory=os.path.join(directory, self.shard_name % num),
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    def _get_shard(self, key) -> FileSystemShard:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def _get_size(self, shard, archive_path):
        return os.stat(archive_path).st_size
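
For orientation, here is a minimal sketch of where an entry lands on disk with the filesystem backend above. The store_dir and shard-count values are placeholders, and the hash shown stands in for BaseShard.hash, which is not part of this diff; only the layout (the shard_name fan-out plus the two hex sub-directories produced by random_filename) comes from the code.

# Hypothetical fan-out layout sketch; values and the hash function are assumptions.
import codecs
import hashlib
import os

store_dir = '/var/opt/rhodecode_data/archive_cache'  # archive_cache.filesystem.store_dir (example)
cache_shards = 8                                      # archive_cache.filesystem.cache_shards (example)

key = 'some-repo-c72dd577.tar.gz'
# stand-in for self._hash(key) % self._count; the real BaseShard.hash may differ
shard_index = int(hashlib.sha256(key.encode()).hexdigest(), 16) % cache_shards
shard_dir = os.path.join(store_dir, 'shard_%03d' % shard_index)  # FileSystemFanoutCache.shard_name

# random_filename(): 32 hex chars, the first two 2-char chunks become sub-directories
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
archive_path = os.path.join(shard_dir, hex_name[:2], hex_name[2:4], hex_name[4:] + '.archive_cache')
# e.g. /var/opt/rhodecode_data/archive_cache/shard_003/9f/4e/<28 hex chars>.archive_cache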
@@ -1,150 +1,158 @@
# Copyright (C) 2015-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import codecs
import hashlib
import logging
import os

import fsspec

from .base import BaseCache, BaseShard
from ..utils import ShardFileReader, NOT_GIVEN
from ...type_utils import str2bool

log = logging.getLogger(__name__)


class S3Shard(BaseShard):

    def __init__(self, index, bucket, **settings):
        self._index = index
        self._bucket = bucket
        self.storage_type = 'bucket'

        endpoint_url = settings.pop('archive_cache.objectstore.url')
        key = settings.pop('archive_cache.objectstore.key')
        secret = settings.pop('archive_cache.objectstore.secret')

+        # TODO: Add it all over the place...
+        self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
+
        self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)

    @property
    def bucket(self):
        """Cache bucket."""
-        return self._bucket
+        return os.path.join(self._bucket_root, self._bucket)

    def _get_keyfile(self, archive_key) -> tuple[str, str]:
        key_file = f'{archive_key}-{self.key_suffix}'
        return key_file, os.path.join(self.bucket, key_file)

    def _get_writer(self, path, mode):
        return self.fs.open(path, 'wb')

    def _write_file(self, full_path, iterator, mode):
+        if self._bucket_root:
+            if not self.fs.exists(self._bucket_root):
+                self.fs.mkdir(self._bucket_root)
+
        # ensure bucket exists
        destination = self.bucket
        if not self.fs.exists(destination):
            self.fs.mkdir(destination, s3_additional_kwargs={})

        writer = self._get_writer(full_path, mode)

        digest = hashlib.sha256()
        with writer:
            size = 0
            for chunk in iterator:
                size += len(chunk)
                digest.update(chunk)
                writer.write(chunk)

        sha256 = digest.hexdigest()
        log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
        return size, sha256

    def store(self, key, value_reader, metadata: dict | None = None):
        return self._store(key, value_reader, metadata, mode='wb')

    def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
        return self._fetch(key, retry, retry_attempts, retry_backoff)

    def remove(self, key):
        return self._remove(key)

    def random_filename(self):
        """Return filename and full-path tuple for file storage.

        Filename will be a randomly generated 28 character hexadecimal string
        with ".archive_cache" suffixed. Two levels of sub-directories will be used to
        reduce the size of directories. On older filesystems, lookups in
        directories with many files may be slow.
        """

        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

        archive_name = hex_name[4:] + '.archive_cache'
        filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

        full_path = os.path.join(self.bucket, filename)
        return archive_name, full_path

    def __repr__(self):
        return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'


class ObjectStoreCache(BaseCache):
+    shard_name = 'shard-bucket-%03d'

    def __init__(self, locking_url, **settings):
        """
        Initialize objectstore cache instance.

        :param str locking_url: redis url for a lock
        :param settings: settings dict

        """
        self._locking_url = locking_url
        self._config = settings

        objectstore_url = self.get_conf('archive_cache.objectstore.url')
        self._storage_path = objectstore_url

        self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))

        self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
        self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

        self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
        self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
        self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

        log.debug('Initializing archival cache instance under %s', objectstore_url)
        self._shards = tuple(
            S3Shard(
                index=num,
-                bucket='rhodecode-archivecache-%03d' % num,
+                bucket=self.shard_name % num,
                **settings,
            )
            for num in range(self._count)
        )
        self._hash = self._shards[0].hash

    def _get_shard(self, key) -> S3Shard:
        index = self._hash(key) % self._count
        shard = self._shards[index]
        return shard

    def _get_size(self, shard, archive_path):
        return shard.fs.info(archive_path)['size']
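
As a rough usage sketch, the objectstore backend is driven entirely by the archive_cache.objectstore.* options read above. The key names below come from the code; every value is a placeholder, and the construction assumes BaseCache.get_conf (not shown in this diff) reads straight from the settings dict passed to the constructor.

# Hypothetical settings for ObjectStoreCache; only the option names are taken from the code above.
settings = {
    'archive_cache.objectstore.url': 'http://s3-endpoint:9000',
    'archive_cache.objectstore.key': 'example-access-key',
    'archive_cache.objectstore.secret': 'example-secret',
    'archive_cache.objectstore.bucket_root': 'rhodecode-archive-cache',
    'archive_cache.objectstore.bucket_shards': 8,
    'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
    'archive_cache.objectstore.cache_size_gb': 10,
    'archive_cache.objectstore.retry': 'false',
    'archive_cache.objectstore.retry_attempts': 10,
    'archive_cache.objectstore.retry_backoff': 1,
}

# With this bucket_root and the new shard_name = 'shard-bucket-%03d', shard 4 writes its
# objects under rhodecode-archive-cache/shard-bucket-004/ at the configured endpoint.
cache = ObjectStoreCache(locking_url='redis://localhost:6379/0', **settings)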
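
One small behavioural difference the diff preserves between the two shard implementations: the filesystem shard separates the key file name from its suffix with a dot and fans archive files out into hex sub-directories, while the S3 shard uses dashes throughout so object names stay flat inside each shard bucket, which is now rooted under bucket_root. A hypothetical comparison, with archive_key and the key_suffix value assumed since key_suffix lives in BaseShard outside this diff:

# Hypothetical keyfile naming comparison; archive_key and key_suffix are assumptions.
archive_key = 'repo-c72dd577.tar.gz'
key_suffix = 'key'  # defined on BaseShard, not shown in this diff

filesystem_keyfile = f'{archive_key}.{key_suffix}'  # <store_dir>/shard_000/repo-c72dd577.tar.gz.key
s3_keyfile = f'{archive_key}-{key_suffix}'          # <bucket_root>/shard-bucket-000/repo-c72dd577.tar.gz-key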