feat(s3 region): add configurable S3 region.
ilin.s
r1264:3850c020 default
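The commit threads a new archive_cache.objectstore.region option through to the S3 filesystem. As a minimal sketch, the settings the updated ObjectStoreCache would consume might look like the dict below; the key names are taken from the diff, every value is a placeholder assumption:

# Hypothetical settings for ObjectStoreCache; key names mirror the
# archive_cache.objectstore.* options read in __init__ in the diff,
# all values are made up for illustration.
settings = {
    'archive_cache.objectstore.url': 'http://s3-gateway:9000',
    'archive_cache.objectstore.key': 's3admin',
    'archive_cache.objectstore.secret': 's3secret',
    'archive_cache.objectstore.region': 'eu-central-1',  # the new option
    'archive_cache.objectstore.bucket': 'rhodecode-archive-cache',
    'archive_cache.objectstore.bucket_shards': 8,
    'archive_cache.objectstore.eviction_policy': 'least-recently-stored',
    'archive_cache.objectstore.cache_size_gb': 10,
    'archive_cache.objectstore.retry': 'false',  # parsed via str2bool
    'archive_cache.objectstore.retry_attempts': 0,
    'archive_cache.objectstore.retry_backoff': 1,
}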
@@ -1,170 +1,173 @@
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23 import typing
24 24
25 25 import fsspec
26 26
27 27 from .base import BaseCache, BaseShard
28 28 from ..utils import ShardFileReader, NOT_GIVEN
29 29 from ...type_utils import str2bool
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class S3Shard(BaseShard):
35 35
36 36 def __init__(self, index, bucket, bucket_folder, fs, **settings):
37 37 self._index: int = index
38 38 self._bucket_folder: str = bucket_folder
39 39 self.storage_type: str = 'bucket'
40 40 self._bucket_main: str = bucket
41 41
42 42 self.fs = fs
43 43
44 44 @property
45 45 def bucket(self) -> str:
46 46 """Cache bucket final path."""
47 47 return os.path.join(self._bucket_main, self._bucket_folder)
48 48
49 49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
50 50 key_file: str = f'{archive_key}-{self.key_suffix}'
51 51 return key_file, os.path.join(self.bucket, key_file)
52 52
53 53 def _get_writer(self, path, mode):
54 54 return self.fs.open(path, 'wb')
55 55
56 56 def _write_file(self, full_path, iterator, mode):
57 57
58 58 # ensure folder in bucket exists
59 59 destination = self.bucket
60 60 if not self.fs.exists(destination):
61 61 self.fs.mkdir(destination, s3_additional_kwargs={})
62 62
63 63 writer = self._get_writer(full_path, mode)
64 64
65 65 digest = hashlib.sha256()
66 66 with writer:
67 67 size = 0
68 68 for chunk in iterator:
69 69 size += len(chunk)
70 70 digest.update(chunk)
71 71 writer.write(chunk)
72 72
73 73 sha256 = digest.hexdigest()
74 74 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
75 75 return size, sha256
76 76
77 77 def store(self, key, value_reader, metadata: dict | None = None):
78 78 return self._store(key, value_reader, metadata, mode='wb')
79 79
80 80 def fetch(self, key, retry=NOT_GIVEN,
81 81 retry_attempts=NOT_GIVEN, retry_backoff=1,
82 82 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
83 83 return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
84 84
85 85 def remove(self, key):
86 86 return self._remove(key)
87 87
88 88 def random_filename(self):
89 89 """Return filename and full-path tuple for file storage.
90 90
91 91 Filename will be a randomly generated 28 character hexadecimal string
92 92 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
93 93 reduce the size of directories. On older filesystems, lookups in
94 94 directories with many files may be slow.
95 95 """
96 96
97 97 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
98 98
99 99 archive_name = hex_name[4:] + '.archive_cache'
100 100 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
101 101
102 102 full_path = os.path.join(self.bucket, filename)
103 103 return archive_name, full_path
104 104
105 105 def __repr__(self):
106 106 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
107 107
108 108
109 109 class ObjectStoreCache(BaseCache):
110 110 shard_name: str = 'shard-{:03d}'
111 111 shard_cls = S3Shard
112 112
113 113 def __init__(self, locking_url, **settings):
114 114 """
115 115 Initialize objectstore cache instance.
116 116
117 117 :param str locking_url: redis url for a lock
118 118 :param settings: settings dict
119 119
120 120 """
121 121 self._locking_url = locking_url
122 122 self._config = settings
123 123
124 124 objectstore_url = self.get_conf('archive_cache.objectstore.url')
125 125 self._storage_path = objectstore_url # common path for all from BaseCache
126 126
127 127 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
128 128 if self._shard_count < 1:
129 129 raise ValueError('cache_shards must be 1 or more')
130 130
131 131 self._bucket = settings.pop('archive_cache.objectstore.bucket')
132 132 if not self._bucket:
133 133 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
134 134
135 135 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
136 136 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
137 137
138 138 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
139 139 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
140 140 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
141 141
142 142 endpoint_url = settings.pop('archive_cache.objectstore.url')
143 143 key = settings.pop('archive_cache.objectstore.key')
144 144 secret = settings.pop('archive_cache.objectstore.secret')
145 region = settings.pop('archive_cache.objectstore.region')
145 146
146 147 log.debug('Initializing %s archival cache instance', self)
147 148
148 fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
149 fs = fsspec.filesystem(
150 's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region}
151 )
149 152
150 153 # init main bucket
151 154 if not fs.exists(self._bucket):
152 155 fs.mkdir(self._bucket)
153 156
154 157 self._shards = tuple(
155 158 self.shard_cls(
156 159 index=num,
157 160 bucket=self._bucket,
158 161 bucket_folder=self.shard_name.format(num),
159 162 fs=fs,
160 163 **settings,
161 164 )
162 165 for num in range(self._shard_count)
163 166 )
164 167 self._hash = self._shards[0].hash
165 168
166 169 def _get_size(self, shard, archive_path):
167 170 return shard.fs.info(archive_path)['size']
168 171
169 172 def set_presigned_url_expiry(self, val: int) -> None:
170 173 self.presigned_url_expires = val
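The core of the change is forwarding the region to fsspec. Below is a standalone sketch of the new filesystem construction, with a placeholder endpoint and credentials; s3fs passes client_kwargs through to the underlying botocore client, which is how region_name takes effect:

import fsspec

# Placeholder connection details; in RhodeCode these come from the
# archive_cache.objectstore.* settings popped in __init__ above.
endpoint_url = 'http://s3-gateway:9000'
key = 's3admin'
secret = 's3secret'
region = 'eu-central-1'

# client_kwargs is forwarded by s3fs to the botocore S3 client,
# so region_name selects the region used for all bucket operations.
fs = fsspec.filesystem(
    's3', anon=False, endpoint_url=endpoint_url,
    key=key, secret=secret,
    client_kwargs={'region_name': region},
)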
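For context on the resulting object layout, random_filename spreads cache entries across two hex prefixes inside each shard folder rather than real sub-directories. A small sketch, assuming a hypothetical bucket and shard name:

import codecs
import os

# bucket + shard folder as built by S3Shard.bucket; names are assumptions
bucket = 'rhodecode-archive-cache/shard-000'

hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')  # 32 hex chars
archive_name = hex_name[4:] + '.archive_cache'                   # 28 hex chars + suffix
filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"

full_path = os.path.join(bucket, filename)
# e.g. rhodecode-archive-cache/shard-000/3f-a1-<28 hex chars>.archive_cache
print(full_path)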