fix: fixed s3 region. Fixes: RCCE-98
ilin.s
r5457:1e916a13 default
@@ -1,171 +1,173 @@
 # Copyright (C) 2015-2024 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import codecs
 import hashlib
 import logging
 import os
 import typing

 import fsspec

 from .base import BaseCache, BaseShard
 from ..utils import ShardFileReader, NOT_GIVEN
 from ...type_utils import str2bool

 log = logging.getLogger(__name__)


 class S3Shard(BaseShard):

     def __init__(self, index, bucket, bucket_folder, fs, **settings):
         self._index: int = index
         self._bucket_folder: str = bucket_folder
         self.storage_type: str = 'bucket'
         self._bucket_main: str = bucket

         self.fs = fs

     @property
     def bucket(self) -> str:
         """Cache bucket final path."""
         return os.path.join(self._bucket_main, self._bucket_folder)

     def _get_keyfile(self, archive_key) -> tuple[str, str]:
         key_file: str = f'{archive_key}-{self.key_suffix}'
         return key_file, os.path.join(self.bucket, key_file)

     def _get_writer(self, path, mode):
         return self.fs.open(path, 'wb')

     def _write_file(self, full_path, iterator, mode):

         # ensure folder in bucket exists
         destination = self.bucket
         if not self.fs.exists(destination):
             self.fs.mkdir(destination, s3_additional_kwargs={})

         writer = self._get_writer(full_path, mode)

         digest = hashlib.sha256()
         with writer:
             size = 0
             for chunk in iterator:
                 size += len(chunk)
                 digest.update(chunk)
                 writer.write(chunk)

         sha256 = digest.hexdigest()
         log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
         return size, sha256

     def store(self, key, value_reader, metadata: dict | None = None):
         return self._store(key, value_reader, metadata, mode='wb')

     def fetch(self, key, retry=NOT_GIVEN,
               retry_attempts=NOT_GIVEN, retry_backoff=1,
               presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
         return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)

     def remove(self, key):
         return self._remove(key)

     def random_filename(self):
         """Return filename and full-path tuple for file storage.

         Filename will be a randomly generated 28 character hexadecimal string
         with ".archive_cache" suffixed. Two levels of sub-directories will be used to
         reduce the size of directories. On older filesystems, lookups in
         directories with many files may be slow.
         """

         hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')

         archive_name = hex_name[4:] + '.archive_cache'
         filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

         full_path = os.path.join(self.bucket, filename)
         return archive_name, full_path

     def __repr__(self):
         return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'


 class ObjectStoreCache(BaseCache):
     shard_name: str = 'shard-{:03d}'
     shard_cls = S3Shard

     def __init__(self, locking_url, **settings):
         """
         Initialize objectstore cache instance.

         :param str locking_url: redis url for a lock
         :param settings: settings dict

         """
         self._locking_url = locking_url
         self._config = settings

         objectstore_url = self.get_conf('archive_cache.objectstore.url')
         self._storage_path = objectstore_url  # common path for all from BaseCache

         self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
         if self._shard_count < 1:
             raise ValueError('cache_shards must be 1 or more')

         self._bucket = settings.pop('archive_cache.objectstore.bucket')
         if not self._bucket:
             raise ValueError('archive_cache.objectstore.bucket needs to have a value')

         self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
         self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))

         self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
         self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
         self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))

         endpoint_url = settings.pop('archive_cache.objectstore.url')
         key = settings.pop('archive_cache.objectstore.key')
         secret = settings.pop('archive_cache.objectstore.secret')
         region = settings.pop('archive_cache.objectstore.region')

         log.debug('Initializing %s archival cache instance', self)

-        fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, region=region)
+        fs = fsspec.filesystem(
+            's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region}
+        )

         # init main bucket
         if not fs.exists(self._bucket):
             fs.mkdir(self._bucket)

         self._shards = tuple(
             self.shard_cls(
                 index=num,
                 bucket=self._bucket,
                 bucket_folder=self.shard_name.format(num),
                 fs=fs,
                 **settings,
             )
             for num in range(self._shard_count)
         )
         self._hash = self._shards[0].hash

     def _get_size(self, shard, archive_path):
         return shard.fs.info(archive_path)['size']

     def set_presigned_url_expiry(self, val: int) -> None:
         self.presigned_url_expires = val
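
For context on how the cache lays out objects once the filesystem is created: ObjectStoreCache builds one folder per shard from shard_name = 'shard-{:03d}', and S3Shard.random_filename() spreads archives across two hex prefixes inside that folder, as described in its docstring. A small illustration of the resulting paths; the bucket name and shard count below are made up for the example:

import codecs
import os

bucket = 'rc-archive-cache'   # hypothetical archive_cache.objectstore.bucket value
shard_count = 5               # hypothetical archive_cache.objectstore.bucket_shards value

# One folder per shard inside the main bucket, as built in ObjectStoreCache.__init__.
shard_folders = ['shard-{:03d}'.format(num) for num in range(shard_count)]
# -> ['shard-000', 'shard-001', 'shard-002', 'shard-003', 'shard-004']

# random_filename(): 32 random hex chars; the first four become two short
# prefixes, the remaining 28 form the archive name.
hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
archive_name = hex_name[4:] + '.archive_cache'
filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

# Final object path inside shard 0,
# e.g. 'rc-archive-cache/shard-000/ab-cd-<28 hex chars>.archive_cache'
full_path = os.path.join(bucket, shard_folders[0], filename)
print(full_path)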