feat(archive-cache): synced with CE codebase
super-admin - r1251:f8e66197 default
@@ -0,0 +1,79 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import logging
20
21 from .backends.fanout_cache import FileSystemFanoutCache
22 from .backends.objectstore_cache import ObjectStoreCache
23
24 from .utils import archive_iterator # noqa
25 from .lock import ArchiveCacheGenerationLock # noqa
26
27 log = logging.getLogger(__name__)
28
29
30 cache_meta = None
31
32
33 def includeme(config):
34 return # vcsserver gets its config from rhodecode on a remote call
35 # init our cache at start
36 settings = config.get_settings()
37 get_archival_cache_store(settings)
38
39
40 def get_archival_config(config):
41
42 final_config = {}
45
46 for k, v in config.items():
47 if k.startswith('archive_cache'):
48 final_config[k] = v
49
50 return final_config
51
52
53 def get_archival_cache_store(config, always_init=False):
54
55 global cache_meta
56 if cache_meta is not None and not always_init:
57 return cache_meta
58
59 config = get_archival_config(config)
60 backend = config['archive_cache.backend.type']
61
62 archive_cache_locking_url = config['archive_cache.locking.url']
63
64 match backend:
65 case 'filesystem':
66 d_cache = FileSystemFanoutCache(
67 locking_url=archive_cache_locking_url,
68 **config
69 )
70 case 'objectstore':
71 d_cache = ObjectStoreCache(
72 locking_url=archive_cache_locking_url,
73 **config
74 )
75 case _:
76 raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore", got: {backend}')
77
78 cache_meta = d_cache
79 return cache_meta
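
For reference, the settings consumed here flow into the backend constructors defined below. A minimal sketch of a settings dict that would initialize the filesystem backend, based on the get_conf() calls in FileSystemFanoutCache; all concrete values are illustrative assumptions, not shipped defaults:

    settings = {
        'archive_cache.backend.type': 'filesystem',
        'archive_cache.locking.url': 'redis://localhost:6379/0',  # assumed redis url
        'archive_cache.filesystem.store_dir': '/tmp/archive_cache',
        'archive_cache.filesystem.cache_shards': 8,
        'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
        'archive_cache.filesystem.cache_size_gb': 10,
        'archive_cache.filesystem.retry': 'false',
        'archive_cache.filesystem.retry_attempts': 0,
        'archive_cache.filesystem.retry_backoff': 1,
    }
    d_cache = get_archival_cache_store(settings)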
@@ -0,0 +1,17 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
@@ -0,0 +1,348 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import os
20 import functools
21 import logging
22 import typing
23 import time
24 import zlib
25
26 from ...ext_json import json
27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 from ..lock import GenerationLock
29
30 log = logging.getLogger(__name__)
31
32
33 class BaseShard:
34 storage_type: str = ''
35 fs = None
36
37 @classmethod
38 def hash(cls, key):
39 """Compute portable hash for `key`.
40
41 :param key: key to hash
42 :return: hash value
43
44 """
45 mask = 0xFFFFFFFF
46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47
48 def _write_file(self, full_path, read_iterator, mode):
49 raise NotImplementedError
50
51 def _get_keyfile(self, key):
52 raise NotImplementedError
53
54 def random_filename(self):
55 raise NotImplementedError
56
57 def _store(self, key, value_reader, metadata, mode):
58 (filename, # hash-name
59 full_path # full-path/hash-name
60 ) = self.random_filename()
61
62 key_file, key_file_path = self._get_keyfile(key)
63
64 # STORE METADATA
65 _metadata = {
66 "version": "v1",
67
68 "key_file": key_file, # this is the .key.json file storing meta
69 "key_file_path": key_file_path, # full path to key_file
70 "archive_key": key, # original name we stored archive under, e.g my-archive.zip
71 "archive_filename": filename, # the actual filename we stored that file under
72 "archive_full_path": full_path,
73
74 "store_time": time.time(),
75 "access_count": 0,
76 "access_time": 0,
77
78 "size": 0
79 }
80 if metadata:
81 _metadata.update(metadata)
82
83 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')  # read in 4MB chunks
84 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 _metadata['size'] = size
86 _metadata['sha256'] = sha256
87
88 # after the archive is finished, we write a key file recording the presence of the binary file
89 with self.fs.open(key_file_path, 'wb') as f:
90 f.write(json.dumps(_metadata))
91
92 return key, filename, size, _metadata
93
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
95 if retry is NOT_GIVEN:
96 retry = False
97 if retry_attempts is NOT_GIVEN:
98 retry_attempts = 0
99
100 if retry and retry_attempts > 0:
101 for attempt in range(1, retry_attempts + 1):
102 if key in self:
103 break
104 # key not found yet: wait retry_backoff seconds, then re-check
105 time.sleep(retry_backoff)
106
107 if key not in self:
108 log.error(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 raise KeyError(key)
110
111 key_file, key_file_path = self._get_keyfile(key)
112 with self.fs.open(key_file_path, 'rb') as f:
113 metadata = json.loads(f.read())
114
115 archive_path = metadata['archive_full_path']
116
117 try:
118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 finally:
120 # update usage stats, count and accessed
121 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 metadata["access_time"] = time.time()
123 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 key_file, metadata['access_count'], metadata['access_time'])
125 with self.fs.open(key_file_path, 'wb') as f:
126 f.write(json.dumps(metadata))
127
128 def _remove(self, key):
129 if key not in self:
130 log.error(f'requested key={key} not found in {self}')
131 raise KeyError(key)
132
133 key_file, key_file_path = self._get_keyfile(key)
134 with self.fs.open(key_file_path, 'rb') as f:
135 metadata = json.loads(f.read())
136
137 archive_path = metadata['archive_full_path']
138 self.fs.rm(archive_path)
139 self.fs.rm(key_file_path)
140 return 1
141
142 @property
143 def storage_medium(self):
144 return getattr(self, self.storage_type)
145
146 @property
147 def key_suffix(self):
148 return 'key.json'
149
150 def __contains__(self, key):
151 """Return `True` if `key` matching item is found in cache.
152
153 :param key: key matching item
154 :return: True if key matching item
155
156 """
157 key_file, key_file_path = self._get_keyfile(key)
158 return self.fs.exists(key_file_path)
159
160
161 class BaseCache:
162 _locking_url: str = ''
163 _storage_path: str = ''
164 _config = {}
165 retry = False
166 retry_attempts = 0
167 retry_backoff = 1
168 _shards = tuple()
169
170 def __contains__(self, key):
171 """Return `True` if `key` matching item is found in cache.
172
173 :param key: key matching item
174 :return: True if key matching item
175
176 """
177 return self.has_key(key)
178
179 def __repr__(self):
180 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
181
182 @classmethod
183 def gb_to_bytes(cls, gb):
184 return gb * (1024 ** 3)
185
186 @property
187 def storage_path(self):
188 return self._storage_path
189
190 @classmethod
191 def get_stats_db(cls):
192 return StatsDB()
193
194 def get_conf(self, key, pop=False):
195 if key not in self._config:
196 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
197 val = self._config[key]
198 if pop:
199 del self._config[key]
200 return val
201
202 def _get_shard(self, key):
203 raise NotImplementedError
204
205 def _get_size(self, shard, archive_path):
206 raise NotImplementedError
207
208 def store(self, key, value_reader, metadata=None):
209 shard = self._get_shard(key)
210 return shard.store(key, value_reader, metadata)
211
212 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
213 """
214 Return file handle corresponding to `key` from specific shard cache.
215 """
216 if retry is NOT_GIVEN:
217 retry = self.retry
218 if retry_attempts is NOT_GIVEN:
219 retry_attempts = self.retry_attempts
220 retry_backoff = self.retry_backoff
221
222 shard = self._get_shard(key)
223 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
224
225 def remove(self, key):
226 shard = self._get_shard(key)
227 return shard.remove(key)
228
229 def has_key(self, archive_key):
230 """Return `True` if `key` matching item is found in cache.
231
232 :param archive_key: key for the item; a unique archive name we want to store data under, e.g. my-archive-svn.zip
233 :return: True if key is found
234
235 """
236 shard = self._get_shard(archive_key)
237 return archive_key in shard
238
239 def iter_keys(self):
240 for shard in self._shards:
241 if shard.fs.exists(shard.storage_medium):
242 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
243 for key_file_path in _files:
244 if key_file_path.endswith(shard.key_suffix):
245 yield shard, key_file_path
246
247 def get_lock(self, lock_key):
248 return GenerationLock(lock_key, self._locking_url)
249
250 def evict(self, policy=None, size_limit=None) -> int:
251 """
252 Remove old items based on the given conditions.
253
254
255 How this algorithm works:
256 iterate over each shard, and for each shard iterate over the .key files,
257 reading the metadata stored in them. This gives us a full list of keys and cached archives, with their
258 access data, creation times, access counts, and sizes.
259
260 Store that in an in-memory DB so we can easily run different sorting strategies.
261 Summing the sizes is a single SUM sql query.
262
263 Then we run a sorting strategy based on the eviction policy,
264 iterate over the sorted keys, and remove entries until we get under the overall size limit.
265 """
266
267 policy = policy or self._eviction_policy
268 size_limit = size_limit or self._cache_size_limit
269
270 select_policy = EVICTION_POLICY[policy]['evict']
271
272 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
273 policy, format_size(size_limit))
274
275 if select_policy is None:
276 return 0
277
278 db = self.get_stats_db()
279
280 data = []
281 cnt = 1
282
283 for shard, key_file in self.iter_keys():
284 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
285 metadata = json.loads(f.read())
286
287 key_file_path = os.path.join(shard.storage_medium, key_file)
288
289 archive_key = metadata['archive_key']
290 archive_path = metadata['archive_full_path']
291
292 size = metadata.get('size')
293 if not size:
294 # in case we don't have the size stored, re-calculate it
295 size = self._get_size(shard, archive_path)
296
297 data.append([
298 cnt,
299 key_file,
300 key_file_path,
301 archive_key,
302 archive_path,
303 metadata.get('store_time', 0),
304 metadata.get('access_time', 0),
305 metadata.get('access_count', 0),
306 size,
307 ])
308 cnt += 1
309
310 # Insert bulk data using executemany
311 db.bulk_insert(data)
312
313 total_size = db.get_total_size()
314 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
315 len(data), format_size(total_size), format_size(size_limit))
316
317 removed_items = 0
318 removed_size = 0
319 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
320 # simulate removal impact BEFORE removal
321 total_size -= size
322
323 if total_size <= size_limit:
324 # we obtained what we wanted...
325 break
326
327 self.remove(archive_key)
328 removed_items += 1
329 removed_size += size
330
331 log.debug('Removed %s cache archives, and reduced size by: %s',
332 removed_items, format_size(removed_size))
333 return removed_items
334
335 def get_statistics(self):
336 total_files = 0
337 total_size = 0
338 meta = {}
339
340 for shard, key_file in self.iter_keys():
341 json_key = f"{shard.storage_medium}/{key_file}"
342 with shard.fs.open(json_key, 'rb') as f:
343 total_files += 1
344 metadata = json.loads(f.read())
345 total_size += metadata['size']
346
347 return total_files, total_size, meta
348
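
A hedged usage sketch of the BaseCache API defined above, assuming d_cache is an already-initialized FileSystemFanoutCache or ObjectStoreCache instance; the key and payload are illustrative:

    import io

    archive_key = 'my-archive.zip'
    if archive_key not in d_cache:  # __contains__ -> has_key -> shard lookup
        d_cache.store(archive_key, io.BytesIO(b'archive bytes'), metadata={'commit_id': 'abc123'})

    reader, metadata = d_cache.fetch(archive_key)  # also bumps access_count/access_time
    print(metadata['size'], metadata['sha256'])
    data = b''.join(archive_iterator(reader))      # stream the archive out in blocks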
@@ -0,0 +1,166 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import codecs
20 import hashlib
21 import logging
22 import os
23
24 import fsspec
25
26 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29
30 log = logging.getLogger(__name__)
31
32
33 class FileSystemShard(BaseShard):
34
35 def __init__(self, index, directory, **settings):
36 self._index = index
37 self._directory = directory
38 self.storage_type = 'directory'
39 self.fs = fsspec.filesystem('file')
40
41 @property
42 def directory(self):
43 """Cache directory."""
44 return self._directory
45
46 def _get_keyfile(self, archive_key) -> tuple[str, str]:
47 key_file = f'{archive_key}.{self.key_suffix}'
48 return key_file, os.path.join(self.directory, key_file)
49
50 def _get_writer(self, path, mode):
51 for count in range(1, 11):
52 try:
53 # Another cache may have deleted the directory before
54 # the file could be opened.
55 return self.fs.open(path, mode)
56 except OSError:
57 if count == 10:
58 # Give up after 10 tries to open the file.
59 raise
60 continue
61
62 def _write_file(self, full_path, iterator, mode):
63 # ensure dir exists
64 destination, _ = os.path.split(full_path)
65 if not self.fs.exists(destination):
66 self.fs.makedirs(destination)
67
68 writer = self._get_writer(full_path, mode)
69
70 digest = hashlib.sha256()
71 with writer:
72 size = 0
73 for chunk in iterator:
74 size += len(chunk)
75 digest.update(chunk)
76 writer.write(chunk)
77 writer.flush()
78 # Get the file descriptor
79 fd = writer.fileno()
80
81 # Sync the file descriptor to disk, helps with NFS cases...
82 os.fsync(fd)
83 sha256 = digest.hexdigest()
84 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
85 return size, sha256
86
87 def store(self, key, value_reader, metadata: dict | None = None):
88 return self._store(key, value_reader, metadata, mode='xb')
89
90 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
91 return self._fetch(key, retry, retry_attempts, retry_backoff)
92
93 def remove(self, key):
94 return self._remove(key)
95
96 def random_filename(self):
97 """Return filename and full-path tuple for file storage.
98
99 Filename will be a randomly generated 28 character hexadecimal string
100 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
101 reduce the size of directories. On older filesystems, lookups in
102 directories with many files may be slow.
103 """
104
105 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
106
107 archive_name = hex_name[4:] + '.archive_cache'
108 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
109
110 full_path = os.path.join(self.directory, filename)
111 return archive_name, full_path
112
113 def __repr__(self):
114 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
115
116
117 class FileSystemFanoutCache(BaseCache):
118
119 def __init__(self, locking_url, **settings):
120 """
121 Initialize file system cache instance.
122
123 :param str locking_url: redis url for a lock
124 :param settings: settings dict
125
126 """
127 self._locking_url = locking_url
128 self._config = settings
129 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
130 directory = str(cache_dir)
131 directory = os.path.expanduser(directory)
132 directory = os.path.expandvars(directory)
133 self._directory = directory
134 self._storage_path = directory
135
136 # check if it's ok to write, and re-create the archive cache
137 if not os.path.isdir(self._directory):
138 os.makedirs(self._directory, exist_ok=True)
139
140 self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141
142 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
143 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
144
145 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
146 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
147 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
148
149 log.debug('Initializing archival cache instance under %s', self._directory)
150 self._shards = tuple(
151 FileSystemShard(
152 index=num,
153 directory=os.path.join(directory, 'shard_%03d' % num),
154 **settings,
155 )
156 for num in range(self._count)
157 )
158 self._hash = self._shards[0].hash
159
160 def _get_shard(self, key) -> FileSystemShard:
161 index = self._hash(key) % self._count
162 shard = self._shards[index]
163 return shard
164
165 def _get_size(self, shard, archive_path):
166 return os.stat(archive_path).st_size
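
An illustrative sketch of the two-level fanout produced by random_filename() above: the 32-character hex name is split into two 2-character directory levels, and the remaining 28 characters (plus suffix) become the file name. The sample hex string is an assumption standing in for os.urandom(16):

    hex_name = 'deadbeefcafebabe0123456789abcdef'
    filename = f"{hex_name[:2]}/{hex_name[2:4]}/{hex_name[4:]}.archive_cache"
    # -> 'de/ad/beefcafebabe0123456789abcdef.archive_cache'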
@@ -0,0 +1,150 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import codecs
20 import hashlib
21 import logging
22 import os
23
24 import fsspec
25
26 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29
30 log = logging.getLogger(__name__)
31
32
33 class S3Shard(BaseShard):
34
35 def __init__(self, index, bucket, **settings):
36 self._index = index
37 self._bucket = bucket
38 self.storage_type = 'bucket'
39
40 endpoint_url = settings.pop('archive_cache.objectstore.url')
41 key = settings.pop('archive_cache.objectstore.key')
42 secret = settings.pop('archive_cache.objectstore.secret')
43
44 self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
45
46 @property
47 def bucket(self):
48 """Cache bucket."""
49 return self._bucket
50
51 def _get_keyfile(self, archive_key) -> tuple[str, str]:
52 key_file = f'{archive_key}-{self.key_suffix}'
53 return key_file, os.path.join(self.bucket, key_file)
54
55 def _get_writer(self, path, mode):
56 return self.fs.open(path, 'wb')
57
58 def _write_file(self, full_path, iterator, mode):
59 # ensure bucket exists
60 destination = self.bucket
61 if not self.fs.exists(destination):
62 self.fs.mkdir(destination, s3_additional_kwargs={})
63
64 writer = self._get_writer(full_path, mode)
65
66 digest = hashlib.sha256()
67 with writer:
68 size = 0
69 for chunk in iterator:
70 size += len(chunk)
71 digest.update(chunk)
72 writer.write(chunk)
73
74 sha256 = digest.hexdigest()
75 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
76 return size, sha256
77
78 def store(self, key, value_reader, metadata: dict | None = None):
79 return self._store(key, value_reader, metadata, mode='wb')
80
81 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
82 return self._fetch(key, retry, retry_attempts, retry_backoff)
83
84 def remove(self, key):
85 return self._remove(key)
86
87 def random_filename(self):
88 """Return filename and full-path tuple for file storage.
89
90 Filename will be a randomly generated 28 character hexadecimal string
91 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
92 reduce the size of directories. On older filesystems, lookups in
93 directories with many files may be slow.
94 """
95
96 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
97
98 archive_name = hex_name[4:] + '.archive_cache'
99 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
100
101 full_path = os.path.join(self.bucket, filename)
102 return archive_name, full_path
103
104 def __repr__(self):
105 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
106
107
108 class ObjectStoreCache(BaseCache):
109
110 def __init__(self, locking_url, **settings):
111 """
112 Initialize objectstore cache instance.
113
114 :param str locking_url: redis url for a lock
115 :param settings: settings dict
116
117 """
118 self._locking_url = locking_url
119 self._config = settings
120
121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 self._storage_path = objectstore_url
123
124 self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125
126 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
127 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
128
129 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
130 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
131 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
132
133 log.debug('Initializing archival cache instance under %s', objectstore_url)
134 self._shards = tuple(
135 S3Shard(
136 index=num,
137 bucket='rhodecode-archivecache-%03d' % num,
138 **settings,
139 )
140 for num in range(self._count)
141 )
142 self._hash = self._shards[0].hash
143
144 def _get_shard(self, key) -> S3Shard:
145 index = self._hash(key) % self._count
146 shard = self._shards[index]
147 return shard
148
149 def _get_size(self, shard, archive_path):
150 return shard.fs.info(archive_path)['size']
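
A sketch of how an archive key is mapped to a shard bucket, mirroring BaseShard.hash() and ObjectStoreCache._get_shard() above; bucket_shards=8 and the key are illustrative assumptions:

    import zlib

    def shard_bucket(archive_key: str, bucket_shards: int = 8) -> str:
        # portable 32-bit hash, same computation as BaseShard.hash()
        index = (zlib.adler32(archive_key.encode('utf-8')) & 0xFFFFFFFF) % bucket_shards
        return 'rhodecode-archivecache-%03d' % index

    print(shard_bucket('my-archive.zip'))  # deterministic, e.g. 'rhodecode-archivecache-005'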
@@ -0,0 +1,62 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import redis
20 from .._vendor import redis_lock
21
22
23 class ArchiveCacheGenerationLock(Exception):
24 pass
25
26
27 class GenerationLock:
28 """
29 Locking mechanism that fails fast if the lock is already held elsewhere:
30
31 with GenerationLock(lock_key, url):
32 compute_archive()
33 """
34 lock_timeout = 7200
35
36 def __init__(self, lock_key, url):
37 self.lock_key = lock_key
38 self._create_client(url)
39 self.lock = self.get_lock()
40
41 def _create_client(self, url):
42 connection_pool = redis.ConnectionPool.from_url(url)
43 self.writer_client = redis.StrictRedis(
44 connection_pool=connection_pool
45 )
46 self.reader_client = self.writer_client
47
48 def get_lock(self):
49 return redis_lock.Lock(
50 redis_client=self.writer_client,
51 name=self.lock_key,
52 expire=self.lock_timeout,
53 strict=True
54 )
55
56 def __enter__(self):
57 acquired = self.lock.acquire(blocking=False)
58 if not acquired:
59 raise ArchiveCacheGenerationLock('Failed to create a lock')
60
61 def __exit__(self, exc_type, exc_val, exc_tb):
62 self.lock.release()
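
A hedged usage sketch of GenerationLock, assuming a reachable redis at the given url; the acquire is non-blocking, so a second holder raises ArchiveCacheGenerationLock instead of waiting:

    lock = GenerationLock(lock_key='archive-gen:my-archive.zip', url='redis://localhost:6379/0')
    try:
        with lock:
            compute_archive()  # hypothetical archive-generation function
    except ArchiveCacheGenerationLock:
        # someone else is already generating this archive; back off or retry
        pass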
@@ -0,0 +1,134 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import sqlite3
20 import s3fs.core
21
22 NOT_GIVEN = -917
23
24
25 EVICTION_POLICY = {
26 'none': {
27 'evict': None,
28 },
29 'least-recently-stored': {
30 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
31 },
32 'least-recently-used': {
33 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
34 },
35 'least-frequently-used': {
36 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
37 },
38 }
39
40
41 def archive_iterator(_reader, block_size: int = 4096 * 512):
42 # 4096 * 512 = 2MB
43 while 1:
44 data = _reader.read(block_size)
45 if not data:
46 break
47 yield data
48
49
50 def format_size(size):
51 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
52 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
53 if size < 1024:
54 return f"{size:.2f} {unit}"
55 size /= 1024
56
57
58 class StatsDB:
59
60 def __init__(self):
61 self.connection = sqlite3.connect(':memory:')
62 self._init_db()
63
64 def _init_db(self):
65 qry = '''
66 CREATE TABLE IF NOT EXISTS archive_cache (
67 rowid INTEGER PRIMARY KEY,
68 key_file TEXT,
69 key_file_path TEXT,
70 archive_key TEXT,
71 archive_path TEXT,
72 store_time REAL,
73 access_time REAL,
74 access_count INTEGER DEFAULT 0,
75 size INTEGER DEFAULT 0
76 )
77 '''
78
79 self.sql(qry)
80 self.connection.commit()
81
82 @property
83 def sql(self):
84 return self.connection.execute
85
86 def bulk_insert(self, rows):
87 qry = '''
88 INSERT INTO archive_cache (
89 rowid,
90 key_file,
91 key_file_path,
92 archive_key,
93 archive_path,
94 store_time,
95 access_time,
96 access_count,
97 size
98 )
99 VALUES (
100 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 )
102 '''
103 cursor = self.connection.cursor()
104 cursor.executemany(qry, rows)
105 self.connection.commit()
106
107 def get_total_size(self):
108 qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache'
109 ((total_size,),) = self.sql(qry).fetchall()
110 return total_size
111
112 def get_sorted_keys(self, select_policy):
113 select_policy_qry = select_policy.format(fields='key_file, archive_key, size')
114 return self.sql(select_policy_qry).fetchall()
115
116
117 class ShardFileReader:
118
119 def __init__(self, file_like_reader):
120 self._file_like_reader = file_like_reader
121
122 def __getattr__(self, item):
123 if isinstance(self._file_like_reader, s3fs.core.S3File):
124 match item:
125 case 'name':
126 # S3File doesn't support the name attribute, which we rely on; map it to full_name
127 return self._file_like_reader.full_name
128 case _:
129 return getattr(self._file_like_reader, item)
130 else:
131 return getattr(self._file_like_reader, item)
132
133 def __repr__(self):
134 return f'<{self.__class__.__name__}={self._file_like_reader}>'
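
A sketch of how evict() combines EVICTION_POLICY with StatsDB above: the policy's SQL template is formatted with the selected fields and run against the in-memory sqlite table. Row values are illustrative:

    db = StatsDB()
    # (rowid, key_file, key_file_path, archive_key, archive_path, store_time, access_time, access_count, size)
    db.bulk_insert([
        (1, 'a.key.json', '/store/a.key.json', 'a.zip', '/store/a', 1.0, 5.0, 3, 100),
        (2, 'b.key.json', '/store/b.key.json', 'b.zip', '/store/b', 2.0, 1.0, 9, 200),
    ])
    select_policy = EVICTION_POLICY['least-recently-used']['evict']
    print(db.get_total_size())                # 300
    print(db.get_sorted_keys(select_policy))  # [('b.key.json', 'b.zip', 200), ('a.key.json', 'a.zip', 100)]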
@@ -1,73 +1,102 b''
1 1 # deps, generated via pipdeptree --exclude setuptools,wheel,pipdeptree,pip -f | tr '[:upper:]' '[:lower:]'
2 2
3 3 async-timeout==4.0.3
4 4 atomicwrites==1.4.1
5 5 celery==5.3.6
6 6 billiard==4.2.0
7 7 click==8.1.3
8 8 click-didyoumean==0.3.0
9 9 click==8.1.3
10 10 click-plugins==1.1.1
11 11 click==8.1.3
12 12 click-repl==0.2.0
13 13 click==8.1.3
14 14 prompt-toolkit==3.0.38
15 15 wcwidth==0.2.6
16 16 six==1.16.0
17 17 kombu==5.3.5
18 18 amqp==5.2.0
19 19 vine==5.1.0
20 20 vine==5.1.0
21 21 python-dateutil==2.8.2
22 22 six==1.16.0
23 23 tzdata==2024.1
24 24 vine==5.1.0
25 25 contextlib2==21.6.0
26 26 dogpile.cache==1.3.3
27 27 decorator==5.1.1
28 28 stevedore==5.1.0
29 29 pbr==5.11.1
30 30 dulwich==0.21.6
31 31 urllib3==1.26.14
32 fsspec==2024.6.0
32 33 gunicorn==21.2.0
33 34 packaging==24.0
34 35 hg-evolve==11.1.3
35 36 importlib-metadata==6.0.0
36 37 zipp==3.15.0
37 38 mercurial==6.7.4
38 39 more-itertools==9.1.0
39 40 msgpack==1.0.8
40 41 orjson==3.10.3
41 42 psutil==5.9.8
42 43 py==1.11.0
43 44 pygit2==1.13.3
44 45 cffi==1.16.0
45 46 pycparser==2.21
46 47 pygments==2.15.1
47 48 pyparsing==3.1.1
48 49 pyramid==2.0.2
49 50 hupper==1.12
50 51 plaster==1.1.2
51 52 plaster-pastedeploy==1.0.1
52 53 pastedeploy==3.1.0
53 54 plaster==1.1.2
54 55 translationstring==1.4
55 56 venusian==3.0.0
56 57 webob==1.8.7
57 58 zope.deprecation==5.0.0
58 59 zope.interface==6.3.0
59 60 redis==5.0.4
60 61 async-timeout==4.0.3
61 62 repoze.lru==0.7
63 s3fs==2024.6.0
64 aiobotocore==2.13.0
65 aiohttp==3.9.5
66 aiosignal==1.3.1
67 frozenlist==1.4.1
68 attrs==22.2.0
69 frozenlist==1.4.1
70 multidict==6.0.5
71 yarl==1.9.4
72 idna==3.4
73 multidict==6.0.5
74 aioitertools==0.11.0
75 botocore==1.34.106
76 jmespath==1.0.1
77 python-dateutil==2.8.2
78 six==1.16.0
79 urllib3==1.26.14
80 wrapt==1.16.0
81 aiohttp==3.9.5
82 aiosignal==1.3.1
83 frozenlist==1.4.1
84 attrs==22.2.0
85 frozenlist==1.4.1
86 multidict==6.0.5
87 yarl==1.9.4
88 idna==3.4
89 multidict==6.0.5
90 fsspec==2024.6.0
62 91 scandir==1.10.0
63 92 setproctitle==1.3.3
64 93 subvertpy==0.11.0
65 94 waitress==3.0.0
66 95 wcwidth==0.2.6
67 96
68 97
69 98 ## test related requirements
70 99 #-r requirements_test.txt
71 100
72 101 ## uncomment to add the debug libraries
73 102 #-r requirements_debug.txt
@@ -1,187 +1,187 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17 import os
18 18 import sys
19 19 import tempfile
20 20 import logging
21 21 import urllib.parse
22 22
23 from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store
23 from vcsserver.lib.archive_cache import get_archival_cache_store
24 24
25 25 from vcsserver import exceptions
26 26 from vcsserver.exceptions import NoContentException
27 27 from vcsserver.hgcompat import archival
28 28 from vcsserver.lib.str_utils import safe_bytes
29 29 from vcsserver.lib.exc_tracking import format_exc
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class RepoFactory:
34 34 """
35 35 Utility to create instances of repository
36 36
37 37 It provides internal caching of the `repo` object based on
38 38 the :term:`call context`.
39 39 """
40 40 repo_type = None
41 41
42 42 def __init__(self):
43 43 pass
44 44
45 45 def _create_config(self, path, config):
46 46 config = {}
47 47 return config
48 48
49 49 def _create_repo(self, wire, create):
50 50 raise NotImplementedError()
51 51
52 52 def repo(self, wire, create=False):
53 53 raise NotImplementedError()
54 54
55 55
56 56 def obfuscate_qs(query_string):
57 57 if query_string is None:
58 58 return None
59 59
60 60 parsed = []
61 61 for k, v in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
62 62 if k in ['auth_token', 'api_key']:
63 63 v = "*****"
64 64 parsed.append((k, v))
65 65
66 66 return '&'.join('{}{}'.format(
67 67 k, f'={v}' if v else '') for k, v in parsed)
68 68
69 69
70 70 def raise_from_original(new_type, org_exc: Exception):
71 71 """
72 72 Raise a new exception type with original args and traceback.
73 73 """
74 74 exc_info = sys.exc_info()
75 75 exc_type, exc_value, exc_traceback = exc_info
76 76 new_exc = new_type(*exc_value.args)
77 77
78 78 # store the original traceback into the new exc
79 79 new_exc._org_exc_tb = format_exc(exc_info)
80 80
81 81 try:
82 82 raise new_exc.with_traceback(exc_traceback)
83 83 finally:
84 84 del exc_traceback
85 85
86 86
87 87 class ArchiveNode:
88 88 def __init__(self, path, mode, is_link, raw_bytes):
89 89 self.path = path
90 90 self.mode = mode
91 91 self.is_link = is_link
92 92 self.raw_bytes = raw_bytes
93 93
94 94
95 95 def store_archive_in_cache(node_walker, archive_key, kind, mtime, archive_at_path, archive_dir_name,
96 96 commit_id, write_metadata=True, extra_metadata=None, cache_config=None):
97 97 """
98 98 Function that stores a generated archive and sends it to a dedicated backend store.
99 99 Here we use the archive_cache backend store (filesystem or objectstore).
100 100
101 101 :param node_walker: a generator returning nodes to add to archive
102 102 :param archive_key: key used to store the path
103 103 :param kind: archive kind
104 104 :param mtime: time of creation
105 105 :param archive_at_path: the path at which the archive was started, default '/'.
106 106 If this is not '/' it means it's a partial archive
107 107 :param archive_dir_name: inside dir name when creating an archive
108 108 :param commit_id: commit sha of revision archive was created at
109 109 :param write_metadata:
110 110 :param extra_metadata:
111 111 :param cache_config:
112 112
113 113 walker should be a file walker, for example,
114 114 def node_walker():
115 115 for file_info in files:
116 116 yield ArchiveNode(fn, mode, is_link, ctx[fn].data)
117 117 """
118 118 extra_metadata = extra_metadata or {}
119 119
120 120 d_cache = get_archival_cache_store(config=cache_config)
121 121
122 122 if archive_key in d_cache:
123 123 reader, metadata = d_cache.fetch(archive_key)
124 124 return reader.name
125 125
126 126 archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
127 127 log.debug('Creating new temp archive in %s', archive_tmp_path)
128 128
129 129 if kind == "tgz":
130 130 archiver = archival.tarit(archive_tmp_path, mtime, b"gz")
131 131 elif kind == "tbz2":
132 132 archiver = archival.tarit(archive_tmp_path, mtime, b"bz2")
133 133 elif kind == 'zip':
134 134 archiver = archival.zipit(archive_tmp_path, mtime)
135 135 else:
136 136 raise exceptions.ArchiveException()(
137 137 f'Remote does not support: "{kind}" archive type.')
138 138
139 139 for f in node_walker(commit_id, archive_at_path):
140 140 f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
141 141
142 142 try:
143 143 archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
144 144 except NoContentException:
145 145 # NOTE(marcink): this is a special case for SVN so we can create "empty"
146 146 # directories which are not supported by archiver
147 147 archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')
148 148
149 149 metadata = dict([
150 150 ('commit_id', commit_id),
151 151 ('mtime', mtime),
152 152 ])
153 153 metadata.update(extra_metadata)
154 154 if write_metadata:
155 155 meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
156 156 f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
157 157 archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))
158 158
159 159 archiver.done()
160 160
161 161 with open(archive_tmp_path, 'rb') as archive_file:
162 162 add_result = d_cache.store(archive_key, archive_file, metadata=metadata)
163 163 if not add_result:
164 164 log.error('Failed to store cache for key=%s', archive_key)
165 165
166 166 os.remove(archive_tmp_path)
167 167
168 168 reader, metadata = d_cache.fetch(archive_key)
169 169
170 170 return reader.name
171 171
172 172
173 173 class BinaryEnvelope:
174 174 def __init__(self, val):
175 175 self.val = val
176 176
177 177
178 178 class BytesEnvelope(bytes):
179 179 def __new__(cls, content):
180 180 if isinstance(content, bytes):
181 181 return super().__new__(cls, content)
182 182 else:
183 183 raise TypeError('BytesEnvelope content= param must be bytes. Use BinaryEnvelope to wrap other types')
184 184
185 185
186 186 class BinaryBytesEnvelope(BytesEnvelope):
187 187 pass
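
A hedged sketch of driving store_archive_in_cache() above; the walker contents are illustrative, raw_bytes must be a zero-argument callable returning bytes (it is invoked as f.raw_bytes() by the archiver), and cache_config is the archive_cache.* settings dict sketched earlier:

    def node_walker(commit_id, archive_at_path):
        files = [(b'README.md', 0o644, False, lambda: b'# readme')]
        for path, mode, is_link, raw_bytes in files:
            yield ArchiveNode(path, mode, is_link, raw_bytes)

    archive_path = store_archive_in_cache(
        node_walker, archive_key='repo-abc123.zip', kind='zip', mtime=1700000000,
        archive_at_path='/', archive_dir_name='repo', commit_id='abc123',
        cache_config=settings)  # settings: the archive_cache.* dict from earlier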
@@ -1,774 +1,774 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import platform
21 21 import sys
22 22 import locale
23 23 import logging
24 24 import uuid
25 25 import time
26 26 import wsgiref.util
27 27 import tempfile
28 28 import psutil
29 29
30 30 from itertools import chain
31 31
32 32 import msgpack
33 33 import configparser
34 34
35 35 from pyramid.config import Configurator
36 36 from pyramid.wsgi import wsgiapp
37 37 from pyramid.response import Response
38 38
39 39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
40 40 from vcsserver.lib.ext_json import json
41 41 from vcsserver.config.settings_maker import SettingsMaker
42 42 from vcsserver.lib.str_utils import safe_int
43 43 from vcsserver.lib.statsd_client import StatsdClient
44 44 from vcsserver.tweens.request_wrapper import get_headers_call_context
45 45
46 46 import vcsserver
47 47 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
48 48 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
49 49 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
50 50 from vcsserver.echo_stub.echo_app import EchoApp
51 51 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
52 52 from vcsserver.lib.exc_tracking import store_exception, format_exc
53 53 from vcsserver.server import VcsServer
54 54
55 55 strict_vcs = True
56 56
57 57 git_import_err = None
58 58 try:
59 59 from vcsserver.remote.git_remote import GitFactory, GitRemote
60 60 except ImportError as e:
61 61 GitFactory = None
62 62 GitRemote = None
63 63 git_import_err = e
64 64 if strict_vcs:
65 65 raise
66 66
67 67
68 68 hg_import_err = None
69 69 try:
70 70 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
71 71 except ImportError as e:
72 72 MercurialFactory = None
73 73 HgRemote = None
74 74 hg_import_err = e
75 75 if strict_vcs:
76 76 raise
77 77
78 78
79 79 svn_import_err = None
80 80 try:
81 81 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
82 82 except ImportError as e:
83 83 SubversionFactory = None
84 84 SvnRemote = None
85 85 svn_import_err = e
86 86 if strict_vcs:
87 87 raise
88 88
89 89 log = logging.getLogger(__name__)
90 90
91 91 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
92 92 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
93 93
94 94 try:
95 95 locale.setlocale(locale.LC_ALL, '')
96 96 except locale.Error as e:
97 97 log.error(
98 98 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
99 99 os.environ['LC_ALL'] = 'C'
100 100
101 101
102 102 def _is_request_chunked(environ):
103 103 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
104 104 return stream
105 105
106 106
107 107 def log_max_fd():
108 108 try:
109 109 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
110 110 log.info('Max file descriptors value: %s', maxfd)
111 111 except Exception:
112 112 pass
113 113
114 114
115 115 class VCS:
116 116 def __init__(self, locale_conf=None, cache_config=None):
117 117 self.locale = locale_conf
118 118 self.cache_config = cache_config
119 119 self._configure_locale()
120 120
121 121 log_max_fd()
122 122
123 123 if GitFactory and GitRemote:
124 124 git_factory = GitFactory()
125 125 self._git_remote = GitRemote(git_factory)
126 126 else:
127 127 log.error("Git client import failed: %s", git_import_err)
128 128
129 129 if MercurialFactory and HgRemote:
130 130 hg_factory = MercurialFactory()
131 131 self._hg_remote = HgRemote(hg_factory)
132 132 else:
133 133 log.error("Mercurial client import failed: %s", hg_import_err)
134 134
135 135 if SubversionFactory and SvnRemote:
136 136 svn_factory = SubversionFactory()
137 137
138 138 # hg factory is used for svn url validation
139 139 hg_factory = MercurialFactory()
140 140 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
141 141 else:
142 142 log.error("Subversion client import failed: %s", svn_import_err)
143 143
144 144 self._vcsserver = VcsServer()
145 145
146 146 def _configure_locale(self):
147 147 if self.locale:
148 148 log.info('Setting locale `LC_ALL` to %s', self.locale)
149 149 else:
150 150 log.info('Configuring locale subsystem based on environment variables')
151 151 try:
152 152 # If self.locale is the empty string, then the locale
153 153 # module will use the environment variables. See the
154 154 # documentation of the package `locale`.
155 155 locale.setlocale(locale.LC_ALL, self.locale)
156 156
157 157 language_code, encoding = locale.getlocale()
158 158 log.info(
159 159 'Locale set to language code "%s" with encoding "%s".',
160 160 language_code, encoding)
161 161 except locale.Error:
162 162 log.exception('Cannot set locale, not configuring the locale system')
163 163
164 164
165 165 class WsgiProxy:
166 166 def __init__(self, wsgi):
167 167 self.wsgi = wsgi
168 168
169 169 def __call__(self, environ, start_response):
170 170 input_data = environ['wsgi.input'].read()
171 171 input_data = msgpack.unpackb(input_data)
172 172
173 173 error = None
174 174 try:
175 175 data, status, headers = self.wsgi.handle(
176 176 input_data['environment'], input_data['input_data'],
177 177 *input_data['args'], **input_data['kwargs'])
178 178 except Exception as e:
179 179 data, status, headers = [], None, None
180 180 error = {
181 181 'message': str(e),
182 182 '_vcs_kind': getattr(e, '_vcs_kind', None)
183 183 }
184 184
185 185 start_response('200 OK', [])
186 186 return self._iterator(error, status, headers, data)
187 187
188 188 def _iterator(self, error, status, headers, data):
189 189 initial_data = [
190 190 error,
191 191 status,
192 192 headers,
193 193 ]
194 194
195 195 for d in chain(initial_data, data):
196 196 yield msgpack.packb(d)
197 197
198 198
199 199 def not_found(request):
200 200 return {'status': '404 NOT FOUND'}
201 201
202 202
203 203 class VCSViewPredicate:
204 204 def __init__(self, val, config):
205 205 self.remotes = val
206 206
207 207 def text(self):
208 208 return f'vcs view method = {list(self.remotes.keys())}'
209 209
210 210 phash = text
211 211
212 212 def __call__(self, context, request):
213 213 """
214 214 View predicate that returns true if given backend is supported by
215 215 defined remotes.
216 216 """
217 217 backend = request.matchdict.get('backend')
218 218 return backend in self.remotes
219 219
220 220
221 221 class HTTPApplication:
222 222 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
223 223
224 224 remote_wsgi = remote_wsgi
225 225 _use_echo_app = False
226 226
227 227 def __init__(self, settings=None, global_config=None):
228 228
229 229 self.config = Configurator(settings=settings)
230 230 # Init our statsd at very start
231 231 self.config.registry.statsd = StatsdClient.statsd
232 232 self.config.registry.vcs_call_context = {}
233 233
234 234 self.global_config = global_config
235 235 self.config.include('vcsserver.lib.rc_cache')
236 self.config.include('vcsserver.lib.rc_cache.archive_cache')
236 self.config.include('vcsserver.lib.archive_cache')
237 237
238 238 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
239 239 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
240 240 self._remotes = {
241 241 'hg': vcs._hg_remote,
242 242 'git': vcs._git_remote,
243 243 'svn': vcs._svn_remote,
244 244 'server': vcs._vcsserver,
245 245 }
246 246 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
247 247 self._use_echo_app = True
248 248 log.warning("Using EchoApp for VCS operations.")
249 249 self.remote_wsgi = remote_wsgi_stub
250 250
251 251 self._configure_settings(global_config, settings)
252 252
253 253 self._configure()
254 254
255 255 def _configure_settings(self, global_config, app_settings):
256 256 """
257 257 Configure the settings module.
258 258 """
259 259 settings_merged = global_config.copy()
260 260 settings_merged.update(app_settings)
261 261
262 262 binary_dir = app_settings['core.binary_dir']
263 263
264 264 settings.BINARY_DIR = binary_dir
265 265
266 266 # Store the settings to make them available to other modules.
267 267 vcsserver.PYRAMID_SETTINGS = settings_merged
268 268 vcsserver.CONFIG = settings_merged
269 269
270 270 def _configure(self):
271 271 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
272 272
273 273 self.config.add_route('service', '/_service')
274 274 self.config.add_route('status', '/status')
275 275 self.config.add_route('hg_proxy', '/proxy/hg')
276 276 self.config.add_route('git_proxy', '/proxy/git')
277 277
278 278 # rpc methods
279 279 self.config.add_route('vcs', '/{backend}')
280 280
281 281 # streaming rpc remote methods
282 282 self.config.add_route('vcs_stream', '/{backend}/stream')
283 283
284 284 # vcs operations clone/push as streaming
285 285 self.config.add_route('stream_git', '/stream/git/*repo_name')
286 286 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
287 287
288 288 self.config.add_view(self.status_view, route_name='status', renderer='json')
289 289 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
290 290
291 291 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
292 292 self.config.add_view(self.git_proxy(), route_name='git_proxy')
293 293 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
294 294 vcs_view=self._remotes)
295 295 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
296 296 vcs_view=self._remotes)
297 297
298 298 self.config.add_view(self.hg_stream(), route_name='stream_hg')
299 299 self.config.add_view(self.git_stream(), route_name='stream_git')
300 300
301 301 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
302 302
303 303 self.config.add_notfound_view(not_found, renderer='json')
304 304
305 305 self.config.add_view(self.handle_vcs_exception, context=Exception)
306 306
307 307 self.config.add_tween(
308 308 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
309 309 )
310 310 self.config.add_request_method(
311 311 'vcsserver.lib.request_counter.get_request_counter',
312 312 'request_count')
313 313
314 314 def wsgi_app(self):
315 315 return self.config.make_wsgi_app()
316 316
317 317 def _vcs_view_params(self, request):
318 318 remote = self._remotes[request.matchdict['backend']]
319 319 payload = msgpack.unpackb(request.body, use_list=True)
320 320
321 321 method = payload.get('method')
322 322 params = payload['params']
323 323 wire = params.get('wire')
324 324 args = params.get('args')
325 325 kwargs = params.get('kwargs')
326 326 context_uid = None
327 327
328 328 request.registry.vcs_call_context = {
329 329 'method': method,
330 330 'repo_name': payload.get('_repo_name'),
331 331 }
332 332
333 333 if wire:
334 334 try:
335 335 wire['context'] = context_uid = uuid.UUID(wire['context'])
336 336 except KeyError:
337 337 pass
338 338 args.insert(0, wire)
339 339 repo_state_uid = wire.get('repo_state_uid') if wire else None
340 340
341 341 # NOTE(marcink): trading complexity for slight performance
342 342 if log.isEnabledFor(logging.DEBUG):
343 343 # also we SKIP printing out any of those methods' args since they may be excessive
344 344 just_args_methods = {
345 345 'commitctx': ('content', 'removed', 'updated'),
346 346 'commit': ('content', 'removed', 'updated')
347 347 }
348 348 if method in just_args_methods:
349 349 skip_args = just_args_methods[method]
350 350 call_args = ''
351 351 call_kwargs = {}
352 352 for k in kwargs:
353 353 if k in skip_args:
354 354 # replace our skip key with dummy
355 355 call_kwargs[k] = f'RemovedParam({k})'
356 356 else:
357 357 call_kwargs[k] = kwargs[k]
358 358 else:
359 359 call_args = args[1:]
360 360 call_kwargs = kwargs
361 361
362 362 log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
363 363 method, call_args, call_kwargs, context_uid, repo_state_uid)
364 364
365 365 statsd = request.registry.statsd
366 366 if statsd:
367 367 statsd.incr(
368 368 'vcsserver_method_total', tags=[
369 369 f"method:{method}",
370 370 ])
371 371 return payload, remote, method, args, kwargs
372 372
373 373 def vcs_view(self, request):
374 374
375 375 payload, remote, method, args, kwargs = self._vcs_view_params(request)
376 376 payload_id = payload.get('id')
377 377
378 378 try:
379 379 resp = getattr(remote, method)(*args, **kwargs)
380 380 except Exception as e:
381 381 exc_info = list(sys.exc_info())
382 382 exc_type, exc_value, exc_traceback = exc_info
383 383
384 384 org_exc = getattr(e, '_org_exc', None)
385 385 org_exc_name = None
386 386 org_exc_tb = ''
387 387 if org_exc:
388 388 org_exc_name = org_exc.__class__.__name__
389 389 org_exc_tb = getattr(e, '_org_exc_tb', '')
390 390 # replace our "faked" exception with our org
391 391 exc_info[0] = org_exc.__class__
392 392 exc_info[1] = org_exc
393 393
394 394 should_store_exc = True
395 395 if org_exc:
396 396 def get_exc_fqn(_exc_obj):
397 397 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
398 398 return module_name + '.' + org_exc_name
399 399
400 400 exc_fqn = get_exc_fqn(org_exc)
401 401
402 402 if exc_fqn in ['mercurial.error.RepoLookupError',
403 403 'vcsserver.exceptions.RefNotFoundException']:
404 404 should_store_exc = False
405 405
406 406 if should_store_exc:
407 407 store_exception(id(exc_info), exc_info, request_path=request.path)
408 408
409 409 tb_info = format_exc(exc_info)
410 410
411 411 type_ = e.__class__.__name__
412 412 if type_ not in self.ALLOWED_EXCEPTIONS:
413 413 type_ = None
414 414
415 415 resp = {
416 416 'id': payload_id,
417 417 'error': {
418 418 'message': str(e),
419 419 'traceback': tb_info,
420 420 'org_exc': org_exc_name,
421 421 'org_exc_tb': org_exc_tb,
422 422 'type': type_
423 423 }
424 424 }
425 425
426 426 try:
427 427 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
428 428 except AttributeError:
429 429 pass
430 430 else:
431 431 resp = {
432 432 'id': payload_id,
433 433 'result': resp
434 434 }
435 435 log.debug('Serving data for method %s', method)
436 436 return resp
437 437
438 438 def vcs_stream_view(self, request):
439 439 payload, remote, method, args, kwargs = self._vcs_view_params(request)
440 440 # this method has a 'stream:' marker; we remove it here
441 441 method = method.split('stream:')[-1]
442 442 chunk_size = safe_int(payload.get('chunk_size')) or 4096
443 443
444 444 resp = getattr(remote, method)(*args, **kwargs)
445 445
446 446 def get_chunked_data(method_resp):
447 447 stream = io.BytesIO(method_resp)
448 448 while 1:
449 449 chunk = stream.read(chunk_size)
450 450 if not chunk:
451 451 break
452 452 yield chunk
453 453
454 454 response = Response(app_iter=get_chunked_data(resp))
455 455 response.content_type = 'application/octet-stream'
456 456
457 457 return response
458 458
459 459 def status_view(self, request):
460 460 import vcsserver
461 461 _platform_id = platform.uname()[1] or 'instance'
462 462
463 463 return {
464 464 "status": "OK",
465 465 "vcsserver_version": vcsserver.get_version(),
466 466 "platform": _platform_id,
467 467 "pid": os.getpid(),
468 468 }
469 469
470 470 def service_view(self, request):
471 471 import vcsserver
472 472
473 473 payload = msgpack.unpackb(request.body, use_list=True)
474 474 server_config, app_config = {}, {}
475 475
476 476 try:
477 477 path = self.global_config['__file__']
478 478 config = configparser.RawConfigParser()
479 479
480 480 config.read(path)
481 481
482 482 if config.has_section('server:main'):
483 483 server_config = dict(config.items('server:main'))
484 484 if config.has_section('app:main'):
485 485 app_config = dict(config.items('app:main'))
486 486
487 487 except Exception:
488 488 log.exception('Failed to read .ini file for display')
489 489
490 490 environ = list(os.environ.items())
491 491
492 492 resp = {
493 493 'id': payload.get('id'),
494 494 'result': dict(
495 495 version=vcsserver.get_version(),
496 496 config=server_config,
497 497 app_config=app_config,
498 498 environ=environ,
499 499 payload=payload,
500 500 )
501 501 }
502 502 return resp
503 503
504 504 def _msgpack_renderer_factory(self, info):
505 505
506 506 def _render(value, system):
507 507 bin_type = False
508 508 res = value.get('result')
509 509 if isinstance(res, BytesEnvelope):
510 510 log.debug('Result is wrapped in BytesEnvelope type')
511 511 bin_type = True
512 512 elif isinstance(res, BinaryEnvelope):
513 513 log.debug('Result is wrapped in BinaryEnvelope type')
514 514 value['result'] = res.val
515 515 bin_type = True
516 516
517 517 request = system.get('request')
518 518 if request is not None:
519 519 response = request.response
520 520 ct = response.content_type
521 521 if ct == response.default_content_type:
522 522 response.content_type = 'application/x-msgpack'
523 523 if bin_type:
524 524 response.content_type = 'application/x-msgpack-bin'
525 525
526 526 return msgpack.packb(value, use_bin_type=bin_type)
527 527 return _render
528 528
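The use_bin_type flag passed to msgpack.packb above decides whether bytes get the dedicated bin type on the wire; a small round-trip sketch with the msgpack-python API:

import msgpack

# with use_bin_type=True, bytes round-trip as bytes; str values stay str
packed = msgpack.packb({'result': b'\x00\x01'}, use_bin_type=True)
assert msgpack.unpackb(packed, raw=False) == {'result': b'\x00\x01'}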
529 529 def set_env_from_config(self, environ, config):
530 530 dict_conf = {}
531 531 try:
532 532 for elem in config:
533 533 if elem[0] == 'rhodecode':
534 534 dict_conf = json.loads(elem[2])
535 535 break
536 536 except Exception:
537 537 log.exception('Failed to fetch SCM CONFIG')
538 538 return
539 539
540 540 username = dict_conf.get('username')
541 541 if username:
542 542 environ['REMOTE_USER'] = username
543 543 # Mercurial-specific; some extension APIs rely on this
544 544 environ['HGUSER'] = username
545 545
546 546 ip = dict_conf.get('ip')
547 547 if ip:
548 548 environ['REMOTE_HOST'] = ip
549 549
550 550 if _is_request_chunked(environ):
551 551 # set the compatibility flag for webob
552 552 environ['wsgi.input_terminated'] = True
553 553
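set_env_from_config expects rows shaped like (section, key, json-blob) and only consults the 'rhodecode' section; a hypothetical feeding example (the row contents are illustrative only):

import json

environ = {}
config = [
    ('web', 'push_ssl', 'false'),  # ignored: not the 'rhodecode' section
    ('rhodecode', 'extras', json.dumps({'username': 'admin', 'ip': '10.0.0.1'})),
]
for elem in config:
    if elem[0] == 'rhodecode':
        dict_conf = json.loads(elem[2])
        environ['REMOTE_USER'] = dict_conf.get('username')
        environ['REMOTE_HOST'] = dict_conf.get('ip')
        break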
554 554 def hg_proxy(self):
555 555 @wsgiapp
556 556 def _hg_proxy(environ, start_response):
557 557 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
558 558 return app(environ, start_response)
559 559 return _hg_proxy
560 560
561 561 def git_proxy(self):
562 562 @wsgiapp
563 563 def _git_proxy(environ, start_response):
564 564 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
565 565 return app(environ, start_response)
566 566 return _git_proxy
567 567
568 568 def hg_stream(self):
569 569 if self._use_echo_app:
570 570 @wsgiapp
571 571 def _hg_stream(environ, start_response):
572 572 app = EchoApp('fake_path', 'fake_name', None)
573 573 return app(environ, start_response)
574 574 return _hg_stream
575 575 else:
576 576 @wsgiapp
577 577 def _hg_stream(environ, start_response):
578 578 log.debug('http-app: handling hg stream')
579 579 call_context = get_headers_call_context(environ)
580 580
581 581 repo_path = call_context['repo_path']
582 582 repo_name = call_context['repo_name']
583 583 config = call_context['repo_config']
584 584
585 585 app = scm_app.create_hg_wsgi_app(
586 586 repo_path, repo_name, config)
587 587
588 588 # Consistent path information for hgweb
589 589 environ['PATH_INFO'] = call_context['path_info']
590 590 environ['REPO_NAME'] = repo_name
591 591 self.set_env_from_config(environ, config)
592 592
593 593 log.debug('http-app: starting app handler '
594 594 'with %s and processing the request', app)
595 595 return app(environ, ResponseFilter(start_response))
596 596 return _hg_stream
597 597
598 598 def git_stream(self):
599 599 if self._use_echo_app:
600 600 @wsgiapp
601 601 def _git_stream(environ, start_response):
602 602 app = EchoApp('fake_path', 'fake_name', None)
603 603 return app(environ, start_response)
604 604 return _git_stream
605 605 else:
606 606 @wsgiapp
607 607 def _git_stream(environ, start_response):
608 608 log.debug('http-app: handling git stream')
609 609
610 610 call_context = get_headers_call_context(environ)
611 611
612 612 repo_path = call_context['repo_path']
613 613 repo_name = call_context['repo_name']
614 614 config = call_context['repo_config']
615 615
616 616 environ['PATH_INFO'] = call_context['path_info']
617 617 self.set_env_from_config(environ, config)
618 618
619 619 content_type = environ.get('CONTENT_TYPE', '')
620 620
621 621 path = environ['PATH_INFO']
622 622 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
623 623 log.debug(
624 624 'LFS: Detecting if request `%s` is LFS server path based '
625 625 'on content type:`%s`, is_lfs:%s',
626 626 path, content_type, is_lfs_request)
627 627
628 628 if not is_lfs_request:
629 629 # fallback detection by path
630 630 if GIT_LFS_PROTO_PAT.match(path):
631 631 is_lfs_request = True
632 632 log.debug(
633 633 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
634 634 path, is_lfs_request)
635 635
636 636 if is_lfs_request:
637 637 app = scm_app.create_git_lfs_wsgi_app(
638 638 repo_path, repo_name, config)
639 639 else:
640 640 app = scm_app.create_git_wsgi_app(
641 641 repo_path, repo_name, config)
642 642
643 643 log.debug('http-app: starting app handler '
644 644 'with %s and processing the request', app)
645 645
646 646 return app(environ, start_response)
647 647
648 648 return _git_stream
649 649
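The two-step LFS detection above (content type first, path pattern as fallback) can be sketched in isolation; note the constant value and the regex below are assumptions for illustration, not the module's actual definitions:

import re

GIT_LFS_CONTENT_TYPE = 'application/vnd.git-lfs'          # assumed value
GIT_LFS_PROTO_PAT = re.compile(r'^/(.+)/info/lfs/(.+)$')  # hypothetical pattern

def is_lfs_request(environ: dict) -> bool:
    # primary signal: the LFS media type in the request content type
    if GIT_LFS_CONTENT_TYPE in environ.get('CONTENT_TYPE', ''):
        return True
    # fallback signal: the path matches the LFS endpoint layout
    return bool(GIT_LFS_PROTO_PAT.match(environ.get('PATH_INFO', '')))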
650 650 def handle_vcs_exception(self, exception, request):
651 651 _vcs_kind = getattr(exception, '_vcs_kind', '')
652 652
653 653 if _vcs_kind == 'repo_locked':
654 654 headers_call_context = get_headers_call_context(request.environ)
655 655 status_code = safe_int(headers_call_context['locked_status_code'])
656 656
657 657 return HTTPRepoLocked(
658 658 title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])
659 659
660 660 elif _vcs_kind == 'repo_branch_protected':
661 661 # Branch is protected; signal it via the X-Rc-Branch-Protection header.
662 662 return HTTPRepoBranchProtected(
663 663 title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])
664 664
665 665 exc_info = request.exc_info
666 666 store_exception(id(exc_info), exc_info)
667 667
668 668 traceback_info = 'unavailable'
669 669 if request.exc_info:
670 670 traceback_info = format_exc(request.exc_info)
671 671
672 672 log.error(
673 673 'error occurred handling this request for path: %s, \n%s',
674 674 request.path, traceback_info)
675 675
676 676 statsd = request.registry.statsd
677 677 if statsd:
678 678 exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
679 679 statsd.incr('vcsserver_exception_total',
680 680 tags=[f"type:{exc_type}"])
681 681 raise exception
682 682
683 683
684 684 class ResponseFilter:
685 685
686 686 def __init__(self, start_response):
687 687 self._start_response = start_response
688 688
689 689 def __call__(self, status, response_headers, exc_info=None):
690 690 headers = tuple(
691 691 (h, v) for h, v in response_headers
692 692 if not wsgiref.util.is_hop_by_hop(h))
693 693 return self._start_response(status, headers, exc_info)
694 694
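ResponseFilter leans on a stdlib helper; a quick demonstration of which headers it drops:

import wsgiref.util

# hop-by-hop headers (RFC 2616, section 13.5.1) must not be forwarded
# by a gateway, which is exactly what ResponseFilter enforces
assert wsgiref.util.is_hop_by_hop('Connection')
assert wsgiref.util.is_hop_by_hop('Transfer-Encoding')
assert not wsgiref.util.is_hop_by_hop('Content-Type')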
695 695
696 696 def sanitize_settings_and_apply_defaults(global_config, settings):
697 697 _global_settings_maker = SettingsMaker(global_config)
698 698 settings_maker = SettingsMaker(settings)
699 699
700 700 settings_maker.make_setting('logging.autoconfigure', False, parser='bool')
701 701
702 702 logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini')
703 703 settings_maker.enable_logging(logging_conf)
704 704
705 705 # Default includes, which users may override
706 706 pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
707 707 log.debug("Using the following pyramid.includes: %s", pyramid_includes)
708 708
709 709 settings_maker.make_setting('__file__', global_config.get('__file__'))
710 710
711 711 settings_maker.make_setting('pyramid.default_locale_name', 'en')
712 712 settings_maker.make_setting('locale', 'en_US.UTF-8')
713 713
714 714 settings_maker.make_setting(
715 715 'core.binary_dir', '/usr/local/bin/rhodecode_bin/vcs_bin',
716 716 default_when_empty=True, parser='string:noquote')
717 717
718 718 temp_store = tempfile.gettempdir()
719 719 default_cache_dir = os.path.join(temp_store, 'rc_cache')
720 720 # save the default cache dir and use it for all backends later.
721 721 default_cache_dir = settings_maker.make_setting(
722 722 'cache_dir',
723 723 default=default_cache_dir, default_when_empty=True,
724 724 parser='dir:ensured')
725 725
726 726 # exception store cache
727 727 settings_maker.make_setting(
728 728 'exception_tracker.store_path',
729 729 default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True,
730 730 parser='dir:ensured'
731 731 )
732 732
733 733 # repo_object cache defaults
734 734 settings_maker.make_setting(
735 735 'rc_cache.repo_object.backend',
736 736 default='dogpile.cache.rc.file_namespace',
737 737 parser='string')
738 738 settings_maker.make_setting(
739 739 'rc_cache.repo_object.expiration_time',
740 740 default=30 * 24 * 60 * 60,  # 30 days
741 741 parser='int')
742 742 settings_maker.make_setting(
743 743 'rc_cache.repo_object.arguments.filename',
744 744 default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
745 745 parser='string')
746 746
747 747 # statsd
748 748 settings_maker.make_setting('statsd.enabled', False, parser='bool')
749 749 settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
750 750 settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
751 751 settings_maker.make_setting('statsd.statsd_prefix', '')
752 752 settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')
753 753
754 754 settings_maker.env_expand()
755 755
756 756
757 757 def main(global_config, **settings):
758 758 start_time = time.time()
759 759 log.info('Pyramid app config starting')
760 760
761 761 if MercurialFactory:
762 762 hgpatches.patch_largefiles_capabilities()
763 763 hgpatches.patch_subrepo_type_mapping()
764 764
765 765 # Fill in and sanitize the defaults & do ENV expansion
766 766 sanitize_settings_and_apply_defaults(global_config, settings)
767 767
768 768 # init and bootstrap StatsdClient
769 769 StatsdClient.setup(settings)
770 770
771 771 pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app()
772 772 total_time = time.time() - start_time
773 773 log.info('Pyramid app created and configured in %.2fs', total_time)
774 774 return pyramid_app
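A hedged sketch of booting the factory outside a PasteDeploy pipeline; the ini path and port are assumptions, and waitress is just one possible WSGI server:

from waitress import serve  # assumed server, not a vcsserver requirement

# global_config normally comes from PasteDeploy and carries the ini path
app = main({'__file__': '/etc/rhodecode/vcsserver.ini'})
serve(app, host='127.0.0.1', port=9900)  # port is illustrative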