archive-cache: synced with CE codebase
super-admin
r1256:c72dd577 default
@@ -1,166 +1,167 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23
24 24 import fsspec
25 25
26 26 from .base import BaseCache, BaseShard
27 27 from ..utils import ShardFileReader, NOT_GIVEN
28 28 from ...type_utils import str2bool
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class FileSystemShard(BaseShard):
34 34
35 35 def __init__(self, index, directory, **settings):
36 36 self._index = index
37 37 self._directory = directory
38 38 self.storage_type = 'directory'
39 39 self.fs = fsspec.filesystem('file')
40 40
41 41 @property
42 42 def directory(self):
43 43 """Cache directory."""
44 44 return self._directory
45 45
46 46 def _get_keyfile(self, archive_key) -> tuple[str, str]:
47 47 key_file = f'{archive_key}.{self.key_suffix}'
48 48 return key_file, os.path.join(self.directory, key_file)
49 49
50 50 def _get_writer(self, path, mode):
51 51 for count in range(1, 11):
52 52 try:
53 53 # Another cache may have deleted the directory before
54 54 # the file could be opened.
55 55 return self.fs.open(path, mode)
56 56 except OSError:
57 57 if count == 10:
58 58 # Give up after 10 tries to open the file.
59 59 raise
60 60 continue
61 61
62 62 def _write_file(self, full_path, iterator, mode):
63 63 # ensure dir exists
64 64 destination, _ = os.path.split(full_path)
65 65 if not self.fs.exists(destination):
66 66 self.fs.makedirs(destination)
67 67
68 68 writer = self._get_writer(full_path, mode)
69 69
70 70 digest = hashlib.sha256()
71 71 with writer:
72 72 size = 0
73 73 for chunk in iterator:
74 74 size += len(chunk)
75 75 digest.update(chunk)
76 76 writer.write(chunk)
77 77 writer.flush()
78 78 # Get the file descriptor
79 79 fd = writer.fileno()
80 80
81 81 # Sync the file descriptor to disk, helps with NFS cases...
82 82 os.fsync(fd)
83 83 sha256 = digest.hexdigest()
84 84 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
85 85 return size, sha256
86 86
87 87 def store(self, key, value_reader, metadata: dict | None = None):
88 88 return self._store(key, value_reader, metadata, mode='xb')
89 89
90 90 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
91 91 return self._fetch(key, retry, retry_attempts, retry_backoff)
92 92
93 93 def remove(self, key):
94 94 return self._remove(key)
95 95
96 96 def random_filename(self):
97 97 """Return filename and full-path tuple for file storage.
98 98
99 99 Filename will be a randomly generated 28 character hexadecimal string
100 100 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
101 101 reduce the size of directories. On older filesystems, lookups in
102 102 directories with many files may be slow.
103 103 """
104 104
105 105 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
106 106
107 107 archive_name = hex_name[4:] + '.archive_cache'
108 108 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
109 109
110 110 full_path = os.path.join(self.directory, filename)
111 111 return archive_name, full_path
112 112
113 113 def __repr__(self):
114 114 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
115 115
116 116
117 117 class FileSystemFanoutCache(BaseCache):
118 shard_name = 'shard_%03d'
118 119
119 120 def __init__(self, locking_url, **settings):
120 121 """
121 122 Initialize file system cache instance.
122 123
123 124 :param str locking_url: redis url for a lock
124 125 :param settings: settings dict
125 126
126 127 """
127 128 self._locking_url = locking_url
128 129 self._config = settings
129 130 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
130 131 directory = str(cache_dir)
131 132 directory = os.path.expanduser(directory)
132 133 directory = os.path.expandvars(directory)
133 134 self._directory = directory
134 135 self._storage_path = directory
135 136
136 137 # check if it's ok to write, and re-create the archive cache
137 138 if not os.path.isdir(self._directory):
138 139 os.makedirs(self._directory, exist_ok=True)
139 140
140 141 self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141 142
142 143 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
143 144 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
144 145
145 146 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
146 147 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
147 148 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
148 149
149 150 log.debug('Initializing archival cache instance under %s', self._directory)
150 151 self._shards = tuple(
151 152 FileSystemShard(
152 153 index=num,
153 directory=os.path.join(directory, 'shard_%03d' % num),
154 directory=os.path.join(directory, self.shard_name % num),
154 155 **settings,
155 156 )
156 157 for num in range(self._count)
157 158 )
158 159 self._hash = self._shards[0].hash
159 160
160 161 def _get_shard(self, key) -> FileSystemShard:
161 162 index = self._hash(key) % self._count
162 163 shard = self._shards[index]
163 164 return shard
164 165
165 166 def _get_size(self, shard, archive_path):
166 167 return os.stat(archive_path).st_size
@@ -1,150 +1,158 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23
24 24 import fsspec
25 25
26 26 from .base import BaseCache, BaseShard
27 27 from ..utils import ShardFileReader, NOT_GIVEN
28 28 from ...type_utils import str2bool
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class S3Shard(BaseShard):
34 34
35 35 def __init__(self, index, bucket, **settings):
36 36 self._index = index
37 37 self._bucket = bucket
38 38 self.storage_type = 'bucket'
39 39
40 40 endpoint_url = settings.pop('archive_cache.objectstore.url')
41 41 key = settings.pop('archive_cache.objectstore.key')
42 42 secret = settings.pop('archive_cache.objectstore.secret')
43 43
44 # TODO: Add it all over the place...
45 self._bucket_root = settings.pop('archive_cache.objectstore.bucket_root')
46
44 47 self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
45 48
46 49 @property
47 50 def bucket(self):
48 51 """Cache bucket."""
49 return self._bucket
52 return os.path.join(self._bucket_root, self._bucket)
50 53
51 54 def _get_keyfile(self, archive_key) -> tuple[str, str]:
52 55 key_file = f'{archive_key}-{self.key_suffix}'
53 56 return key_file, os.path.join(self.bucket, key_file)
54 57
55 58 def _get_writer(self, path, mode):
56 59 return self.fs.open(path, 'wb')
57 60
58 61 def _write_file(self, full_path, iterator, mode):
62 if self._bucket_root:
63 if not self.fs.exists(self._bucket_root):
64 self.fs.mkdir(self._bucket_root)
65
59 66 # ensure bucket exists
60 67 destination = self.bucket
61 68 if not self.fs.exists(destination):
62 69 self.fs.mkdir(destination, s3_additional_kwargs={})
63 70
64 71 writer = self._get_writer(full_path, mode)
65 72
66 73 digest = hashlib.sha256()
67 74 with writer:
68 75 size = 0
69 76 for chunk in iterator:
70 77 size += len(chunk)
71 78 digest.update(chunk)
72 79 writer.write(chunk)
73 80
74 81 sha256 = digest.hexdigest()
75 82 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
76 83 return size, sha256
77 84
78 85 def store(self, key, value_reader, metadata: dict | None = None):
79 86 return self._store(key, value_reader, metadata, mode='wb')
80 87
81 88 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
82 89 return self._fetch(key, retry, retry_attempts, retry_backoff)
83 90
84 91 def remove(self, key):
85 92 return self._remove(key)
86 93
87 94 def random_filename(self):
88 95 """Return filename and full-path tuple for file storage.
89 96
90 97 Filename will be a randomly generated 28 character hexadecimal string
91 98 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
92 99 reduce the size of directories. On older filesystems, lookups in
93 100 directories with many files may be slow.
94 101 """
95 102
96 103 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
97 104
98 105 archive_name = hex_name[4:] + '.archive_cache'
99 106 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
100 107
101 108 full_path = os.path.join(self.bucket, filename)
102 109 return archive_name, full_path
103 110
104 111 def __repr__(self):
105 112 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
106 113
107 114
108 115 class ObjectStoreCache(BaseCache):
116 shard_name = 'shard-bucket-%03d'
109 117
110 118 def __init__(self, locking_url, **settings):
111 119 """
112 120 Initialize objectstore cache instance.
113 121
114 122 :param str locking_url: redis url for a lock
115 123 :param settings: settings dict
116 124
117 125 """
118 126 self._locking_url = locking_url
119 127 self._config = settings
120 128
121 129 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 130 self._storage_path = objectstore_url
123 131
124 132 self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125 133
126 134 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
127 135 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
128 136
129 137 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
130 138 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
131 139 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
132 140
133 141 log.debug('Initializing archival cache instance under %s', objectstore_url)
134 142 self._shards = tuple(
135 143 S3Shard(
136 144 index=num,
137 bucket='rhodecode-archivecache-%03d' % num,
145 bucket=self.shard_name % num,
138 146 **settings,
139 147 )
140 148 for num in range(self._count)
141 149 )
142 150 self._hash = self._shards[0].hash
143 151
144 152 def _get_shard(self, key) -> S3Shard:
145 153 index = self._hash(key) % self._count
146 154 shard = self._shards[index]
147 155 return shard
148 156
149 157 def _get_size(self, shard, archive_path):
150 158 return shard.fs.info(archive_path)['size']