feat(archive-cache): synced with CE codebase
super-admin | r1251:f8e66197 default
@@ -0,0 +1,79 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import logging
20
21 from .backends.fanout_cache import FileSystemFanoutCache
22 from .backends.objectstore_cache import ObjectStoreCache
23
24 from .utils import archive_iterator # noqa
25 from .lock import ArchiveCacheGenerationLock # noqa
26
27 log = logging.getLogger(__name__)
28
29
30 cache_meta = None
31
32
33 def includeme(config):
34 # vcsserver gets its config from rhodecode on a remote call, so we
35 # return early instead of initializing the cache at startup
36 return
37 get_archival_cache_store(config.get_settings())
38
39
40 def get_archival_config(config):
41
42 final_config = {}
45
46 for k, v in config.items():
47 if k.startswith('archive_cache'):
48 final_config[k] = v
49
50 return final_config
51
52
53 def get_archival_cache_store(config, always_init=False):
54
55 global cache_meta
56 if cache_meta is not None and not always_init:
57 return cache_meta
58
59 config = get_archival_config(config)
60 backend = config['archive_cache.backend.type']
61
62 archive_cache_locking_url = config['archive_cache.locking.url']
63
64 match backend:
65 case 'filesystem':
66 d_cache = FileSystemFanoutCache(
67 locking_url=archive_cache_locking_url,
68 **config
69 )
70 case 'objectstore':
71 d_cache = ObjectStoreCache(
72 locking_url=archive_cache_locking_url,
73 **config
74 )
75 case _:
76 raise ValueError(f'archive_cache.backend.type only supports "filesystem" or "objectstore", got: {backend}')
77
78 cache_meta = d_cache
79 return cache_meta
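
As a usage sketch, the store above can be initialized from a flat settings dict. The keys mirror the ones read by this module and the backends below; the concrete values (paths, URLs, sizes) are hypothetical examples, not shipped defaults:

    from vcsserver.lib.archive_cache import get_archival_cache_store

    settings = {
        # 'filesystem' selects FileSystemFanoutCache, 'objectstore' selects ObjectStoreCache
        'archive_cache.backend.type': 'filesystem',
        # redis url used for the archive generation lock
        'archive_cache.locking.url': 'redis://localhost:6379/0',
        'archive_cache.filesystem.store_dir': '/tmp/archive_cache',
        'archive_cache.filesystem.cache_shards': 8,
        'archive_cache.filesystem.cache_size_gb': 10,
        'archive_cache.filesystem.eviction_policy': 'least-recently-stored',
        'archive_cache.filesystem.retry': False,
        'archive_cache.filesystem.retry_attempts': 0,
        'archive_cache.filesystem.retry_backoff': 1,
    }

    # returns a module-level singleton; pass always_init=True to force re-creation
    d_cache = get_archival_cache_store(settings)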
@@ -0,0 +1,17 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
@@ -0,0 +1,348 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import os
20 import functools
21 import logging
22 import typing
23 import time
24 import zlib
25
26 from ...ext_json import json
27 from ..utils import StatsDB, NOT_GIVEN, ShardFileReader, EVICTION_POLICY, format_size
28 from ..lock import GenerationLock
29
30 log = logging.getLogger(__name__)
31
32
33 class BaseShard:
34 storage_type: str = ''
35 fs = None
36
37 @classmethod
38 def hash(cls, key):
39 """Compute portable hash for `key`.
40
41 :param key: key to hash
42 :return: hash value
43
44 """
45 mask = 0xFFFFFFFF
46 return zlib.adler32(key.encode('utf-8')) & mask # noqa
47
48 def _write_file(self, full_path, read_iterator, mode):
49 raise NotImplementedError
50
51 def _get_keyfile(self, key):
52 raise NotImplementedError
53
54 def random_filename(self):
55 raise NotImplementedError
56
57 def _store(self, key, value_reader, metadata, mode):
58 (filename, # hash-name
59 full_path # full-path/hash-name
60 ) = self.random_filename()
61
62 key_file, key_file_path = self._get_keyfile(key)
63
64 # STORE METADATA
65 _metadata = {
66 "version": "v1",
67
68 "key_file": key_file, # this is the .key.json file storing meta
69 "key_file_path": key_file_path, # full path to key_file
70 "archive_key": key, # original name we stored archive under, e.g my-archive.zip
71 "archive_filename": filename, # the actual filename we stored that file under
72 "archive_full_path": full_path,
73
74 "store_time": time.time(),
75 "access_count": 0,
76 "access_time": 0,
77
78 "size": 0
79 }
80 if metadata:
81 _metadata.update(metadata)
82
83 read_iterator = iter(functools.partial(value_reader.read, 2**22), b'')
84 size, sha256 = self._write_file(full_path, read_iterator, mode)
85 _metadata['size'] = size
86 _metadata['sha256'] = sha256
87
88 # once the archive is fully written, create a key file recording the presence of the binary file
89 with self.fs.open(key_file_path, 'wb') as f:
90 f.write(json.dumps(_metadata))
91
92 return key, filename, size, _metadata
93
94 def _fetch(self, key, retry, retry_attempts, retry_backoff):
95 if retry is NOT_GIVEN:
96 retry = False
97 if retry_attempts is NOT_GIVEN:
98 retry_attempts = 0
99
100 if retry and retry_attempts > 0:
101 for attempt in range(1, retry_attempts + 1):
102 if key in self:
103 break
104 # we didn't find the key; wait retry_backoff seconds and re-check
105 time.sleep(retry_backoff)
106
107 if key not in self:
108 log.error(f'requested key={key} not found in {self} retry={retry}, attempts={retry_attempts}')
109 raise KeyError(key)
110
111 key_file, key_file_path = self._get_keyfile(key)
112 with self.fs.open(key_file_path, 'rb') as f:
113 metadata = json.loads(f.read())
114
115 archive_path = metadata['archive_full_path']
116
117 try:
118 return ShardFileReader(self.fs.open(archive_path, 'rb')), metadata
119 finally:
120 # update usage stats, count and accessed
121 metadata["access_count"] = metadata.get("access_count", 0) + 1
122 metadata["access_time"] = time.time()
123 log.debug('Updated %s with access snapshot, access_count=%s access_time=%s',
124 key_file, metadata['access_count'], metadata['access_time'])
125 with self.fs.open(key_file_path, 'wb') as f:
126 f.write(json.dumps(metadata))
127
128 def _remove(self, key):
129 if key not in self:
130 log.error(f'requested key={key} not found in {self}')
131 raise KeyError(key)
132
133 key_file, key_file_path = self._get_keyfile(key)
134 with self.fs.open(key_file_path, 'rb') as f:
135 metadata = json.loads(f.read())
136
137 archive_path = metadata['archive_full_path']
138 self.fs.rm(archive_path)
139 self.fs.rm(key_file_path)
140 return 1
141
142 @property
143 def storage_medium(self):
144 return getattr(self, self.storage_type)
145
146 @property
147 def key_suffix(self):
148 return 'key.json'
149
150 def __contains__(self, key):
151 """Return `True` if `key` matching item is found in cache.
152
153 :param key: key matching item
154 :return: True if key matching item
155
156 """
157 key_file, key_file_path = self._get_keyfile(key)
158 return self.fs.exists(key_file_path)
159
160
161 class BaseCache:
162 _locking_url: str = ''
163 _storage_path: str = ''
164 _config = {}
165 retry = False
166 retry_attempts = 0
167 retry_backoff = 1
168 _shards = tuple()
169
170 def __contains__(self, key):
171 """Return `True` if `key` matching item is found in cache.
172
173 :param key: key matching item
174 :return: True if key matching item
175
176 """
177 return self.has_key(key)
178
179 def __repr__(self):
180 return f'<{self.__class__.__name__}(storage={self._storage_path})>'
181
182 @classmethod
183 def gb_to_bytes(cls, gb):
184 return gb * (1024 ** 3)
185
186 @property
187 def storage_path(self):
188 return self._storage_path
189
190 @classmethod
191 def get_stats_db(cls):
192 return StatsDB()
193
194 def get_conf(self, key, pop=False):
195 if key not in self._config:
196 raise ValueError(f"No configuration key '{key}', please make sure it exists in archive_cache config")
197 val = self._config[key]
198 if pop:
199 del self._config[key]
200 return val
201
202 def _get_shard(self, key):
203 raise NotImplementedError
204
205 def _get_size(self, shard, archive_path):
206 raise NotImplementedError
207
208 def store(self, key, value_reader, metadata=None):
209 shard = self._get_shard(key)
210 return shard.store(key, value_reader, metadata)
211
212 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN) -> tuple[typing.BinaryIO, dict]:
213 """
214 Return file handle corresponding to `key` from specific shard cache.
215 """
216 if retry is NOT_GIVEN:
217 retry = self.retry
218 if retry_attempts is NOT_GIVEN:
219 retry_attempts = self.retry_attempts
220 retry_backoff = self.retry_backoff
221
222 shard = self._get_shard(key)
223 return shard.fetch(key, retry=retry, retry_attempts=retry_attempts, retry_backoff=retry_backoff)
224
225 def remove(self, key):
226 shard = self._get_shard(key)
227 return shard.remove(key)
228
229 def has_key(self, archive_key):
230 """Return `True` if `key` matching item is found in cache.
231
232 :param archive_key: key for the item; a unique archive name we store data under, e.g. my-archive-svn.zip
233 :return: True if key is found
234
235 """
236 shard = self._get_shard(archive_key)
237 return archive_key in shard
238
239 def iter_keys(self):
240 for shard in self._shards:
241 if shard.fs.exists(shard.storage_medium):
242 for path, _dirs, _files in shard.fs.walk(shard.storage_medium):
243 for key_file_path in _files:
244 if key_file_path.endswith(shard.key_suffix):
245 yield shard, key_file_path
246
247 def get_lock(self, lock_key):
248 return GenerationLock(lock_key, self._locking_url)
249
250 def evict(self, policy=None, size_limit=None) -> int:
251 """
252 Remove old items based on the given eviction policy and size limit.
253
254 How this works:
255 iterate over each shard, and for each shard iterate over its .key files,
256 reading the metadata they store. This gives us the full list of keys and
257 cached archives, together with their sizes, creation times, access times
258 and access counts.
259
260 That data is loaded into an in-memory DB, so different sorting strategies
261 can be run easily; summing the sizes is a single SUM query.
262
263 We then run a sorting strategy based on the eviction policy, iterating over
264 the sorted keys and removing each one until the overall size limit is met.
265 """
266
267 policy = policy or self._eviction_policy
268 size_limit = size_limit or self._cache_size_limit
269
270 select_policy = EVICTION_POLICY[policy]['evict']
271
272 log.debug('Running eviction policy \'%s\', and checking for size limit: %s',
273 policy, format_size(size_limit))
274
275 if select_policy is None:
276 return 0
277
278 db = self.get_stats_db()
279
280 data = []
281 cnt = 1
282
283 for shard, key_file in self.iter_keys():
284 with shard.fs.open(os.path.join(shard.storage_medium, key_file), 'rb') as f:
285 metadata = json.loads(f.read())
286
287 key_file_path = os.path.join(shard.storage_medium, key_file)
288
289 archive_key = metadata['archive_key']
290 archive_path = metadata['archive_full_path']
291
292 size = metadata.get('size')
293 if not size:
294 # in case we don't have the size stored, re-calculate it
295 size = self._get_size(shard, archive_path)
296
297 data.append([
298 cnt,
299 key_file,
300 key_file_path,
301 archive_key,
302 archive_path,
303 metadata.get('store_time', 0),
304 metadata.get('access_time', 0),
305 metadata.get('access_count', 0),
306 size,
307 ])
308 cnt += 1
309
310 # Insert bulk data using executemany
311 db.bulk_insert(data)
312
313 total_size = db.get_total_size()
314 log.debug('Analyzed %s keys, occupying: %s, running eviction to match %s',
315 len(data), format_size(total_size), format_size(size_limit))
316
317 removed_items = 0
318 removed_size = 0
319 for key_file, archive_key, size in db.get_sorted_keys(select_policy):
320 # simulate removal impact BEFORE removal
321 total_size -= size
322
323 if total_size <= size_limit:
324 # we obtained what we wanted...
325 break
326
327 self.remove(archive_key)
328 removed_items += 1
329 removed_size += size
330
331 log.debug('Removed %s cache archives, and reduced size by: %s',
332 removed_items, format_size(removed_size))
333 return removed_items
334
335 def get_statistics(self):
336 total_files = 0
337 total_size = 0
338 meta = {}
339
340 for shard, key_file in self.iter_keys():
341 json_key = f"{shard.storage_medium}/{key_file}"
342 with shard.fs.open(json_key, 'rb') as f:
343 total_files += 1
344 metadata = json.loads(f.read())
345 total_size += metadata['size']
346
347 return total_files, total_size, meta
348
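
Given an initialized instance (the hypothetical `d_cache` from the sketch above), the store/fetch/remove round trip exercised by these base classes looks like this; the key name is made up:

    import io

    buf = io.BytesIO(b'fake archive bytes')
    key, filename, size, metadata = d_cache.store(
        'my-archive.zip', buf, metadata={'commit_id': 'deadbeef'})

    if 'my-archive.zip' in d_cache:  # checks for the .key.json file in the shard
        reader, metadata = d_cache.fetch('my-archive.zip')
        data = reader.read()  # ShardFileReader proxies the underlying file object
        reader.close()
        # each fetch bumps access_count/access_time in the key file, feeding eviction

    d_cache.remove('my-archive.zip')  # deletes both the archive and its key file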
@@ -0,0 +1,166 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import codecs
20 import hashlib
21 import logging
22 import os
23
24 import fsspec
25
26 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29
30 log = logging.getLogger(__name__)
31
32
33 class FileSystemShard(BaseShard):
34
35 def __init__(self, index, directory, **settings):
36 self._index = index
37 self._directory = directory
38 self.storage_type = 'directory'
39 self.fs = fsspec.filesystem('file')
40
41 @property
42 def directory(self):
43 """Cache directory."""
44 return self._directory
45
46 def _get_keyfile(self, archive_key) -> tuple[str, str]:
47 key_file = f'{archive_key}.{self.key_suffix}'
48 return key_file, os.path.join(self.directory, key_file)
49
50 def _get_writer(self, path, mode):
51 for count in range(1, 11):
52 try:
53 # Another cache may have deleted the directory before
54 # the file could be opened.
55 return self.fs.open(path, mode)
56 except OSError:
57 if count == 10:
58 # Give up after 10 tries to open the file.
59 raise
60 continue
61
62 def _write_file(self, full_path, iterator, mode):
63 # ensure dir exists
64 destination, _ = os.path.split(full_path)
65 if not self.fs.exists(destination):
66 self.fs.makedirs(destination)
67
68 writer = self._get_writer(full_path, mode)
69
70 digest = hashlib.sha256()
71 with writer:
72 size = 0
73 for chunk in iterator:
74 size += len(chunk)
75 digest.update(chunk)
76 writer.write(chunk)
77 writer.flush()
78 # Get the file descriptor
79 fd = writer.fileno()
80
81 # Sync the file descriptor to disk, helps with NFS cases...
82 os.fsync(fd)
83 sha256 = digest.hexdigest()
84 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
85 return size, sha256
86
87 def store(self, key, value_reader, metadata: dict | None = None):
88 return self._store(key, value_reader, metadata, mode='xb')
89
90 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
91 return self._fetch(key, retry, retry_attempts, retry_backoff)
92
93 def remove(self, key):
94 return self._remove(key)
95
96 def random_filename(self):
97 """Return filename and full-path tuple for file storage.
98
99 Filename will be a randomly generated 28 character hexadecimal string
100 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
101 reduce the size of directories. On older filesystems, lookups in
102 directories with many files may be slow.
103 """
104
105 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
106
107 archive_name = hex_name[4:] + '.archive_cache'
108 filename = f"{hex_name[:2]}/{hex_name[2:4]}/{archive_name}"
109
110 full_path = os.path.join(self.directory, filename)
111 return archive_name, full_path
112
113 def __repr__(self):
114 return f'{self.__class__.__name__}(index={self._index}, dir={self.directory})'
115
116
117 class FileSystemFanoutCache(BaseCache):
118
119 def __init__(self, locking_url, **settings):
120 """
121 Initialize file system cache instance.
122
123 :param str locking_url: redis url for a lock
124 :param settings: settings dict
125
126 """
127 self._locking_url = locking_url
128 self._config = settings
129 cache_dir = self.get_conf('archive_cache.filesystem.store_dir')
130 directory = str(cache_dir)
131 directory = os.path.expanduser(directory)
132 directory = os.path.expandvars(directory)
133 self._directory = directory
134 self._storage_path = directory
135
136 # ensure the archive cache directory exists and we can write to it
137 if not os.path.isdir(self._directory):
138 os.makedirs(self._directory, exist_ok=True)
139
140 self._count = int(self.get_conf('archive_cache.filesystem.cache_shards', pop=True))
141
142 self._eviction_policy = self.get_conf('archive_cache.filesystem.eviction_policy', pop=True)
143 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.filesystem.cache_size_gb')))
144
145 self.retry = str2bool(self.get_conf('archive_cache.filesystem.retry', pop=True))
146 self.retry_attempts = int(self.get_conf('archive_cache.filesystem.retry_attempts', pop=True))
147 self.retry_backoff = int(self.get_conf('archive_cache.filesystem.retry_backoff', pop=True))
148
149 log.debug('Initializing archival cache instance under %s', self._directory)
150 self._shards = tuple(
151 FileSystemShard(
152 index=num,
153 directory=os.path.join(directory, 'shard_%03d' % num),
154 **settings,
155 )
156 for num in range(self._count)
157 )
158 self._hash = self._shards[0].hash
159
160 def _get_shard(self, key) -> FileSystemShard:
161 index = self._hash(key) % self._count
162 shard = self._shards[index]
163 return shard
164
165 def _get_size(self, shard, archive_path):
166 return os.stat(archive_path).st_size
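
The shard routing and on-disk fanout implemented above can be reproduced standalone. A small sketch of the same math, assuming the hypothetical 8-shard store_dir from the earlier settings example:

    import codecs
    import os
    import zlib

    def shard_index(key: str, shard_count: int = 8) -> int:
        # same portable hash as BaseShard.hash: adler32 masked to 32 bits
        return (zlib.adler32(key.encode('utf-8')) & 0xFFFFFFFF) % shard_count

    def fanout_path(store_dir: str, key: str) -> str:
        # 32 hex chars from 16 random bytes; the first 4 chars become two
        # directory levels, the remaining 28 chars name the archive file
        hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
        shard_dir = 'shard_%03d' % shard_index(key)
        return os.path.join(store_dir, shard_dir, hex_name[:2], hex_name[2:4],
                            hex_name[4:] + '.archive_cache')

    print(fanout_path('/tmp/archive_cache', 'my-archive.zip'))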
@@ -0,0 +1,150 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import codecs
20 import hashlib
21 import logging
22 import os
23
24 import fsspec
25
26 from .base import BaseCache, BaseShard
27 from ..utils import ShardFileReader, NOT_GIVEN
28 from ...type_utils import str2bool
29
30 log = logging.getLogger(__name__)
31
32
33 class S3Shard(BaseShard):
34
35 def __init__(self, index, bucket, **settings):
36 self._index = index
37 self._bucket = bucket
38 self.storage_type = 'bucket'
39
40 endpoint_url = settings.pop('archive_cache.objectstore.url')
41 key = settings.pop('archive_cache.objectstore.key')
42 secret = settings.pop('archive_cache.objectstore.secret')
43
44 self.fs = fsspec.filesystem('s3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret)
45
46 @property
47 def bucket(self):
48 """Cache bucket."""
49 return self._bucket
50
51 def _get_keyfile(self, archive_key) -> tuple[str, str]:
52 key_file = f'{archive_key}-{self.key_suffix}'
53 return key_file, os.path.join(self.bucket, key_file)
54
55 def _get_writer(self, path, mode):
56 return self.fs.open(path, 'wb')
57
58 def _write_file(self, full_path, iterator, mode):
59 # ensure bucket exists
60 destination = self.bucket
61 if not self.fs.exists(destination):
62 self.fs.mkdir(destination, s3_additional_kwargs={})
63
64 writer = self._get_writer(full_path, mode)
65
66 digest = hashlib.sha256()
67 with writer:
68 size = 0
69 for chunk in iterator:
70 size += len(chunk)
71 digest.update(chunk)
72 writer.write(chunk)
73
74 sha256 = digest.hexdigest()
75 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
76 return size, sha256
77
78 def store(self, key, value_reader, metadata: dict | None = None):
79 return self._store(key, value_reader, metadata, mode='wb')
80
81 def fetch(self, key, retry=NOT_GIVEN, retry_attempts=NOT_GIVEN, retry_backoff=1) -> tuple[ShardFileReader, dict]:
82 return self._fetch(key, retry, retry_attempts, retry_backoff)
83
84 def remove(self, key):
85 return self._remove(key)
86
87 def random_filename(self):
88 """Return filename and full-path tuple for file storage.
89
90 Filename will be a randomly generated 28 character hexadecimal string
91 with ".archive_cache" suffixed. Two hex prefix segments are joined into the
92 object name with dashes, mirroring the two-level directory fanout used by
93 the filesystem backend.
94 """
95
96 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
97
98 archive_name = hex_name[4:] + '.archive_cache'
99 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
100
101 full_path = os.path.join(self.bucket, filename)
102 return archive_name, full_path
103
104 def __repr__(self):
105 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
106
107
108 class ObjectStoreCache(BaseCache):
109
110 def __init__(self, locking_url, **settings):
111 """
112 Initialize objectstore cache instance.
113
114 :param str locking_url: redis url for a lock
115 :param settings: settings dict
116
117 """
118 self._locking_url = locking_url
119 self._config = settings
120
121 objectstore_url = self.get_conf('archive_cache.objectstore.url')
122 self._storage_path = objectstore_url
123
124 self._count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
125
126 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
127 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
128
129 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
130 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
131 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
132
133 log.debug('Initializing archival cache instance under %s', objectstore_url)
134 self._shards = tuple(
135 S3Shard(
136 index=num,
137 bucket='rhodecode-archivecache-%03d' % num,
138 **settings,
139 )
140 for num in range(self._count)
141 )
142 self._hash = self._shards[0].hash
143
144 def _get_shard(self, key) -> S3Shard:
145 index = self._hash(key) % self._count
146 shard = self._shards[index]
147 return shard
148
149 def _get_size(self, shard, archive_path):
150 return shard.fs.info(archive_path)['size']
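
The objectstore backend reads an analogous settings block; the endpoint, key and secret below are placeholders for a real S3-compatible service:

    settings = {
        'archive_cache.backend.type': 'objectstore',
        'archive_cache.locking.url': 'redis://localhost:6379/0',
        'archive_cache.objectstore.url': 'http://s3-endpoint:9000',
        'archive_cache.objectstore.key': 'placeholder-access-key',
        'archive_cache.objectstore.secret': 'placeholder-secret-key',
        'archive_cache.objectstore.bucket_shards': 8,
        'archive_cache.objectstore.cache_size_gb': 10,
        'archive_cache.objectstore.eviction_policy': 'least-recently-used',
        'archive_cache.objectstore.retry': True,
        'archive_cache.objectstore.retry_attempts': 5,
        'archive_cache.objectstore.retry_backoff': 1,
    }

With bucket_shards=8, keys are spread across buckets rhodecode-archivecache-000 through rhodecode-archivecache-007, created on demand on first write.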
@@ -0,0 +1,62 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import redis
20 from .._vendor import redis_lock
21
22
23 class ArchiveCacheGenerationLock(Exception):
24 pass
25
26
27 class GenerationLock:
28 """
29 Locking mechanism that detects if a lock is acquired
30
31 with GenerationLock(lock_key):
32 compute_archive()
33 """
34 lock_timeout = 7200
35
36 def __init__(self, lock_key, url):
37 self.lock_key = lock_key
38 self._create_client(url)
39 self.lock = self.get_lock()
40
41 def _create_client(self, url):
42 connection_pool = redis.ConnectionPool.from_url(url)
43 self.writer_client = redis.StrictRedis(
44 connection_pool=connection_pool
45 )
46 self.reader_client = self.writer_client
47
48 def get_lock(self):
49 return redis_lock.Lock(
50 redis_client=self.writer_client,
51 name=self.lock_key,
52 expire=self.lock_timeout,
53 strict=True
54 )
55
56 def __enter__(self):
57 acquired = self.lock.acquire(blocking=False)
58 if not acquired:
59 raise ArchiveCacheGenerationLock('Failed to create a lock')
60
61 def __exit__(self, exc_type, exc_val, exc_tb):
62 self.lock.release()
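
Usage follows the context-manager shape from the docstring. A sketch assuming a reachable redis instance at a hypothetical URL:

    lock = GenerationLock('archive-gen:my-archive.zip', 'redis://localhost:6379/0')
    try:
        with lock:
            pass  # generate and store the archive here
    except ArchiveCacheGenerationLock:
        # another worker holds the lock (acquire is non-blocking);
        # the caller can back off and retry fetch() instead
        pass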
@@ -0,0 +1,134 b''
1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import sqlite3
20 import s3fs.core
21
22 NOT_GIVEN = -917
23
24
25 EVICTION_POLICY = {
26 'none': {
27 'evict': None,
28 },
29 'least-recently-stored': {
30 'evict': 'SELECT {fields} FROM archive_cache ORDER BY store_time',
31 },
32 'least-recently-used': {
33 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_time',
34 },
35 'least-frequently-used': {
36 'evict': 'SELECT {fields} FROM archive_cache ORDER BY access_count',
37 },
38 }
39
40
41 def archive_iterator(_reader, block_size: int = 4096 * 512):
42 # 4096 * 512 = 2MB blocks
43 while True:
44 data = _reader.read(block_size)
45 if not data:
46 break
47 yield data
48
49
50 def format_size(size):
51 # Convert size in bytes to a human-readable format (e.g., KB, MB, GB)
52 for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
53 if size < 1024:
54 return f"{size:.2f} {unit}"
55 size /= 1024
56 return f"{size:.2f} PB"
57
58 class StatsDB:
59
60 def __init__(self):
61 self.connection = sqlite3.connect(':memory:')
62 self._init_db()
63
64 def _init_db(self):
65 qry = '''
66 CREATE TABLE IF NOT EXISTS archive_cache (
67 rowid INTEGER PRIMARY KEY,
68 key_file TEXT,
69 key_file_path TEXT,
70 archive_key TEXT,
71 archive_path TEXT,
72 store_time REAL,
73 access_time REAL,
74 access_count INTEGER DEFAULT 0,
75 size INTEGER DEFAULT 0
76 )
77 '''
78
79 self.sql(qry)
80 self.connection.commit()
81
82 @property
83 def sql(self):
84 return self.connection.execute
85
86 def bulk_insert(self, rows):
87 qry = '''
88 INSERT INTO archive_cache (
89 rowid,
90 key_file,
91 key_file_path,
92 archive_key,
93 archive_path,
94 store_time,
95 access_time,
96 access_count,
97 size
98 )
99 VALUES (
100 ?, ?, ?, ?, ?, ?, ?, ?, ?
101 )
102 '''
103 cursor = self.connection.cursor()
104 cursor.executemany(qry, rows)
105 self.connection.commit()
106
107 def get_total_size(self):
108 qry = 'SELECT COALESCE(SUM(size), 0) FROM archive_cache'
109 ((total_size,),) = self.sql(qry).fetchall()
110 return total_size
111
112 def get_sorted_keys(self, select_policy):
113 select_policy_qry = select_policy.format(fields='key_file, archive_key, size')
114 return self.sql(select_policy_qry).fetchall()
115
116
117 class ShardFileReader:
118
119 def __init__(self, file_like_reader):
120 self._file_like_reader = file_like_reader
121
122 def __getattr__(self, item):
123 if isinstance(self._file_like_reader, s3fs.core.S3File):
124 match item:
125 case 'name':
126 # S3 FileWrapper doesn't support name attribute, and we use it
127 return self._file_like_reader.full_name
128 case _:
129 return getattr(self._file_like_reader, item)
130 else:
131 return getattr(self._file_like_reader, item)
132
133 def __repr__(self):
134 return f'<{self.__class__.__name__}={self._file_like_reader}>'
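
archive_iterator is a plain chunked-read generator, so it works with any file-like object; a self-contained example:

    import io

    reader = io.BytesIO(b'x' * (5 * 1024 * 1024))
    total = 0
    for chunk in archive_iterator(reader):  # 2MB blocks by default
        total += len(chunk)
    assert total == 5 * 1024 * 1024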
@@ -1,73 +1,102 b''
1 # deps, generated via pipdeptree --exclude setuptools,wheel,pipdeptree,pip -f | tr '[:upper:]' '[:lower:]'
2
3 async-timeout==4.0.3
4 atomicwrites==1.4.1
5 celery==5.3.6
6 billiard==4.2.0
7 click==8.1.3
8 click-didyoumean==0.3.0
9 click==8.1.3
10 click-plugins==1.1.1
11 click==8.1.3
12 click-repl==0.2.0
13 click==8.1.3
14 prompt-toolkit==3.0.38
15 wcwidth==0.2.6
16 six==1.16.0
17 kombu==5.3.5
18 amqp==5.2.0
19 vine==5.1.0
20 vine==5.1.0
21 python-dateutil==2.8.2
22 six==1.16.0
23 tzdata==2024.1
24 vine==5.1.0
25 contextlib2==21.6.0
26 dogpile.cache==1.3.3
27 decorator==5.1.1
28 stevedore==5.1.0
29 pbr==5.11.1
30 dulwich==0.21.6
31 urllib3==1.26.14
32 +fsspec==2024.6.0
33 gunicorn==21.2.0
34 packaging==24.0
35 hg-evolve==11.1.3
36 importlib-metadata==6.0.0
37 zipp==3.15.0
38 mercurial==6.7.4
39 more-itertools==9.1.0
40 msgpack==1.0.8
41 orjson==3.10.3
42 psutil==5.9.8
43 py==1.11.0
44 pygit2==1.13.3
45 cffi==1.16.0
46 pycparser==2.21
47 pygments==2.15.1
48 pyparsing==3.1.1
49 pyramid==2.0.2
50 hupper==1.12
51 plaster==1.1.2
52 plaster-pastedeploy==1.0.1
53 pastedeploy==3.1.0
54 plaster==1.1.2
55 translationstring==1.4
56 venusian==3.0.0
57 webob==1.8.7
58 zope.deprecation==5.0.0
59 zope.interface==6.3.0
60 redis==5.0.4
61 async-timeout==4.0.3
62 repoze.lru==0.7
63 +s3fs==2024.6.0
64 +aiobotocore==2.13.0
65 +aiohttp==3.9.5
66 +aiosignal==1.3.1
67 +frozenlist==1.4.1
68 +attrs==22.2.0
69 +frozenlist==1.4.1
70 +multidict==6.0.5
71 +yarl==1.9.4
72 +idna==3.4
73 +multidict==6.0.5
74 +aioitertools==0.11.0
75 +botocore==1.34.106
76 +jmespath==1.0.1
77 +python-dateutil==2.8.2
78 +six==1.16.0
79 +urllib3==1.26.14
80 +wrapt==1.16.0
81 +aiohttp==3.9.5
82 +aiosignal==1.3.1
83 +frozenlist==1.4.1
84 +attrs==22.2.0
85 +frozenlist==1.4.1
86 +multidict==6.0.5
87 +yarl==1.9.4
88 +idna==3.4
89 +multidict==6.0.5
90 +fsspec==2024.6.0
91 scandir==1.10.0
92 setproctitle==1.3.3
93 subvertpy==0.11.0
94 waitress==3.0.0
95 wcwidth==0.2.6
96
97
98 ## test related requirements
99 #-r requirements_test.txt
100
101 ## uncomment to add the debug libraries
102 #-r requirements_debug.txt
@@ -1,187 +1,187 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 import os
18 import sys
19 import tempfile
20 import logging
21 import urllib.parse
22
23 -from vcsserver.lib.rc_cache.archive_cache import get_archival_cache_store
23 +from vcsserver.lib.archive_cache import get_archival_cache_store
24
25 from vcsserver import exceptions
26 from vcsserver.exceptions import NoContentException
27 from vcsserver.hgcompat import archival
28 from vcsserver.lib.str_utils import safe_bytes
29 from vcsserver.lib.exc_tracking import format_exc
30 log = logging.getLogger(__name__)
31
32
33 class RepoFactory:
34 """
35 Utility to create instances of repository
36
37 It provides internal caching of the `repo` object based on
38 the :term:`call context`.
39 """
40 repo_type = None
41
42 def __init__(self):
43 pass
44
45 def _create_config(self, path, config):
46 config = {}
47 return config
48
49 def _create_repo(self, wire, create):
50 raise NotImplementedError()
51
52 def repo(self, wire, create=False):
53 raise NotImplementedError()
54
55
56 def obfuscate_qs(query_string):
57 if query_string is None:
58 return None
59
60 parsed = []
61 for k, v in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
62 if k in ['auth_token', 'api_key']:
63 v = "*****"
64 parsed.append((k, v))
65
66 return '&'.join('{}{}'.format(
67 k, f'={v}' if v else '') for k, v in parsed)
68
69
70 def raise_from_original(new_type, org_exc: Exception):
71 """
72 Raise a new exception type with original args and traceback.
73 """
74 exc_info = sys.exc_info()
75 exc_type, exc_value, exc_traceback = exc_info
76 new_exc = new_type(*exc_value.args)
77
78 # store the original traceback into the new exc
79 new_exc._org_exc_tb = format_exc(exc_info)
80
81 try:
82 raise new_exc.with_traceback(exc_traceback)
83 finally:
84 del exc_traceback
85
86
87 class ArchiveNode:
88 def __init__(self, path, mode, is_link, raw_bytes):
89 self.path = path
90 self.mode = mode
91 self.is_link = is_link
92 self.raw_bytes = raw_bytes
93
94
95 def store_archive_in_cache(node_walker, archive_key, kind, mtime, archive_at_path, archive_dir_name,
96 commit_id, write_metadata=True, extra_metadata=None, cache_config=None):
97 """
98 Function that stores a generated archive and sends it to a dedicated backend store.
99 In here we use the archive_cache backends
100
101 :param node_walker: a generator returning nodes to add to archive
102 :param archive_key: key used to store the path
103 :param kind: archive kind
104 :param mtime: time of creation
105 :param archive_at_path: default '/' the path at which the archive was started.
106 If this is not '/' it means it's a partial archive
107 :param archive_dir_name: inside dir name when creating an archive
108 :param commit_id: commit sha of revision archive was created at
109 :param write_metadata:
110 :param extra_metadata:
111 :param cache_config:
112
113 walker should be a file walker, for example,
114 def node_walker():
115 for file_info in files:
116 yield ArchiveNode(fn, mode, is_link, ctx[fn].data)
117 """
118 extra_metadata = extra_metadata or {}
119
120 d_cache = get_archival_cache_store(config=cache_config)
121
122 if archive_key in d_cache:
123 reader, metadata = d_cache.fetch(archive_key)
124 return reader.name
125
126 archive_tmp_path = safe_bytes(tempfile.mkstemp()[1])
127 log.debug('Creating new temp archive in %s', archive_tmp_path)
128
129 if kind == "tgz":
130 archiver = archival.tarit(archive_tmp_path, mtime, b"gz")
131 elif kind == "tbz2":
132 archiver = archival.tarit(archive_tmp_path, mtime, b"bz2")
133 elif kind == 'zip':
134 archiver = archival.zipit(archive_tmp_path, mtime)
135 else:
136 raise exceptions.ArchiveException()(
137 f'Remote does not support: "{kind}" archive type.')
138
139 for f in node_walker(commit_id, archive_at_path):
140 f_path = os.path.join(safe_bytes(archive_dir_name), safe_bytes(f.path).lstrip(b'/'))
141
142 try:
143 archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
144 except NoContentException:
145 # NOTE(marcink): this is a special case for SVN so we can create "empty"
146 # directories which are not supported by archiver
147 archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')
148
149 metadata = dict([
150 ('commit_id', commit_id),
151 ('mtime', mtime),
152 ])
153 metadata.update(extra_metadata)
154 if write_metadata:
155 meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
156 f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
157 archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))
158
159 archiver.done()
160
161 with open(archive_tmp_path, 'rb') as archive_file:
162 add_result = d_cache.store(archive_key, archive_file, metadata=metadata)
163 if not add_result:
164 log.error('Failed to store cache for key=%s', archive_key)
165
166 os.remove(archive_tmp_path)
167
168 reader, metadata = d_cache.fetch(archive_key)
169
170 return reader.name
171
172
173 class BinaryEnvelope:
174 def __init__(self, val):
175 self.val = val
176
177
178 class BytesEnvelope(bytes):
179 def __new__(cls, content):
180 if isinstance(content, bytes):
181 return super().__new__(cls, content)
182 else:
183 raise TypeError('BytesEnvelope content= param must be bytes. Use BinaryEnvelope to wrap other types')
184
185
186 class BinaryBytesEnvelope(BytesEnvelope):
187 pass
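
A node_walker matching the contract documented above can be sketched as follows; the single file and its contents are made up, and `settings` is the hypothetical filesystem config from the first sketch:

    def node_walker(commit_id, archive_at_path):
        # yield one ArchiveNode per file; raw_bytes must be a callable returning bytes
        yield ArchiveNode(b'README.rst', 0o644, False, lambda: b'readme body')

    archive_path = store_archive_in_cache(
        node_walker, archive_key='repo-deadbeef.zip', kind='zip', mtime=0,
        archive_at_path='/', archive_dir_name='repo', commit_id='deadbeef',
        cache_config=settings)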
@@ -1,774 +1,774 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import io
18 import io
19 import os
19 import os
20 import platform
20 import platform
21 import sys
21 import sys
22 import locale
22 import locale
23 import logging
23 import logging
24 import uuid
24 import uuid
25 import time
25 import time
26 import wsgiref.util
26 import wsgiref.util
27 import tempfile
27 import tempfile
28 import psutil
28 import psutil
29
29
30 from itertools import chain
30 from itertools import chain
31
31
32 import msgpack
32 import msgpack
33 import configparser
33 import configparser
34
34
35 from pyramid.config import Configurator
35 from pyramid.config import Configurator
36 from pyramid.wsgi import wsgiapp
36 from pyramid.wsgi import wsgiapp
37 from pyramid.response import Response
37 from pyramid.response import Response
38
38
39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
39 from vcsserver.base import BytesEnvelope, BinaryEnvelope
40 from vcsserver.lib.ext_json import json
40 from vcsserver.lib.ext_json import json
41 from vcsserver.config.settings_maker import SettingsMaker
41 from vcsserver.config.settings_maker import SettingsMaker
42 from vcsserver.lib.str_utils import safe_int
42 from vcsserver.lib.str_utils import safe_int
43 from vcsserver.lib.statsd_client import StatsdClient
43 from vcsserver.lib.statsd_client import StatsdClient
44 from vcsserver.tweens.request_wrapper import get_headers_call_context
44 from vcsserver.tweens.request_wrapper import get_headers_call_context
45
45
46 import vcsserver
46 import vcsserver
47 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
47 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
48 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
48 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
49 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
49 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
50 from vcsserver.echo_stub.echo_app import EchoApp
50 from vcsserver.echo_stub.echo_app import EchoApp
51 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
51 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
52 from vcsserver.lib.exc_tracking import store_exception, format_exc
52 from vcsserver.lib.exc_tracking import store_exception, format_exc
53 from vcsserver.server import VcsServer
53 from vcsserver.server import VcsServer
54
54
55 strict_vcs = True
55 strict_vcs = True
56
56
57 git_import_err = None
57 git_import_err = None
58 try:
58 try:
59 from vcsserver.remote.git_remote import GitFactory, GitRemote
59 from vcsserver.remote.git_remote import GitFactory, GitRemote
60 except ImportError as e:
60 except ImportError as e:
61 GitFactory = None
61 GitFactory = None
62 GitRemote = None
62 GitRemote = None
63 git_import_err = e
63 git_import_err = e
64 if strict_vcs:
64 if strict_vcs:
65 raise
65 raise
66
66
67
67
68 hg_import_err = None
68 hg_import_err = None
69 try:
69 try:
70 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
70 from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
71 except ImportError as e:
71 except ImportError as e:
72 MercurialFactory = None
72 MercurialFactory = None
73 HgRemote = None
73 HgRemote = None
74 hg_import_err = e
74 hg_import_err = e
75 if strict_vcs:
75 if strict_vcs:
76 raise
76 raise
77
77
78
78
79 svn_import_err = None
79 svn_import_err = None
80 try:
80 try:
81 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
81 from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
82 except ImportError as e:
82 except ImportError as e:
83 SubversionFactory = None
83 SubversionFactory = None
84 SvnRemote = None
84 SvnRemote = None
85 svn_import_err = e
85 svn_import_err = e
86 if strict_vcs:
86 if strict_vcs:
87 raise
87 raise
88
88
89 log = logging.getLogger(__name__)
89 log = logging.getLogger(__name__)
90
90
91 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
91 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
92 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
92 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
93
93
94 try:
94 try:
95 locale.setlocale(locale.LC_ALL, '')
95 locale.setlocale(locale.LC_ALL, '')
96 except locale.Error as e:
96 except locale.Error as e:
97 log.error(
97 log.error(
98 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
98 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
99 os.environ['LC_ALL'] = 'C'
99 os.environ['LC_ALL'] = 'C'
100
100
101
101
102 def _is_request_chunked(environ):
102 def _is_request_chunked(environ):
103 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
103 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
104 return stream
104 return stream
105
105
106
106
107 def log_max_fd():
107 def log_max_fd():
108 try:
108 try:
109 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
109 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
110 log.info('Max file descriptors value: %s', maxfd)
110 log.info('Max file descriptors value: %s', maxfd)
111 except Exception:
111 except Exception:
112 pass
112 pass
113
113
114
114
115 class VCS:
115 class VCS:
116 def __init__(self, locale_conf=None, cache_config=None):
116 def __init__(self, locale_conf=None, cache_config=None):
117 self.locale = locale_conf
117 self.locale = locale_conf
118 self.cache_config = cache_config
118 self.cache_config = cache_config
119 self._configure_locale()
119 self._configure_locale()
120
120
121 log_max_fd()
121 log_max_fd()
122
122
123 if GitFactory and GitRemote:
123 if GitFactory and GitRemote:
124 git_factory = GitFactory()
124 git_factory = GitFactory()
125 self._git_remote = GitRemote(git_factory)
125 self._git_remote = GitRemote(git_factory)
126 else:
126 else:
127 log.error("Git client import failed: %s", git_import_err)
127 log.error("Git client import failed: %s", git_import_err)
128
128
129 if MercurialFactory and HgRemote:
129 if MercurialFactory and HgRemote:
130 hg_factory = MercurialFactory()
130 hg_factory = MercurialFactory()
131 self._hg_remote = HgRemote(hg_factory)
131 self._hg_remote = HgRemote(hg_factory)
132 else:
132 else:
133 log.error("Mercurial client import failed: %s", hg_import_err)
133 log.error("Mercurial client import failed: %s", hg_import_err)
134
134
135 if SubversionFactory and SvnRemote:
135 if SubversionFactory and SvnRemote:
136 svn_factory = SubversionFactory()
136 svn_factory = SubversionFactory()
137
137
138 # hg factory is used for svn url validation
138 # hg factory is used for svn url validation
139 hg_factory = MercurialFactory()
139 hg_factory = MercurialFactory()
140 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
140 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
141 else:
141 else:
142 log.error("Subversion client import failed: %s", svn_import_err)
142 log.error("Subversion client import failed: %s", svn_import_err)
143
143
144 self._vcsserver = VcsServer()
144 self._vcsserver = VcsServer()
145
145
146 def _configure_locale(self):
146 def _configure_locale(self):
147 if self.locale:
147 if self.locale:
148 log.info('Settings locale: `LC_ALL` to %s', self.locale)
148 log.info('Settings locale: `LC_ALL` to %s', self.locale)
149 else:
149 else:
150 log.info('Configuring locale subsystem based on environment variables')
150 log.info('Configuring locale subsystem based on environment variables')
151 try:
151 try:
152 # If self.locale is the empty string, then the locale
152 # If self.locale is the empty string, then the locale
153 # module will use the environment variables. See the
153 # module will use the environment variables. See the
154 # documentation of the package `locale`.
154 # documentation of the package `locale`.
155 locale.setlocale(locale.LC_ALL, self.locale)
155 locale.setlocale(locale.LC_ALL, self.locale)
156
156
157 language_code, encoding = locale.getlocale()
157 language_code, encoding = locale.getlocale()
158 log.info(
158 log.info(
159 'Locale set to language code "%s" with encoding "%s".',
159 'Locale set to language code "%s" with encoding "%s".',
160 language_code, encoding)
160 language_code, encoding)
161 except locale.Error:
161 except locale.Error:
162 log.exception('Cannot set locale, not configuring the locale system')
162 log.exception('Cannot set locale, not configuring the locale system')
163
163
164
164
class WsgiProxy:
    def __init__(self, wsgi):
        self.wsgi = wsgi

    def __call__(self, environ, start_response):
        input_data = environ['wsgi.input'].read()
        input_data = msgpack.unpackb(input_data)

        error = None
        try:
            data, status, headers = self.wsgi.handle(
                input_data['environment'], input_data['input_data'],
                *input_data['args'], **input_data['kwargs'])
        except Exception as e:
            data, status, headers = [], None, None
            error = {
                'message': str(e),
                '_vcs_kind': getattr(e, '_vcs_kind', None)
            }

        # PEP 3333 requires a status string and a list of header tuples here;
        # the real status/headers are tunneled inside the msgpack stream.
        start_response('200 OK', [])
        return self._iterator(error, status, headers, data)

    def _iterator(self, error, status, headers, data):
        initial_data = [
            error,
            status,
            headers,
        ]

        for d in chain(initial_data, data):
            yield msgpack.packb(d)


def not_found(request):
    return {'status': '404 NOT FOUND'}


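# Custom Pyramid view predicate, registered as ``vcs_view`` further down via
# config.add_view_predicate(); it matches a request only when the {backend}
# URL segment names one of the remotes that were successfully imported.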
class VCSViewPredicate:
    def __init__(self, val, config):
        self.remotes = val

    def text(self):
        return f'vcs view method = {list(self.remotes.keys())}'

    phash = text

    def __call__(self, context, request):
        """
        View predicate that returns true if given backend is supported by
        defined remotes.
        """
        backend = request.matchdict.get('backend')
        return backend in self.remotes


class HTTPApplication:
    ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')

    remote_wsgi = remote_wsgi
    _use_echo_app = False

    def __init__(self, settings=None, global_config=None):

        self.config = Configurator(settings=settings)
        # Init our statsd at very start
        self.config.registry.statsd = StatsdClient.statsd
        self.config.registry.vcs_call_context = {}

        self.global_config = global_config
        self.config.include('vcsserver.lib.rc_cache')
        self.config.include('vcsserver.lib.archive_cache')

        settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
        vcs = VCS(locale_conf=settings_locale, cache_config=settings)
        self._remotes = {
            'hg': vcs._hg_remote,
            'git': vcs._git_remote,
            'svn': vcs._svn_remote,
            'server': vcs._vcsserver,
        }
        if settings.get('dev.use_echo_app', 'false').lower() == 'true':
            self._use_echo_app = True
            log.warning("Using EchoApp for VCS operations.")
            self.remote_wsgi = remote_wsgi_stub

        self._configure_settings(global_config, settings)

        self._configure()

    def _configure_settings(self, global_config, app_settings):
        """
        Configure the settings module.
        """
        settings_merged = global_config.copy()
        settings_merged.update(app_settings)

        binary_dir = app_settings['core.binary_dir']

        settings.BINARY_DIR = binary_dir

        # Store the settings to make them available to other modules.
        vcsserver.PYRAMID_SETTINGS = settings_merged
        vcsserver.CONFIG = settings_merged

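    # Wires up all routes and views: /_service and /status for introspection,
    # /proxy/hg and /proxy/git for the msgpack WSGI proxies, /{backend} and
    # /{backend}/stream for RPC calls, and /stream/{git,hg}/*repo_name for
    # clone/push traffic.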
    def _configure(self):
        self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)

        self.config.add_route('service', '/_service')
        self.config.add_route('status', '/status')
        self.config.add_route('hg_proxy', '/proxy/hg')
        self.config.add_route('git_proxy', '/proxy/git')

        # rpc methods
        self.config.add_route('vcs', '/{backend}')

        # streaming rpc remote methods
        self.config.add_route('vcs_stream', '/{backend}/stream')

        # vcs operations clone/push as streaming
        self.config.add_route('stream_git', '/stream/git/*repo_name')
        self.config.add_route('stream_hg', '/stream/hg/*repo_name')

        self.config.add_view(self.status_view, route_name='status', renderer='json')
        self.config.add_view(self.service_view, route_name='service', renderer='msgpack')

        self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
        self.config.add_view(self.git_proxy(), route_name='git_proxy')
        self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
                             vcs_view=self._remotes)
        self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
                             vcs_view=self._remotes)

        self.config.add_view(self.hg_stream(), route_name='stream_hg')
        self.config.add_view(self.git_stream(), route_name='stream_git')

        self.config.add_view_predicate('vcs_view', VCSViewPredicate)

        self.config.add_notfound_view(not_found, renderer='json')

        self.config.add_view(self.handle_vcs_exception, context=Exception)

        self.config.add_tween(
            'vcsserver.tweens.request_wrapper.RequestWrapperTween',
        )
        self.config.add_request_method(
            'vcsserver.lib.request_counter.get_request_counter',
            'request_count')

    def wsgi_app(self):
        return self.config.make_wsgi_app()

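    # Decodes the msgpack RPC envelope shared by vcs_view and vcs_stream_view:
    # {'id': ..., 'method': ..., 'params': {'wire': ..., 'args': ..., 'kwargs': ...}}.
    # The optional ``wire`` dict carries per-repo call context and is prepended
    # to the positional args before dispatch.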
    def _vcs_view_params(self, request):
        remote = self._remotes[request.matchdict['backend']]
        payload = msgpack.unpackb(request.body, use_list=True)

        method = payload.get('method')
        params = payload['params']
        wire = params.get('wire')
        args = params.get('args')
        kwargs = params.get('kwargs')
        context_uid = None

        request.registry.vcs_call_context = {
            'method': method,
            'repo_name': payload.get('_repo_name'),
        }

        if wire:
            try:
                wire['context'] = context_uid = uuid.UUID(wire['context'])
            except KeyError:
                pass
            args.insert(0, wire)
        repo_state_uid = wire.get('repo_state_uid') if wire else None

        # NOTE(marcink): trading complexity for slight performance
        if log.isEnabledFor(logging.DEBUG):
            # we also SKIP printing out any of these methods' args, since they may be excessive
            just_args_methods = {
                'commitctx': ('content', 'removed', 'updated'),
                'commit': ('content', 'removed', 'updated')
            }
            if method in just_args_methods:
                skip_args = just_args_methods[method]
                call_args = ''
                call_kwargs = {}
                for k in kwargs:
                    if k in skip_args:
                        # replace our skip key with dummy
                        call_kwargs[k] = f'RemovedParam({k})'
                    else:
                        call_kwargs[k] = kwargs[k]
            else:
                call_args = args[1:]
                call_kwargs = kwargs

            log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
                      method, call_args, call_kwargs, context_uid, repo_state_uid)

        statsd = request.registry.statsd
        if statsd:
            statsd.incr(
                'vcsserver_method_total', tags=[
                    f"method:{method}",
                ])
        return payload, remote, method, args, kwargs

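    # Plain RPC endpoint: mirrors the request id back and returns either
    # {'result': ...} or an {'error': ...} dict with traceback details, so the
    # client side can reconstruct and re-raise the original exception.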
    def vcs_view(self, request):

        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        payload_id = payload.get('id')

        try:
            resp = getattr(remote, method)(*args, **kwargs)
        except Exception as e:
            exc_info = list(sys.exc_info())
            exc_type, exc_value, exc_traceback = exc_info

            org_exc = getattr(e, '_org_exc', None)
            org_exc_name = None
            org_exc_tb = ''
            if org_exc:
                org_exc_name = org_exc.__class__.__name__
                org_exc_tb = getattr(e, '_org_exc_tb', '')
                # replace our "faked" exception with our org
                exc_info[0] = org_exc.__class__
                exc_info[1] = org_exc

            should_store_exc = True
            if org_exc:
                def get_exc_fqn(_exc_obj):
                    module_name = getattr(_exc_obj.__class__, '__module__', 'UNKNOWN')
                    return module_name + '.' + _exc_obj.__class__.__name__

                exc_fqn = get_exc_fqn(org_exc)

                if exc_fqn in ['mercurial.error.RepoLookupError',
                               'vcsserver.exceptions.RefNotFoundException']:
                    should_store_exc = False

            if should_store_exc:
                store_exception(id(exc_info), exc_info, request_path=request.path)

            tb_info = format_exc(exc_info)

            type_ = e.__class__.__name__
            if type_ not in self.ALLOWED_EXCEPTIONS:
                type_ = None

            resp = {
                'id': payload_id,
                'error': {
                    'message': str(e),
                    'traceback': tb_info,
                    'org_exc': org_exc_name,
                    'org_exc_tb': org_exc_tb,
                    'type': type_
                }
            }

            try:
                resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
            except AttributeError:
                pass
        else:
            resp = {
                'id': payload_id,
                'result': resp
            }
            log.debug('Serving data for method %s', method)
        return resp

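    # Streaming RPC variant: the method name arrives prefixed with ``stream:``
    # and the raw bytes result is chunked out as application/octet-stream.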
    def vcs_stream_view(self, request):
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        # this method name carries a ``stream:`` marker; strip it here
        method = method.split('stream:')[-1]
        chunk_size = safe_int(payload.get('chunk_size')) or 4096

        resp = getattr(remote, method)(*args, **kwargs)

        def get_chunked_data(method_resp):
            stream = io.BytesIO(method_resp)
            while True:
                chunk = stream.read(chunk_size)
                if not chunk:
                    break
                yield chunk

        response = Response(app_iter=get_chunked_data(resp))
        response.content_type = 'application/octet-stream'

        return response

    def status_view(self, request):
        import vcsserver
        _platform_id = platform.uname()[1] or 'instance'

        return {
            "status": "OK",
            "vcsserver_version": vcsserver.get_version(),
            "platform": _platform_id,
            "pid": os.getpid(),
        }

    def service_view(self, request):
        import vcsserver

        payload = msgpack.unpackb(request.body, use_list=True)
        server_config, app_config = {}, {}

        try:
            path = self.global_config['__file__']
            config = configparser.RawConfigParser()

            config.read(path)

            if config.has_section('server:main'):
                server_config = dict(config.items('server:main'))
            if config.has_section('app:main'):
                app_config = dict(config.items('app:main'))

        except Exception:
            log.exception('Failed to read .ini file for display')

        environ = list(os.environ.items())

        resp = {
            'id': payload.get('id'),
            'result': dict(
                version=vcsserver.get_version(),
                config=server_config,
                app_config=app_config,
                environ=environ,
                payload=payload,
            )
        }
        return resp

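    # Pyramid renderer factory for the 'msgpack' renderer. Results wrapped in
    # BytesEnvelope/BinaryEnvelope are packed with use_bin_type=True and served
    # as application/x-msgpack-bin so the client can decode bytes unchanged.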
    def _msgpack_renderer_factory(self, info):

        def _render(value, system):
            bin_type = False
            res = value.get('result')
            if isinstance(res, BytesEnvelope):
                log.debug('Result is wrapped in BytesEnvelope type')
                bin_type = True
            elif isinstance(res, BinaryEnvelope):
                log.debug('Result is wrapped in BinaryEnvelope type')
                value['result'] = res.val
                bin_type = True

            request = system.get('request')
            if request is not None:
                response = request.response
                ct = response.content_type
                if ct == response.default_content_type:
                    response.content_type = 'application/x-msgpack'
                if bin_type:
                    response.content_type = 'application/x-msgpack-bin'

            return msgpack.packb(value, use_bin_type=bin_type)
        return _render

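    # ``config`` is expected to yield (section, option, value) triples; the
    # JSON blob stored under the 'rhodecode' section carries the acting
    # username and client IP, which are projected into the WSGI environ.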
    def set_env_from_config(self, environ, config):
        dict_conf = {}
        try:
            for elem in config:
                if elem[0] == 'rhodecode':
                    dict_conf = json.loads(elem[2])
                    break
        except Exception:
            log.exception('Failed to fetch SCM CONFIG')
            return

        username = dict_conf.get('username')
        if username:
            environ['REMOTE_USER'] = username
            # mercurial specific, some extension APIs rely on this
            environ['HGUSER'] = username

        ip = dict_conf.get('ip')
        if ip:
            environ['REMOTE_HOST'] = ip

        if _is_request_chunked(environ):
            # set the compatibility flag for webob
            environ['wsgi.input_terminated'] = True

    def hg_proxy(self):
        @wsgiapp
        def _hg_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
            return app(environ, start_response)
        return _hg_proxy

    def git_proxy(self):
        @wsgiapp
        def _git_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
            return app(environ, start_response)
        return _git_proxy

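    # Clone/push traffic is served by a freshly created hgweb app per request
    # (or by EchoApp when dev.use_echo_app is enabled); repo path, name and
    # config travel in the call-context request headers, see
    # get_headers_call_context().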
    def hg_stream(self):
        if self._use_echo_app:
            @wsgiapp
            def _hg_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _hg_stream
        else:
            @wsgiapp
            def _hg_stream(environ, start_response):
                log.debug('http-app: handling hg stream')
                call_context = get_headers_call_context(environ)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                app = scm_app.create_hg_wsgi_app(
                    repo_path, repo_name, config)

                # Consistent path information for hgweb
                environ['PATH_INFO'] = call_context['path_info']
                environ['REPO_NAME'] = repo_name
                self.set_env_from_config(environ, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)
                return app(environ, ResponseFilter(start_response))
            return _hg_stream

    def git_stream(self):
        if self._use_echo_app:
            @wsgiapp
            def _git_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _git_stream
        else:
            @wsgiapp
            def _git_stream(environ, start_response):
                log.debug('http-app: handling git stream')

                call_context = get_headers_call_context(environ)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                environ['PATH_INFO'] = call_context['path_info']
                self.set_env_from_config(environ, config)

                content_type = environ.get('CONTENT_TYPE', '')

                path = environ['PATH_INFO']
                is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
                log.debug(
                    'LFS: Detecting if request `%s` is LFS server path based '
                    'on content type:`%s`, is_lfs:%s',
                    path, content_type, is_lfs_request)

                if not is_lfs_request:
                    # fallback detection by path
                    if GIT_LFS_PROTO_PAT.match(path):
                        is_lfs_request = True
                    log.debug(
                        'LFS: fallback detection by path of: `%s`, is_lfs:%s',
                        path, is_lfs_request)

                if is_lfs_request:
                    app = scm_app.create_git_lfs_wsgi_app(
                        repo_path, repo_name, config)
                else:
                    app = scm_app.create_git_wsgi_app(
                        repo_path, repo_name, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)

                return app(environ, start_response)

            return _git_stream

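    # Exception view: exceptions tagged with a ``_vcs_kind`` marker become
    # dedicated HTTP responses (repo locked / branch protected); everything
    # else is stored for the exception tracker, counted in statsd and re-raised.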
    def handle_vcs_exception(self, exception, request):
        _vcs_kind = getattr(exception, '_vcs_kind', '')

        if _vcs_kind == 'repo_locked':
            headers_call_context = get_headers_call_context(request.environ)
            status_code = safe_int(headers_call_context['locked_status_code'])

            return HTTPRepoLocked(
                title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])

        elif _vcs_kind == 'repo_branch_protected':
            # Get custom repo-branch-protected status code if present.
            return HTTPRepoBranchProtected(
                title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])

        exc_info = request.exc_info
        store_exception(id(exc_info), exc_info)

        traceback_info = 'unavailable'
        if request.exc_info:
            traceback_info = format_exc(request.exc_info)

        log.error(
            'error occurred handling this request for path: %s, \n%s',
            request.path, traceback_info)

        statsd = request.registry.statsd
        if statsd:
            exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
            statsd.incr('vcsserver_exception_total',
                        tags=[f"type:{exc_type}"])
        raise exception


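# start_response wrapper that drops hop-by-hop headers, which PEP 3333
# forbids a WSGI application from passing upstream; hg_stream() routes
# hgweb responses through this filter.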
class ResponseFilter:

    def __init__(self, start_response):
        self._start_response = start_response

    def __call__(self, status, response_headers, exc_info=None):
        headers = tuple(
            (h, v) for h, v in response_headers
            if not wsgiref.util.is_hop_by_hop(h))
        return self._start_response(status, headers, exc_info)


def sanitize_settings_and_apply_defaults(global_config, settings):
    _global_settings_maker = SettingsMaker(global_config)
    settings_maker = SettingsMaker(settings)

    settings_maker.make_setting('logging.autoconfigure', False, parser='bool')

    logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini')
    settings_maker.enable_logging(logging_conf)

    # Default includes, possible to change as a user
    pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
    log.debug("Using the following pyramid.includes: %s", pyramid_includes)

    settings_maker.make_setting('__file__', global_config.get('__file__'))

    settings_maker.make_setting('pyramid.default_locale_name', 'en')
    settings_maker.make_setting('locale', 'en_US.UTF-8')

    settings_maker.make_setting(
        'core.binary_dir', '/usr/local/bin/rhodecode_bin/vcs_bin',
        default_when_empty=True, parser='string:noquote')

    temp_store = tempfile.gettempdir()
    default_cache_dir = os.path.join(temp_store, 'rc_cache')
    # save the default cache dir and use it for all backends later
    default_cache_dir = settings_maker.make_setting(
        'cache_dir',
        default=default_cache_dir, default_when_empty=True,
        parser='dir:ensured')

    # exception store cache
    settings_maker.make_setting(
        'exception_tracker.store_path',
        default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True,
        parser='dir:ensured'
    )

    # repo_object cache defaults
    settings_maker.make_setting(
        'rc_cache.repo_object.backend',
        default='dogpile.cache.rc.file_namespace',
        parser='string')
    settings_maker.make_setting(
        'rc_cache.repo_object.expiration_time',
        default=30 * 24 * 60 * 60,  # 30 days
        parser='int')
    settings_maker.make_setting(
        'rc_cache.repo_object.arguments.filename',
        default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
        parser='string')

    # statsd
    settings_maker.make_setting('statsd.enabled', False, parser='bool')
    settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
    settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
    settings_maker.make_setting('statsd.statsd_prefix', '')
    settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')

    settings_maker.env_expand()


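# A minimal ini sketch of the settings consumed above, for orientation only.
# The option names come straight from sanitize_settings_and_apply_defaults();
# the egg name and the [server:main] values are assumptions, not taken from
# this file:
#
#   [app:main]
#   use = egg:vcsserver
#   locale = en_US.UTF-8
#   core.binary_dir = /usr/local/bin/rhodecode_bin/vcs_bin
#   cache_dir = /tmp/rc_cache
#   statsd.enabled = false
#
#   [server:main]
#   host = 127.0.0.1
#   port = 9900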
def main(global_config, **settings):
    start_time = time.time()
    log.info('Pyramid app config starting')

    if MercurialFactory:
        hgpatches.patch_largefiles_capabilities()
        hgpatches.patch_subrepo_type_mapping()

    # Fill in and sanitize the defaults & do ENV expansion
    sanitize_settings_and_apply_defaults(global_config, settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app()
    total_time = time.time() - start_time
    log.info('Pyramid app created and configured in %.2fs', total_time)
    return pyramid_app