fix: fixed s3 region. Fixes: RCCE-98
ilin.s - r5457:1e916a13 default
@@ -1,171 +1,173 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import codecs
 import hashlib
 import logging
 import os
 import typing

 import fsspec

 from .base import BaseCache, BaseShard
 from ..utils import ShardFileReader, NOT_GIVEN
 from ...type_utils import str2bool

 log = logging.getLogger(__name__)


 class S3Shard(BaseShard):

     def __init__(self, index, bucket, bucket_folder, fs, **settings):
         self._index: int = index
         self._bucket_folder: str = bucket_folder
         self.storage_type: str = 'bucket'
         self._bucket_main: str = bucket

         self.fs = fs

     @property
     def bucket(self) -> str:
         """Cache bucket final path."""
         return os.path.join(self._bucket_main, self._bucket_folder)

     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file: str = f'{archive_key}-{self.key_suffix}'
         return key_file, os.path.join(self.bucket, key_file)

     def _get_writer(self, path, mode):
         return self.fs.open(path, 'wb')

     def _write_file(self, full_path, iterator, mode):

         # ensure folder in bucket exists
         destination = self.bucket
         if not self.fs.exists(destination):
             self.fs.mkdir(destination, s3_additional_kwargs={})

         writer = self._get_writer(full_path, mode)

         digest = hashlib.sha256()
         with writer:
             size = 0
             for chunk in iterator:
                 size += len(chunk)
                 digest.update(chunk)
                 writer.write(chunk)

         sha256 = digest.hexdigest()
         log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
         return size, sha256

     def store(self, key, value_reader, metadata: dict | None = None):
         return self._store(key, value_reader, metadata, mode='wb')

     def fetch(self, key, retry=NOT_GIVEN,
               retry_attempts=NOT_GIVEN, retry_backoff=1,
               presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
         return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)

     def remove(self, key):
         return self._remove(key)

     def random_filename(self):
         """Return filename and full-path tuple for file storage.

         Filename will be a randomly generated 28 character hexadecimal string
         with ".archive_cache" suffixed. Two levels of sub-directories will be used to
         reduce the size of directories. On older filesystems, lookups in
         directories with many files may be slow.
         """

         hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

         archive_name = hex_name[4:] + '.archive_cache'
         filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

         full_path = os.path.join(self.bucket, filename)
         return archive_name, full_path

     def __repr__(self):
         return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'


 class ObjectStoreCache(BaseCache):
     shard_name: str = 'shard-{:03d}'
     shard_cls = S3Shard

     def __init__(self, locking_url, **settings):
         """
         Initialize objectstore cache instance.

         :param str locking_url: redis url for a lock
         :param settings: settings dict

         """
         self._locking_url = locking_url
         self._config = settings

         objectstore_url = self.get_conf('archive_cache.objectstore.url')
         self._storage_path = objectstore_url  # common path for all from BaseCache

         self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
         if self._shard_count < 1:
             raise ValueError('cache_shards must be 1 or more')

         self._bucket = settings.pop('archive_cache.objectstore.bucket')
         if not self._bucket:
             raise ValueError('archive_cache.objectstore.bucket needs to have a value')

         self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

         self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
         self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

         endpoint_url = settings.pop('archive_cache.objectstore.url')
         key = settings.pop('archive_cache.objectstore.key')
         secret = settings.pop('archive_cache.objectstore.secret')
         region = settings.pop('archive_cache.objectstore.region')

         log.debug('Initializing %s archival cache instance', self)

-        fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, region=region)
+        fs = fsspec.filesystem(
+            's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region}
+        )

         # init main bucket
         if not fs.exists(self._bucket):
             fs.mkdir(self._bucket)

         self._shards = tuple(
             self.shard_cls(
                 index=num,
                 bucket=self._bucket,
                 bucket_folder=self.shard_name.format(num),
                 fs=fs,
                 **settings,
             )
             for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash

     def _get_size(self, shard, archive_path):
         return shard.fs.info(archive_path)['size']

     def set_presigned_url_expiry(self, val: int) -> None:
         self.presigned_url_expires = val
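
The functional change is the single fsspec.filesystem() call above. s3fs exposes no top-level region argument, so the old region=region keyword appears to have been absorbed as a generic storage option and never reached the S3 client; the region has to be forwarded to the underlying botocore client through client_kwargs. A minimal sketch of the corrected initialization, using a placeholder endpoint, credentials, region and bucket rather than anything from this repository:

import fsspec

# Sketch only: endpoint, credentials, region and bucket are placeholders.
fs = fsspec.filesystem(
    's3',
    anon=False,
    endpoint_url='http://minio.example.local:9000',
    key='example-access-key',
    secret='example-secret-key',
    # the region must travel inside client_kwargs, where it reaches the
    # underlying botocore client as region_name
    client_kwargs={'region_name': 'eu-central-1'},
)
print(fs.ls('example-bucket'))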
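
For completeness, ObjectStoreCache.__init__ consumes a fixed set of configuration keys. The dict below is a hypothetical example: the key names are taken verbatim from the get_conf()/pop() calls in the constructor, while every value and the redis locking URL are illustrative only.

# Hypothetical wiring; only the key names come from the code above.
settings = {
    'archive_cache.objectstore.url': 'http://minio.example.local:9000',
    'archive_cache.objectstore.key': 'example-access-key',
    'archive_cache.objectstore.secret': 'example-secret-key',
    'archive_cache.objectstore.region': 'eu-central-1',
    'archive_cache.objectstore.bucket': 'rhodecode-archive-cache',
    'archive_cache.objectstore.bucket_shards': 8,
    'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
    'archive_cache.objectstore.cache_size_gb': 10,
    'archive_cache.objectstore.retry': 'false',
    'archive_cache.objectstore.retry_attempts': 10,
    'archive_cache.objectstore.retry_backoff': 1,
}
cache = ObjectStoreCache(locking_url='redis://localhost:6379/0', **settings)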
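
The random_filename() docstring describes the naming scheme; a standalone sketch mirroring its string handling (with a hypothetical shard path in place of self.bucket) makes the layout concrete: 16 random bytes hex-encode to 32 characters, the first four become two 2-character prefixes, and the remaining 28 form the archive name.

import codecs
import os

# Mirrors the string handling of S3Shard.random_filename();
# the shard path below is a placeholder.
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
archive_name = hex_name[4:] + '.archive_cache'
filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
full_path = os.path.join('rhodecode-archive-cache/shard-000', filename)
print(archive_name)  # 28 hex chars + '.archive_cache'
print(full_path)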